La funzione restituisce una lista vuota in Spark

Di seguito è riportato il codice per ottenere l’elenco dei nomi di file in un file zippato

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = { val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open)) val filesInZip = new ArrayBuffer[String]() var ze : Option[ZipEntry] = None zipInputStream.foreach(stream =>{ do{ ze = Option(stream.getNextEntry); ze.foreach{ze => if(ze.getName.endsWith("java") && !ze.isDirectory()){ var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java")) filesInZip += fileName } } stream.closeEntry() } while(ze.isDefined) println(filesInZip.toList.length) // print 889 (correct) }) println(filesInZip.toList.length) // print 0 (WHY..?) (filesInZip.toList) } 

Eseguo il codice sopra nel modo seguente:

 scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip") zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at :25 scala> getListOfFilesInRepo(zipRDD) 889 0 res12: List[String] = List() 

Perché non ottengo 889 e invece ottengo 0?

Succede perché filesInZip non è condiviso tra i lavoratori. foreach opera su una copia locale di filesInZip e quando finisce questa copia viene semplicemente scartata e raccolta dei dati inutili. Se si desidera conservare i risultati, è necessario utilizzare la trasformazione (molto probabilmente una flatMap ) e restituire i valori aggregati raccolti.

 def listFiles(stream: PortableDataStream): TraversableOnce[String] = ??? zipInputStream.flatMap(listFiles) 

Puoi imparare di più da Understanding chiusure