Scrivi su più uscite con il tasto Spark – un lavoro Spark

Come si può scrivere su più uscite a seconda della chiave usando Spark in un singolo Job.

Correlati: scrivere su più uscite tramite il tasto Scalding Hadoop, un lavoro MapReduce

Per esempio

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .writeAsMultiple(prefix, compressionCodecOption) 

garantirebbe il cat prefix/1

 a b 

e il cat prefix/2 sarebbe

 c 

Risposta

Per una risposta esatta con il codec completo di importazione, pimp e compressione, consultare https://stackoverflow.com/a/46118044/1586965

Se usi Spark 1.4+, questo è diventato molto più semplice grazie all’API DataFrame . (I DataFrames sono stati introdotti in Spark 1.3, ma partitionBy() , di cui abbiamo bisogno, è stato introdotto in 1.4 ).

Se stai iniziando con un RDD, devi prima convertirlo in un DataFrame:

 val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name") 

In Python, questo stesso codice è:

 people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"]) 

Una volta che hai un DataFrame, scrivere su più uscite in base a una particolare chiave è semplice. Cosa c’è di più – e questa è la bellezza dell’API DataFrame – il codice è praticamente lo stesso su Python, Scala, Java e R:

 people_df.write.partitionBy("number").text("people") 

E puoi facilmente usare altri formati di output se vuoi:

 people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet") 

In ognuno di questi esempi, Spark creerà una sottodirectory per ciascuna delle chiavi su cui abbiamo partizionato DataFrame:

 people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh 

Lo farei come questo, che è scalabile

 import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } } 

Ho appena visto una risposta simile sopra, ma in realtà non abbiamo bisogno di partizioni personalizzate. MultipleTextOutputFormat creerà il file per ogni chiave. Va bene che più record con le stesse chiavi cadano nella stessa partizione.

nuovo HashPartitioner (num), dove num è il numero di partizione desiderato. Se hai un gran numero di chiavi diverse, puoi impostare il numero su grande. In questo caso, ciascuna partizione non aprirà troppi gestori di file hdfs.

Se potenzialmente hai molti valori per una determinata chiave, penso che la soluzione scalabile sia quella di scrivere un file per chiave per partizione. Sfortunatamente non c’è un supporto integrato per questo in Spark, ma possiamo sferzare qualcosa.

 sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toString) for ((k, v) <- it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To trigger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: String) { private val writers = collection.mutable.Map[String, java.io.PrintWriter]() def write(key: String, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) } 

(Sostituire PrintWriter con la scelta del funzionamento del file system distribuito).

Questo esegue un passaggio singolo su RDD e non esegue shuffle. Ti dà una directory per chiave, con un numero di file all’interno di ciascuno.

Ciò include il codec richiesto, le importazioni necessarie e il magnaccia come richiesto.

 import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: String, codec: String, keyName: String = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Una sottile differenza rispetto = è che farà = ai nomi delle directory. Per esempio

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Darebbe:

 prefix/key=1/part-00000 prefix/key=2/part-00000 

dove prefix/my_number=1/part-00000 conterrebbe le righe b , e prefix/my_number=2/part-00000 conterrà la riga c .

E

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo") 

Darebbe:

 prefix/foo=1/part-00000 prefix/foo=2/part-00000 

Dovrebbe essere chiaro come modificare per il parquet .

Finalmente di seguito è un esempio per Dataset , che è forse più bello che usare Tuples.

 implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: String, codec: String, field: String): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } } 

Ho un bisogno simile e ho trovato un modo. Ma ha uno svantaggio (che non è un problema per il mio caso): è necessario ri-partizionare i dati con una partizione per file di output.

Per suddividere in questo modo, in genere, è necessario conoscere in anticipo quanti file verrà generato dal lavoro e trovare una funzione che mapperà ciascuna chiave per ogni partizione.

Per prima cosa creiamo la nostra class basata su MultipleTextOutputFormat:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] { override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString } override protected def generateActualKey(key: T, value: V) = { null } } 

Con questa class Spark otterrà una chiave da una partizione (la prima / l'ultima, credo) e assegnerà il nome al file con questa chiave, quindi non è bene mescolare più chiavi sulla stessa partizione.

Per il tuo esempio, avrai bisogno di un partizionatore personalizzato. Questo farà il lavoro:

 import org.apache.spark.Partitioner class IdentityIntPartitioner(maxKey: Int) extends Partitioner { def numPartitions = maxKey def getPartition(key: Any): Int = key match { case i: Int if i < maxKey => i } } 

Ora mettiamo tutto insieme:

 val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e"))) // You need to know the max number of partitions (files) beforehand // In this case we want one partition per key and we have 3 keys, // with the biggest key being 7, so 10 will be large enough val partitioner = new IdentityIntPartitioner(10) val prefix = "hdfs://.../prefix" val partitionedRDD = rdd.partitionBy(partitioner) partitionedRDD.saveAsHadoopFile(prefix, classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]]) 

Questo genererà 3 file sotto il prefisso (denominati 1, 2 e 7), elaborando tutto in un unico passaggio.

Come puoi vedere, hai bisogno di una conoscenza delle tue chiavi per poter usare questa soluzione.

Per me è stato più facile perché avevo bisogno di un file di output per ogni hash delle chiavi e il numero di file era sotto il mio controllo, quindi avrei potuto usare l'HashPartitioner di serie per fare il trucco.

saveAsText () e saveAsHadoop (…) sono implementati in base ai dati RDD, in particolare con il metodo: PairRDD.saveAsHadoopDataset che preleva i dati dal PairRdd in cui è eseguito. Vedo due possibili opzioni: se i dati sono di dimensioni relativamente ridotte, è ansible risparmiare alcuni tempi di implementazione raggruppandoli su RDD, creando un nuovo RDD da ciascuna raccolta e utilizzando tale RDD per scrivere i dati. Qualcosa come questo:

 val byKey = dataRDD.groupByKey().collect() val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k} 

