Perché la trasformazione sortBy triggers un lavoro Spark?

Secondo la documentazione Spark, solo le azioni RDD possono triggersre un lavoro Spark e le trasformazioni vengono ponderate quando viene chiamata un’azione.

Vedo che la funzione di trasformazione sortBy viene applicata immediatamente e viene mostrata come trigger di job in SparkUI. Perché?

sortBy è implementato usando sortByKey che dipende da un RangePartitioner (JVM) o dalla funzione di partizionamento (Python). Quando si chiama sortBy / sortByKey partizionatore (funzione di partizionamento) viene inizializzato con entusiasmo e campiona l’input RDD per calcolare i limiti della partizione. Il lavoro che vedi corrisponde a questo processo.

L’ordinamento effettivo viene eseguito solo se si esegue un’azione sul RDD appena creato o sui suoi discendenti.

Come per la documentazione Spark, solo l’azione triggers un lavoro in Spark, le trasformazioni vengono ponderate quando viene chiamata un’azione.

In generale hai ragione, ma come hai appena vissuto, ci sono alcune eccezioni e sortBy è tra questi (con zipWithIndex ).

Di fatto, è stato riportato in JIRA di Spark e chiuso con risoluzione di Will not Fix. Vedi SPARK-1021 sortByKey () avvia un lavoro cluster quando non dovrebbe .

È ansible visualizzare il lavoro in esecuzione con la registrazione di DAGScheduler abilitata (e successivamente nell’interfaccia utente Web):

 scala> sc.parallelize(0 to 8).sortBy(identity) INFO DAGScheduler: Got job 1 (sortBy at :25) with 8 output partitions INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at :25) INFO DAGScheduler: Parents of final stage: List() INFO DAGScheduler: Missing parents: List() DEBUG DAGScheduler: submitStage(ResultStage 1) DEBUG DAGScheduler: missing: List() INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at :25), which has no missing parents DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at :25) DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4) INFO DAGScheduler: ResultStage 1 (sortBy at :25) finished in 0.013 s DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0 INFO DAGScheduler: Job 1 finished: sortBy at :25, took 0.019755 s res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at :25