Come calcolare il numero migliore di partizioni per coalizzarsi?

Quindi, capisco che in generale si dovrebbe usare coalesce() quando:

il numero di partizioni diminuisce a causa di un filter o di qualche altra operazione che può comportare la riduzione del set di dati originale (RDD, DF). coalesce() è utile per eseguire le operazioni in modo più efficiente dopo aver filtrato un set di dati di grandi dimensioni.

Capisco anche che è meno costoso della repartition in quanto riduce il rimescolamento spostando i dati solo se necessario. Il mio problema è come definire il parametro che assume la coalesce ( idealPartionionNo ). Sto lavorando a un progetto che mi è stato trasmesso da un altro ingegnere e che stava usando il calcolo sottostante per calcolare il valore di quel parametro.

 // DEFINE OPTIMAL PARTITION NUMBER implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5) implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2) val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR 

Questo viene quindi utilizzato con un object partitioner :

 val partitioner = new HashPartitioner(idealPartionionNo) 

ma anche usato con:

 RDD.filter(x=>x._3<30).coalesce(idealPartionionNo) 

È questo l’approccio giusto? Qual è l’idea principale alla base del calcolo del valore idealPartionionNo ? Che cos’è REPARTITION_FACTOR ? Come faccio generalmente a definirlo?

Inoltre, poiché YARN è responsabile dell’identificazione degli executors disponibili al volo, esiste un modo per ottenere quel numero ( AVAILABLE_EXECUTOR_INSTANCES ) al volo e utilizzarlo per calcolare idealPartionionNo (cioè sostituire NO_OF_EXECUTOR_INSTANCES con AVAILABLE_EXECUTOR_INSTANCES )?

Idealmente, alcuni esempi reali del modulo:

  • Ecco un dataset ( dimensione );
  • Ecco una serie di trasformazioni e possibili riutilizzi di un RDD / DF.
  • Qui è dove devi ripartire / coalizzarsi.
  • Supponiamo di avere n esecutori con m core e un fattore di partizione uguale a k

poi:

  • Il numero ideale di partizioni sarebbe ==> ???

Inoltre, se mi puoi riferire ad un bel blog che spiega questi, lo apprezzerei molto.

In pratica il numero ottimale di partizioni dipende più dai dati che si hanno, dalle trasformazioni che si utilizzano e dalla configurazione generale rispetto alle risorse disponibili.

  • Se il numero di partizioni è troppo basso, si verificheranno lunghe pause GC, diversi tipi di problemi di memoria e infine utilizzo ottimale delle risorse.
  • Se il numero di partizioni è troppo alto, i costi di manutenzione possono facilmente superare il costo di elaborazione. Inoltre, se si utilizzano operazioni di riduzione non distribuite (come la reduce in contrasto con treeReduce ), un numero elevato di partizioni comporta un carico più elevato sul driver.

È ansible trovare una serie di regole che suggeriscono l’oversubscribing delle partizioni rispetto al numero di core (il fattore 2 o 3 sembra essere comune) o mantenere le partizioni con una certa dimensione ma questo non tiene conto del proprio codice:

  • Se si assegna molto, ci si può aspettare lunghe pause del GC ed è probabilmente meglio andare con partizioni più piccole.
  • Se un certo pezzo di codice è costoso, allora il tuo costo shuffle può essere ammortizzato da una concorrenza più alta.
  • Se si dispone di un filtro, è ansible regolare il numero di partizioni in base a una potenza discriminante del predicato (si prendono decisioni diverse se si prevede di mantenere il 5% dei dati e il 99% dei dati).

Secondo me:

  • Con lavori una tantum mantenere partizioni di numeri più elevate per stare dalla parte della sicurezza (più lento è meglio che fallire).
  • Con i lavori riutilizzabili iniziare con una configurazione conservativa quindi eseguire – monitorare – regolare la configurazione – ripetere.
  • Non provare a utilizzare un numero fisso di partizioni in base al numero di esecutori o core. Prima capisci i tuoi dati e il codice, quindi regola la configurazione per riflettere la tua comprensione.

    Di solito, è relativamente facile determinare la quantità di dati grezzi per partizione per i quali il cluster mostra un comportamento stabile (nella mia esperienza si trova da qualche parte nell’intervallo di poche centinaia di megabyte, a seconda del formato, della struttura dati utilizzata per caricare i dati, e configurazione). Questo è il “numero magico” che stai cercando.

Alcune cose che devi ricordare in generale:

  • Il numero di partizioni non riflette necessariamente la distribuzione dei dati. Qualsiasi operazione che richiede shuffle ( *byKey , join , RDD.partitionBy , Dataset.repartition ) può comportare una distribuzione dei dati non uniforms. Monitora sempre i tuoi lavori per i sintomi di una significativa inclinazione dei dati.
  • Il numero di partizioni in generale non è costante. Qualsiasi operazione con più dipendenze ( union , coGroup , join ) può influire sul numero di partizioni.