Si noti che non funzionerà con dataset di grandi dimensioni b / c la materializzazione v.toSeq su v.toSeq potrebbe non rientrare in memoria.

L’altra opzione che vedo, e in realtà quella che raccomanderei in questo caso è: roll your own, chiamando direttamente l’hadoop / hdfs api.

Ecco una discussione che ho iniziato durante la ricerca di questa domanda: come creare RDD da un altro RDD?

Avevo bisogno della stessa cosa in Java. Pubblicando la mia traduzione della risposta Scala di Zhang Zhan agli utenti API di Spark Java:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected String generateFileNameForKeyValue(A key, B value, String name) { return key.toString(); } } public class Main { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Split Job") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"}; sc.parallelize(Arrays.asList(strings)) // The first character of the string is the key .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s)) .saveAsHadoopFile("output/", String.class, String.class, RDDMultipleTextOutputFormat.class); sc.stop(); } } 

Ho avuto un caso di utilizzo simile in cui ho diviso il file di input su Hadoop HDFS in più file in base a una chiave (1 file per chiave). Ecco il mio codice scala per la scintilla

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; val hadoopconf = new Configuration(); val fs = FileSystem.get(hadoopconf); @serializable object processGroup { def apply(groupName:String, records:Iterable[String]): Unit = { val outFileStream = fs.create(new Path("/output_dir/"+groupName)) for( line <- records ) { outFileStream.writeUTF(line+"\n") } outFileStream.close() } } val infile = sc.textFile("input_file") val dateGrouped = infile.groupBy( _.split(",")(0)) dateGrouped.foreach( (x) => processGroup(x._1, x._2)) 

Ho raggruppato i record in base alla chiave. I valori per ogni chiave sono scritti in file separati.

buone notizie per l’utente python nel caso tu abbia multi colonne e vuoi salvare tutte le altre colonne non partizionate in formato csv che falliscono se usi il metodo “text” come suggerimento di Nick Chammas.

 people_df.write.partitionBy("number").text("people") 

il messaggio di errore è “AnalysisException: u’L’origine dati del testo supporta solo una singola colonna e hai 2 colonne.”

Nella scintilla 2.0.0 (il mio ambiente di test è hdp’s spark 2.0.0) il pacchetto “com.databricks.spark.csv” è ora integrato e ci consente di salvare il file di testo partizionato da una sola colonna, vedere il colpo di esempio:

 people_rdd = sc.parallelize([(1,"2016-12-26", "alice"), (1,"2016-12-25", "alice"), (1,"2016-12-25", "tom"), (1, "2016-12-25","bob"), (2,"2016-12-26" ,"charlie")]) df = people_rdd.toDF(["number", "date","name"]) df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people") [[email protected] people]# tree . ├── number=1 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv ├── number=2 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv └── _SUCCESS [[email protected] people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,alice 2016-12-25,alice 2016-12-25,tom 2016-12-25,bob [[email protected] people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,charlie 

Nel mio ambiente scintilla 1.6.1, il codice non ha generato alcun errore, tuttavia è solo un file generato. non è partizionato da due cartelle.

Spero che questo possa aiutare.

Ho avuto un caso d’uso simile. Ho risolto il problema in Java scrivendo due classi personalizzate che RecordWriter MultipleTextOutputFormat e RecordWriter .

Il mio input era un JavaPairRDD> e volevo memorizzarlo in un file chiamato con la sua chiave, con tutte le righe contenute nel suo valore.

Ecco il codice per la mia implementazione MultipleTextOutputFormat

 class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected String generateFileNameForKeyValue(K key, V value, String name) { return key.toString(); //The return will be used as file name } /** The following 4 functions are only for visibility purposes (they are used in the class MyRecordWriter) **/ protected String generateLeafFileName(String name) { return super.generateLeafFileName(name); } protected V generateActualValue(K key, V value) { return super.generateActualValue(key, value); } protected String getInputFileBasedOutputFileName(JobConf job, String name) { return super.getInputFileBasedOutputFileName(job, name); } protected RecordWriter getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { return super.getBaseRecordWriter(fs, job, name, arg3); } /** Use my custom RecordWriter **/ @Override RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException { final String myName = this.generateLeafFileName(name); return new MyRecordWriter(this, fs, job, arg3, myName); } } 

Ecco il codice per la mia implementazione di RecordWriter .

 class MyRecordWriter implements RecordWriter { private RDDMultipleTextOutputFormat rddMultipleTextOutputFormat; private final FileSystem fs; private final JobConf job; private final Progressable arg3; private String myName; TreeMap> recordWriters = new TreeMap(); MyRecordWriter(RDDMultipleTextOutputFormat rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) { this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat; this.fs = fs; this.job = job; this.arg3 = arg3; this.myName = myName; } @Override void write(K key, V value) throws IOException { String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName); String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath); Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value); RecordWriter rw = this.recordWriters.get(finalPath); if(rw == null) { rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } List lines = (List) actualValue; for (String line : lines) { rw.write(null, line); } } @Override void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while(keys.hasNext()) { RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } } 

La maggior parte del codice è esattamente la stessa di FileOutputFormat . L’unica differenza sono quelle poche righe

 List lines = (List) actualValue; for (String line : lines) { rw.write(null, line); } 

Queste righe mi hanno permesso di scrivere ogni riga della mia List input List sul file. Il primo argomento della funzione di write è impostato su null per evitare di scrivere la chiave su ogni riga.

Per finire, ho solo bisogno di fare questa chiamata per scrivere i miei file

 javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);