Come faccio a leggere un file CSV di grandi dimensioni con la class Scala Stream?

Come faccio a leggere un file CSV di grandi dimensioni (> 1 Gb) con un stream di Scala? Hai un esempio di codice? Oppure utilizzeresti un modo diverso per leggere un file CSV di grandi dimensioni senza prima caricarlo in memoria?

Basta usare Source.fromFile(...).getLines come hai già detto.

Ciò restituisce un Iterator, che è già pigro (si userebbe lo stream come una raccolta lenta in cui si desiderava che i valori recuperati in precedenza vengano memoizzati, quindi è ansible leggerli di nuovo)

Se si verificano problemi di memoria, il problema si troverà in ciò che si sta facendo dopo getLines. Qualsiasi operazione come toList , che impone una raccolta rigorosa, causerà il problema.

Spero che tu non intenda la collection.immutable.Stream di Scala. Immutabile. Stream con Stream. Questo non è quello che vuoi. Lo streaming è pigro, ma la memoizzazione.

Non so cosa pensi di fare, ma leggere il file riga per riga dovrebbe funzionare molto bene senza utilizzare grandi quantità di memoria.

getLines dovrebbe valutare pigramente e non dovrebbe bloccarsi (a patto che il tuo file non abbia più di 2 ² di righe, afaik). Se lo fa, chiedi a #scala o fai un bug ticket (o fallo entrambi).

Se stai cercando di elaborare il file di grandi dimensioni riga per riga evitando di caricare l’intero contenuto del file in memoria tutto in una volta, puoi utilizzare l’ Iterator restituito da scala.io.Source .

Ho una piccola funzione, tryProcessSource , (contenente due sotto-funzioni) che uso esattamente per questi tipi di casi d’uso. La funzione richiede fino a quattro parametri, di cui è richiesto solo il primo. Gli altri parametri hanno valori predefiniti sani forniti.

Ecco il profilo della funzione (l’implementazione della funzione completa è in fondo):

 def tryProcessSource( file: File, parseLine: (Int, String) => Option[List[String]] = (index, unparsedLine) => Some(List(unparsedLine)), filterLine: (Int, List[String]) => Option[Boolean] = (index, parsedValues) => Some(true), retainValues: (Int, List[String]) => Option[List[String]] = (index, parsedValues) => Some(parsedValues), ): Try[List[List[String]]] = { ??? } 

Il primo parametro, file: File , è richiesto. Ed è solo una qualsiasi istanza valida di java.io.File che punta a un file di testo orientato alla linea, come un CSV.

Il secondo parametro, parseLine: (Int, String) => Option[List[String]] , è facoltativo. E se fornito, deve essere una funzione che prevede di ricevere due parametri di input; index: Int , unparsedLine: String . E quindi restituire Option[List[String]] . La funzione può restituire un List[String] Some pacchetti List[String] costituito dai valori di colonna validi. Oppure può restituire un None che indica l’interruzione anticipata dell’intero processo di streaming. Se questo parametro non viene fornito, viene fornito un valore predefinito di (index, line) => Some(List(line)) . Questo valore predefinito restituisce l’intera riga restituita come valore String singolo.

Il terzo parametro, filterLine: (Int, List[String]) => Option[Boolean] , è facoltativo. E se fornito, deve essere una funzione che prevede di ricevere due parametri di input; index: Int , parsedValues: List[String] . E quindi restituire Option[Boolean] . La funzione potrebbe restituire un valore Boolean Un Some avvolto che indica se questa particolare riga deve essere inclusa nell’output. Oppure può restituire un None che indica l’interruzione anticipata dell’intero processo di streaming. Se questo parametro non viene fornito, viene fornito un valore predefinito di (index, values) => Some(true) . Questo valore predefinito comporta l’inclusione di tutte le righe.

Il quarto e ultimo parametro, retainValues: (Int, List[String]) => Option[List[String]] , è facoltativo. E se fornito, deve essere una funzione che prevede di ricevere due parametri di input; index: Int , parsedValues: List[String] . E quindi restituire Option[List[String]] . La funzione può restituire un List[String] di alcune List[String] costituito da alcuni sottoinsiemi e / o alterazioni dei valori di colonna esistenti. Oppure può restituire un None che indica l’interruzione anticipata dell’intero processo di streaming. Se questo parametro non viene fornito, viene fornito un valore predefinito di (index, values) => Some(values) . Questo valore predefinito parseLine i valori analizzati dal secondo parametro, parseLine .

Considera un file con il seguente contenuto (4 righe):

 street,street2,city,state,zip 100 Main Str,,Irving,TX,75039 231 Park Ave,,Irving,TX,75039 1400 Beltline Rd,Apt 312,Dallas,Tx,75240 

