Scrivi un singolo file CSV usando spark-csv

Sto usando https://github.com/databricks/spark-csv , sto provando a scrivere un singolo CSV, ma non in grado di farlo, sta facendo una cartella.

Serve una funzione Scala che assuma parametri come il percorso e il nome del file e scriva quel file CSV.

Sta creando una cartella con più file, perché ogni partizione viene salvata individualmente. Se hai bisogno di un singolo file di output (sempre in una cartella) puoi repartition (preferito se i dati upstream sono grandi, ma richiede un shuffle):

 df .repartition(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv") 

o coalesce :

 df .coalesce(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv") 

data frame prima di salvare:

Tutti i dati verranno scritti su mydata.csv/part-00000 . Prima di utilizzare questa opzione assicurati di capire cosa sta succedendo e qual è il costo del trasferimento di tutti i dati a un singolo lavoratore . Se si utilizza il file system distribuito con la replica, i dati verranno trasferiti più volte, prima recuperati su un singolo worker e successivamente distribuiti su nodes di archiviazione.

In alternativa puoi lasciare il tuo codice così com’è e utilizzare strumenti generici come cat o HDFS getmerge per unire semplicemente tutte le parti in seguito.

Se stai utilizzando Spark con HDFS, ho risolto il problema scrivendo file CSV normalmente e sfruttando HDFS per fare la fusione. Lo faccio direttamente in Spark (1.6):

 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) // the "true" setting deletes the source files once they are merged into the new output } val newData = << create your dataframe >> val outputfile = "/user/feeds/project/outputs/subject" var filename = "myinsights" var outputFileName = outputfile + "/temp_" + filename var mergedFileName = outputfile + "/merged_" + filename var mergeFindGlob = outputFileName newData.write .format("com.databricks.spark.csv") .option("header", "false") .mode("overwrite") .save(outputFileName) merge(mergeFindGlob, mergedFileName ) newData.unpersist() 

Non ricordo dove ho imparato questo trucco, ma potrebbe funzionare per te.

Potrei essere un po ‘in ritardo nel gioco, ma usare coalesce(1) o repartition(1) potrebbe funzionare per piccoli set di dati, ma i set di dati di grandi dimensioni verrebbero tutti gettati in una partizione su un nodo. È probabile che crei errori OOM o, al massimo, proceda lentamente.

Suggerisco caldamente di utilizzare la funzione FileUtil.copyMerge() dall’API Hadoop. Questo unirà le uscite in un singolo file.

EDIT – Questo porta efficacemente i dati al driver piuttosto che un nodo executor. Coalesce() andrebbe bene se un singolo executor ha più RAM per l’uso rispetto al driver.

EDIT 2: copyMerge() viene rimosso in Hadoop 3.0. Vedere il seguente articolo sull’overflow dello stack per ulteriori informazioni su come utilizzare la versione più recente: Hadoop come eseguire CopyMerge in Hadoop 3.0

Se si utilizzano i databricks e si possono inserire tutti i dati nella RAM su un worker (e quindi possono utilizzare .coalesce(1) ), è ansible utilizzare dbfs per trovare e spostare il file CSV risultante:

 val fileprefix= "/mnt/aws/path/file-prefix" dataset .coalesce(1) .write //.mode("overwrite") // I usually don't use this, but you may want to. .option("header", "true") .option("delimiter","\t") .csv(fileprefix+".tmp") val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path dbutils.fs.cp(partition_path,fileprefix+".tab") dbutils.fs.rm(fileprefix+".tmp",recurse=true) 

Se il tuo file non si adatta alla RAM sul lavoratore, potresti prendere in considerazione il suggerimento di chaotic3quilibrium di usare FileUtils.copyMerge () . Non l’ho fatto e non so ancora se è ansible o no, ad es. Su S3.

Questa risposta è basata su risposte precedenti a questa domanda, nonché sui miei test dello snippet di codice fornito. Inizialmente l’ho pubblicato su Databricks e lo sto ripubblicando qui.

La migliore documentazione per l’opzione ricorsiva di dbfs rm che ho trovato è su un forum di Databricks .

ripartizione / coalizione in 1 partizione prima di salvare (si otterrebbe comunque una cartella ma in essa sarebbe presente un file di parte)

puoi usare rdd.coalesce(1, true).saveAsTextFile(path)

memorizzerà i dati come file singile in path / part-00000

C’è un altro modo di usare Java

 import java.io._ def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) { val p = new java.io.PrintWriter(f); try { op(p) } finally { p.close() } } printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}