NullPointerException in Scala Spark, sembra essere causato da tipo di raccolta?

sessionIdList è di tipo:

scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] distinto a: 30

Quando provo a eseguire il codice seguente:

val x = sc.parallelize(List(1,2,3)) val cartesianComp = x.cartesian(x).map(x => (x)) val kDistanceNeighbourhood = sessionIdList.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) 

Ricevo un’eccezione

 14/05/21 16:20:46 ERROR Executor: Exception in task ID 80 java.lang.NullPointerException at org.apache.spark.rdd.RDD.filter(RDD.scala:261) at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:38) at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:36) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) 

Tuttavia se io uso:

 val l = sc.parallelize(List("1","2")) val kDistanceNeighbourhood = l.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) 

Quindi non viene visualizzata alcuna eccezione

La differenza tra i due snippet di codice è che nel primo frammento sessionIdList è di tipo:

 res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at :30 

e nel secondo frammento “l” è di tipo

 scala> l res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at :12 

Perché si verifica questo errore?

Devo convertire sessionIdList in ParallelCollectionRDD per risolvere questo problema?

Spark non supporta l’annidamento di RDD (consultare https://stackoverflow.com/a/14130534/590203 per un’altra occorrenza dello stesso problema), quindi non è ansible eseguire trasformazioni o azioni su RDD all’interno di altre operazioni RDD.

Nel primo caso, viene visualizzata una eccezione NullPointerException generata dal worker quando tenta di accedere a un object SparkContext presente solo nel driver e non negli worker.

Nel secondo caso, il mio sospetto è che il lavoro è stato eseguito localmente sull’autista e ha funzionato per puro caso.

È una domanda ragionevole e ho sentito che lo ha chiesto abbastanza volte. Proverò a provare a spiegare perché è vero, perché potrebbe essere d’aiuto.

Gli RDD nidificati generano sempre un’eccezione nella produzione. Le chiamate di funzioni nidificate come penso che le stiate descrivendo qui, se ciò significa chiamare un’operazione RDD all’interno di un’operazione RDD, causeranno anche errori di causa poiché è effettivamente la stessa cosa. (Gli RDD sono immutabili, quindi eseguire un’operazione RDD come una “mappa” equivale a creare un nuovo RDD.) La possibilità di creare RDD nidificati è una conseguenza necessaria del modo in cui un RDD è definito e del modo in cui Spark Application è impostare.

Un RDD è una raccolta distribuita di oggetti (chiamati partizioni) che vivono su Spark Executor. Gli esecutori Spark non possono comunicare tra loro, solo con il driver Spark. Le operazioni RDD sono tutte calcolate in pezzi su queste partizioni. Poiché l’ambiente dell’esecutore di RDD non è ricorsivo (ovvero è ansible configurare un driver Spark in un esecutore di scintilla con sub executor), nessuno dei due può eseguire un RDD.

Nel tuo programma, hai creato una collezione distribuita di partizioni di interi. Quindi stai eseguendo un’operazione di mapping. Quando il driver Spark vede un’operazione di mapping, invia le istruzioni per eseguire il mapping agli esecutori, che eseguono la trasformazione su ogni partizione in parallelo. Ma la tua mapping non può essere fatta, perché su ogni partizione stai cercando di chiamare “intero RDD” per eseguire un’altra operazione distribuita. Questo non può essere fatto, perché ogni partizione non ha accesso alle informazioni sulle altre partizioni, se così fosse, il calcolo non potrebbe essere eseguito in parallelo.

Cosa invece si può fare, poiché i dati necessari nella mappa sono probabilmente di piccole dimensioni (poiché si sta facendo un filtro e il filtro non richiede alcuna informazione su sessionIdList) si deve innanzitutto filtrare l’elenco degli ID di sessione. Quindi raccogliere quella lista per il conducente. Quindi trasmetterlo agli esecutori, dove è ansible utilizzarlo sulla mappa. Se l’elenco sessionID è troppo grande, probabilmente dovrai fare un join.