Il seguente profilo di chiamata …

 val tryLinesDefaults = tryProcessSource(new File("path/to/file.csv")) 

… genera questo output per tryLinesDefaults (il contenuto inalterato del file):

 Success( List( List("street,street2,city,state,zip"), List("100 Main Str,,Irving,TX,75039"), List("231 Park Ave,,Irving,TX,75039"), List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240") ) ) 

Il seguente profilo di chiamata …

 val tryLinesParseOnly = tryProcessSource( new File("path/to/file.csv") , parseLine = (index, unparsedLine) => Some(unparsedLine.split(",").toList) ) 

… genera questo output per tryLinesParseOnly (ogni riga analizzata nei singoli valori di colonna):

 Success( List( List("street","street2","city","state","zip"), List("100 Main Str","","Irving,TX","75039"), List("231 Park Ave","","Irving","TX","75039"), List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240") ) ) 

Il seguente profilo di chiamata …

 val tryLinesIrvingTxNoHeader = tryProcessSource( new File("C:/Users/Jim/Desktop/test.csv") , parseLine = (index, unparsedLine) => Some(unparsedLine.split(",").toList) , filterLine = (index, parsedValues) => Some( (index != 0) && //skip header line (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving (parsedValues(3).toLowerCase == "Tx".toLowerCase) ) ) 

… genera questo output per tryLinesIrvingTxNoHeader (ogni riga analizzata nei singoli valori di colonna, nessuna intestazione e solo le due righe in Irving, Tx):

 Success( List( List("100 Main Str","","Irving,TX","75039"), List("231 Park Ave","","Irving","TX","75039"), ) ) 

Ecco l’intera implementazione della funzione tryProcessSource :

 import scala.io.Source import scala.util.Try import java.io.File def tryProcessSource( file: File, parseLine: (Int, String) => Option[List[String]] = (index, unparsedLine) => Some(List(unparsedLine)), filterLine: (Int, List[String]) => Option[Boolean] = (index, parsedValues) => Some(true), retainValues: (Int, List[String]) => Option[List[String]] = (index, parsedValues) => Some(parsedValues) ): Try[List[List[String]]] = { def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] = try {Try(transfer(source))} finally {source.close()} def recursive( remaining: Iterator[(String, Int)], accumulator: List[List[String]], isEarlyAbort: Boolean = false ): List[List[String]] = { if (isEarlyAbort || !remaining.hasNext) accumulator else { val (line, index) = remaining.next parseLine(index, line) match { case Some(values) => filterLine(index, values) match { case Some(keep) => if (keep) retainValues(index, values) match { case Some(valuesNew) => recursive(remaining, valuesNew :: accumulator) //capture values case None => recursive(remaining, accumulator, isEarlyAbort = true) //early abort } else recursive(remaining, accumulator) //discard row case None => recursive(remaining, accumulator, isEarlyAbort = true) //early abort } case None => recursive(remaining, accumulator, isEarlyAbort = true) //early abort } } } Try(Source.fromFile(file)).flatMap( bufferedSource => usingSource(bufferedSource) { source => recursive(source.getLines().buffered.zipWithIndex, Nil).reverse } ) } 

Mentre questa soluzione è relativamente succinta, mi ci sono voluti molto tempo e molti passaggi di refactoring prima che potessi finalmente arrivare qui. Per favore fatemi sapere se vedete in che modo potrebbe essere migliorato.


AGGIORNAMENTO: Ho appena chiesto il problema sotto come è la propria domanda StackOverflow . E ora ha una risposta che fissa l’errore menzionato di seguito.

Ho avuto l’idea di provare a renderlo ancora più generico modificando il parametro retainValues per transformLine con la nuova definizione di funzione generica di seguito riportata. Tuttavia, continuo a ricevere l’errore di evidenziazione in IntelliJ “Espressione di tipo Some [List [String]] non è conforms al tipo previsto Opzione [A]” e non è stato in grado di capire come modificare il valore predefinito in modo che l’errore Va via.

 def tryProcessSource2[A <: AnyRef]( file: File, parseLine: (Int, String) => Option[List[String]] = (index, unparsedLine) => Some(List(unparsedLine)), filterLine: (Int, List[String]) => Option[Boolean] = (index, parsedValues) => Some(true), transformLine: (Int, List[String]) => Option[A] = (index, parsedValues) => Some(parsedValues) ): Try[List[A]] = { ??? } 

Qualsiasi aiuto su come realizzare questo lavoro sarebbe molto apprezzato.