Come leggere più file di testo in un singolo RDD?

Voglio leggere un mucchio di file di testo da una posizione hdfs ed eseguire la mapping su di essa in una iterazione usando la scintilla.

JavaRDD records = ctx.textFile(args[1], 1); è in grado di leggere solo un file alla volta.

Voglio leggere più di un file ed elaborarli come un singolo RDD. Come?

È ansible specificare intere directory, utilizzare caratteri jolly e persino CSV di directory e caratteri jolly. Per esempio:

 sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file") 

Come sottolinea Nick Chammas, questa è un’esposizione di FileInputFormat di Hadoop e quindi funziona anche con Hadoop (e Scalding).

Usa union come segue:

 val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds) 

Quindi il bigRdd è il RDD con tutti i file.

È ansible utilizzare una singola chiamata textFile per leggere più file. Scala:

 sc.textFile(','.join(files)) 

Puoi usare questo

Innanzitutto puoi ottenere un buffer / elenco di percorsi S3:

 import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala } 

Ora passa questo object List al seguente pezzo di codice, nota: sc è un object di SQLContext

 var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } } 

Ora hai un RDD unificato finale, ovvero df

Facoltativo E puoi anche ripartizionarlo in un singolo BigRDD

 val files = sc.textFile(filename, 1).repartition(1) 

Il riavvio funziona sempre: D

In PySpark, ho trovato un modo utile per analizzare i file. Forse c’è un equivalente in Scala, ma non mi sento abbastanza a mio agio con una traduzione funzionante. È, in effetti, una chiamata di file di testo con l’aggiunta di etichette (nell’esempio seguente la chiave = nomefile, valore = 1 riga dal file).

File di testo “etichettato”

ingresso:

 import glob from pyspark import SparkContext SparkContext.stop(sc) sc = SparkContext("local","example") # if running locally sqlContext = SQLContext(sc) for filename in glob.glob(Data_File + "/*"): Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) 

output: array con ogni voce contenente una tupla usando filename-as-key e con valore = ogni riga di file. (Tecnicamente, usando questo metodo puoi anche usare un tasto diverso oltre al nome del percorso file effettivo – forse una rappresentazione di hashing da salvare in memoria). vale a dire.

 [('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'), ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'), ...] 

Puoi anche ricombinare come lista di linee:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

 [('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']), ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])] 

Oppure ricombina interi file in stringhe singole (in questo esempio il risultato è lo stesso di quello ottenuto da wholeTextFiles, ma con la stringa “file:” rimossa dal filepathing.):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

puoi usare – JavaRDD records = sc.wholeTextFiles (“percorso della tua directory”) qui otterrai il percorso del tuo file e il contenuto di quel file. in modo da poter eseguire qualsiasi azione di un intero file alla volta che salva l’overhead

Tutte le risposte sono corrette con sc.textFile

Mi stavo chiedendo perché non wholeTextFiles in questo caso …

 sc.wholeTextFiles(yourfileListFromFolder.mkString(",")) .flatMap{case (path, text) ... 

una limitazione è che, dobbiamo caricare file di piccole dimensioni altrimenti le prestazioni saranno cattive e potrebbero portare a OOM.

Nota :

  • L’intero file dovrebbe adattarsi alla memoria
  • Buono per i formati di file che NON sono divisibili per linea … come i file XML

Ulteriore riferimento alla visita

È disponibile una soluzione pulita semplice. Utilizzare il metodo fullTextFiles (). Ciò richiederà una directory e forma una coppia di valori chiave. Il RDD restituito sarà una coppia RDD. Trova sotto la descrizione di Spark docs :

SparkContext.wholeTextFiles ti consente di leggere una directory contenente più file di testo di piccole dimensioni e li restituisce come coppie (nome file, contenuto). Questo è in contrasto con textFile, che restituirebbe un record per riga in ogni file

PROVA QUESTA interfaccia utilizzata per scrivere un DataFrame su sistemi di archiviazione esterni (ad es. File system, archivi di valori-chiave, ecc.). Usa DataFrame.write () per accedere a questo.

Novità nella versione 1.4.

csv (path, mode = None, compression = None, sep = None, quote = None, escape = None, header = None, nullValue = None, escapeQuotes = None, quoteAll = None, dateFormat = None, timestampFormat = None) Salva il contenuto di DataFrame in formato CSV nel percorso specificato.

Parametri: percorso – il percorso in qualsiasi modalità file system supportata da Hadoop – specifica il comportamento dell’operazione di salvataggio quando i dati esistono già.

append: aggiungi i contenuti di questo DataFrame ai dati esistenti. sovrascrivi: sovrascrivi i dati esistenti. ignore: ignora silenziosamente questa operazione se i dati esistono già. errore (caso predefinito): genera un’eccezione se i dati esistono già. compressione – codec di compressione da utilizzare durante il salvataggio su file. Questo può essere uno dei nomi di abbreviazione noti per la distinzione tra maiuscole e minuscole (nessuno, bzip2, gzip, lz4, snappy e deflate). sep – imposta il singolo carattere come separatore per ogni campo e valore. Se None è impostato, utilizza il valore predefinito,,. quote – imposta il carattere singolo utilizzato per l’escaping dei valori quotati in cui il separatore può essere parte del valore. Se None è impostato, utilizza il valore predefinito. “Se si desidera distriggersre le quotazioni, è necessario impostare una stringa vuota escape – imposta il carattere singolo utilizzato per l’escaping delle virgolette all’interno di un valore già quotato. , usa il valore predefinito, \ escapeQuotes – Un flag che indica se i valori che contengono virgolette devono sempre essere racchiusi tra virgolette. Se None è impostato, utilizza il valore predefinito true, sfuggendo a tutti i valori che contengono un virgolettato. quoteAll – Un flag che indica se tutti i valori devono sempre essere racchiusi tra virgolette Se è impostato Nessuno, utilizza il valore predefinito false, solo i valori di escape che contengono un carattere preventivo. header – scrive i nomi delle colonne come prima riga. Se None è impostato, utilizza il valore predefinito value, false. nullValue – imposta la rappresentazione di stringa di un valore nullo Se None è impostato, utilizza il valore predefinito, stringa vuota dataFormat – imposta la stringa che indica un formato di data.I formati di data personalizzati seguono i formati in java.text .SimpleDate Formato. Questo vale per il tipo di data. Se None è impostato, utilizza il valore del valore predefinito, aaaa-MM-gg. timestampFormat – imposta la stringa che indica un formato di data / ora. I formati di data personalizzati seguono i formati su java.text.SimpleDateFormat. Questo vale per il tipo di timestamp. Se None è impostato, utilizza il valore del valore predefinito, aaaa-MM-gg’T’HH: mm: ss.SSSZZ.

 rdd = textFile('/data/{1.txt,2.txt}')