Elaborazione di file multipli come RDD indipendenti in parallelo

Ho uno scenario in cui un certo numero di operazioni incluso un gruppo deve essere applicato su un numero di file piccoli (~ 300 MB ciascuno). L’operazione sembra così ..

df.groupBy(....).agg(....)

Ora per elaborarlo su più file, posso usare un carattere jolly “/**/*.csv”, tuttavia, che crea un singolo RDD e lo suddivide in per le operazioni. Tuttavia, osservando le operazioni, si tratta di un gruppo di e coinvolge molto shuffle che non è necessario se i file si escludono a vicenda.

Quello che sto guardando è un modo in cui posso creare file RDD indipendenti su file e operare su di essi in modo indipendente.

È più un’idea che una soluzione completa e non l’ho ancora testata.

È ansible iniziare con l’estrazione della pipeline di elaborazione dati in una funzione.

 def pipeline(f: String, n: Int) = { sqlContext .read .format("com.databricks.spark.csv") .option("header", "true") .load(f) .repartition(n) .groupBy(...) .agg(...) .cache // Cache so we can force computation later } 

Se i file sono piccoli, è ansible regolare il parametro n da utilizzare il minor numero di partizioni ansible per adattare i dati da un singolo file ed evitare di mischiare. Significa che stai limitando la concorrenza, ma torneremo su questo argomento più tardi.

 val n: Int = ??? 

Quindi è necessario ottenere un elenco di file di input. Questo passaggio dipende da un’origine dati, ma la maggior parte delle volte è più o meno semplice:

 val files: Array[String] = ??? 

Successivamente puoi mappare la lista sopra usando la funzione pipeline :

 val rdds = files.map(f => pipeline(f, n)) 

Poiché limitiamo la concorrenza al livello del singolo file, vogliamo compensare inviando più lavori. Aggiungiamo un semplice aiuto che costringe la valutazione e lo avvolge con Future

 import scala.concurrent._ import ExecutionContext.Implicits.global def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future { df.rdd.foreach(_ => ()) // Force computation df } 

Finalmente possiamo usare l’helper sopra il rdds :

 val result = Future.sequence( rdds.map(rdd => pipelineToFuture(rdd)).toList ) 

A seconda delle esigenze, è ansible aggiungere callback onComplete o utilizzare flussi reattivi per raccogliere i risultati.

Se hai molti file e ogni file è piccolo (dici 300MB al di sopra del quale potrei contare come piccolo per Spark), potresti provare a utilizzare SparkContext.wholeTextFiles che creerà un RDD in cui ogni record è un intero file.