La tua domanda è valida, ma l’ottimizzazione del partizionamento Spark dipende interamente dal calcolo che stai eseguendo. È necessario avere una buona ragione per ripartizionare / coalizzarsi; se stai solo contando un RDD (anche se ha un numero enorme di partizioni scarsamente popolate), qualsiasi passaggio di ripartizione / coalizione ti farà rallentare.

Ripartizione vs coalizione

La differenza tra la repartition(n) (che coalesce(n, shuffle = true) a coalesce(n, shuffle = true) e coalesce(n, shuffle = false) ha a che fare con il modello di esecuzione. Il modello shuffle prende ogni partizione nell’RDD originale, invia casualmente fornisce i dati a tutti gli esecutori e genera un RDD con il nuovo (più piccolo o più grande) numero di partizioni. Il modello no-shuffle crea un nuovo RDD che carica più partizioni come un’unica attività.

Consideriamo questo calcolo:

 sc.textFile("massive_file.txt") .filter(sparseFilterFunction) // leaves only 0.1% of the lines .coalesce(numPartitions, shuffle = shuffle) 

Se shuffle è true , i calcoli del file / filtro di testo si verificano in un certo numero di attività date dai valori predefiniti in textFile e i risultati dei piccoli filtri vengono mescolati. Se shuffle è false , il numero di task totali è al massimo numPartitions .

Se numPartitions è 1, la differenza è piuttosto netta. Il modello shuffle elaborerà e filtrerà i dati in parallelo, quindi invierà lo 0,1% dei risultati filtrati a un executor per le operazioni DAG downstream. Il modello no-shuffle elaborerà e filtrerà tutti i dati su un core dall’inizio.

Passi da fare

Considera le tue operazioni a valle. Se si utilizza questo set di dati solo una volta, probabilmente non è necessario ripartizionarlo. Se si sta salvando l’RDD filtrato per un uso successivo (su disco, ad esempio), considerare i compromessi sopra riportati. Ci vuole esperienza per familiarizzare con questi modelli e quando uno si comporta meglio, quindi prova entrambi e guarda come si comportano!

Come altri hanno risposto, non esiste una formula che calcoli ciò che chiedi. Detto questo, puoi fare un’ipotesi istruita sulla prima parte e perfezionarla nel tempo.

Il primo passo è assicurarsi di avere abbastanza partizioni. Se hai NO_OF_EXECUTOR_INSTANCES executors e NO_OF_EXECUTOR_CORES core per executor, puoi elaborare NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES partizioni allo stesso tempo (ognuna andrebbe a un nucleo specifico di un’istanza specifica). Detto ciò, si presume che tutto sia diviso in parti uguali tra i core e tutto impiega esattamente lo stesso tempo per elaborare. Questo è raramente il caso. C’è una buona probabilità che alcuni di essi siano terminati prima degli altri a causa della località (ad esempio i dati devono provenire da un nodo diverso) o semplicemente perché non sono bilanciati (ad esempio se i dati sono partizionati dal dominio principale e quindi partizioni incluse google sarebbe probabilmente abbastanza grande). È qui che entra in gioco REPARTITION_FACTOR. L’idea è che noi “prenotiamo” ogni core e quindi se uno finisce molto velocemente e uno finisce lentamente abbiamo la possibilità di dividere i compiti tra loro. Un fattore 2-3 è generalmente una buona idea.

Ora diamo un’occhiata alle dimensioni di una singola partizione. Diciamo che tutti i tuoi dati hanno dimensioni di X MB e hai N partizioni. Ogni partizione sarebbe in media X / N MB. Se N è grande rispetto a X, potresti avere dimensioni di partizione medie molto piccole (ad esempio pochi KB). In questo caso, di solito è una buona idea abbassare N perché il sovraccarico di gestione di ogni partizione diventa troppo alto. D’altra parte, se la dimensione è molto grande (ad esempio un paio di GB), è necessario tenere un sacco di dati allo stesso tempo, il che causerebbe problemi come la raccolta dei dati inutili, l’utilizzo di memoria elevato ecc.

La dimensione ottimale è una buona domanda, ma in genere le persone sembrano preferire partizioni di 100-1000 MB, ma in realtà probabilmente decine di MB sarebbero anche buone.

Un’altra cosa che dovresti notare è quando fai il calcolo su come cambiano le tue partizioni. Ad esempio, diciamo che inizi con 1000 partizioni da 100 MB ciascuna, ma poi filtri i dati in modo che ogni partizione diventi 1K, quindi dovresti probabilmente coalizzarli. Problemi simili possono accadere quando si fa un groupby o si partecipa. In questi casi, la dimensione della partizione e il numero di partizioni cambiano e potrebbero raggiungere dimensioni indesiderate.