Spark: lettura dei file utilizzando delimitatori diversi rispetto alla nuova riga

Sto usando Apache Spark 1.0.1. Ho molti file delimitati con UTF8 \u0001 e non con la solita nuova linea \n . Come posso leggere questi file in Spark? Significato, il delimitatore predefinito di sc.textfile("hdfs:///myproject/*") è \n , e voglio cambiarlo in \u0001 .

In Spark shell, ho estratto i dati in base a Setting textinputformat.record.delimiter in spark :

 $ spark-shell ... scala> import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.LongWritable scala> import org.apache.hadoop.io.Text import org.apache.hadoop.io.Text scala> import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.TextInputFormat scala> val conf = new Configuration conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml scala> conf.set("textinputformat.record.delimiter", "\u0001") scala> val data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString) data: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at :19 

sc.newAPIHadoopFile("mydata.txt", ...) è un RDD[(LongWritable, Text)] , dove la prima parte degli elementi è l’indice dei caratteri di partenza e la seconda parte è il testo reale delimitato da "\u0001" .

È ansible utilizzare textinputformat.record.delimiter per impostare il delimitatore per TextInputFormat , ad es.

 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat val conf = new Configuration(sc.hadoopConfiguration) conf.set("textinputformat.record.delimiter", "X") val input = sc.newAPIHadoopFile("file_path", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) val lines = input.map { case (_, text) => text.toString} println(lines.collect) 

Ad esempio, il mio input è un file contenente una riga aXbXcXd . Il codice sopra uscirà

 Array(a, b, c, d) 

In Python questo potrebbe essere ottenuto usando:

 rdd = sc.newAPIHadoopFile(YOUR_FILE, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf={"textinputformat.record.delimiter": YOUR_DELIMITER}).map(lambda l:l[1]) 

Ecco una versione pronta all’uso delle risposte di Chad e di @zsxwing per gli utenti di Scala, che possono essere utilizzate in questo modo:

 sc.textFile("some/path.txt", "\u0001") 

Lo snippet seguente crea un metodo textFile aggiuntivo collegato allo SparkContext utilizzando una implicit class (per replicare il metodo textFile predefinito di textFile ):

 package com.whatever import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat object Spark { implicit class ContextExtensions(val sc: SparkContext) extends AnyVal { def textFile( path: String, delimiter: String, maxRecordLength: String = "1000000" ): RDD[String] = { val conf = new Configuration(sc.hadoopConfiguration) // This configuration sets the record delimiter: conf.set("textinputformat.record.delimiter", delimiter) // and this one limits the size of one record: conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength) sc.newAPIHadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf ) .map { case (_, text) => text.toString } } } } 

che può essere usato in questo modo:

 import com.whatever.Spark.ContextExtensions sc.textFile("some/path.txt", "\u0001") 

Notare l’impostazione aggiuntiva mapreduce.input.linerecordreader.line.maxlength che limita la dimensione massima di un record. Ciò è utile quando si legge da un file corrotto per il quale un record potrebbe essere troppo lungo per essere contenuto nella memoria (maggiori possibilità che ciò accada quando si gioca con il delimitatore di record).

Con questa impostazione, durante la lettura di un file danneggiato, viene generata un’eccezione ( java.io.IOException – quindi catchable) anziché una memoria disordinata che interrompe SparkContext.