Che cosa è un compito in Spark? Come fa il lavoratore Spark a eseguire il file jar?

Dopo aver letto alcuni documenti su http://spark.apache.org/docs/0.8.0/cluster-overview.html , ho ricevuto alcune domande che voglio chiarire.

Prendi questo esempio da Spark:

JavaSparkContext spark = new JavaSparkContext( new SparkConf().setJars("...").setSparkHome....); JavaRDD file = spark.textFile("hdfs://..."); // step1 JavaRDD words = file.flatMap(new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } }); // step2 JavaPairRDD pairs = words.map(new PairFunction() { public Tuple2 call(String s) { return new Tuple2(s, 1); } }); // step3 JavaPairRDD counts = pairs.reduceByKey(new Function2() { public Integer call(Integer a, Integer b) { return a + b; } }); counts.saveAsTextFile("hdfs://..."); 

Diciamo che ho un cluster di 3 nodes e il nodo 1 in esecuzione come master, e il programma di driver sopra riportato è stato correttamente jared (ad esempio application-test.jar). Quindi ora sto eseguendo questo codice sul nodo master e credo che subito dopo la SparkContext dello SparkContext , il file application-test.jar verrà copiato sui nodes worker (e ogni worker creerà una dir per quell’applicazione).

Quindi ora la mia domanda: sono step1, step2 e step3 nelle attività di esempio che vengono inviate agli operai? Se sì, allora come lo esegue il lavoratore? Come java -cp "application-test.jar" step1 e così via?

    Quando crei SparkContext , ciascun lavoratore avvia un esecutore . Questo è un processo separato (JVM) e carica anche il tuo barattolo. Gli esecutori si connettono al tuo programma di guida. Ora il driver può inviare loro comandi, come flatMap , map e reduceByKey nel tuo esempio. Quando l’autista si chiude, gli esecutori si spengono.

    Gli RDD sono un po ‘come i grandi array che sono suddivisi in partizioni e ogni esecutore può contenere alcune di queste partizioni.

    Un’attività è un comando inviato dal driver a un executor serializzando l’object Function . L’esecutore deserializza il comando (ciò è ansible perché ha caricato il jar) e lo esegue su una partizione.

    (Questa è una panoramica concettuale. Sto soffermandomi su alcuni dettagli, ma spero che sia utile).


    Per rispondere alla tua domanda specifica: No, non è stato avviato un nuovo processo per ogni passaggio. SparkContext viene SparkContext lo SparkContext viene avviato un nuovo processo su ciascun lavoratore.

    Per avere una visione chiara su come le attività sono create e pianificate, dobbiamo capire come funziona il modello di esecuzione in Spark. In breve, un’applicazione in scintilla viene eseguita in tre passaggi:

    1. Crea un grafico RDD
    2. Crea un piano di esecuzione in base al grafico RDD. Le fasi vengono create in questo passaggio
    3. Genera attività in base al piano e pianificale su tutti i lavoratori

    Nel tuo esempio di conteggio delle parole, il grafico RDD è piuttosto semplice, è qualcosa come segue:

    file -> linee -> parole -> numero per parola -> numero di parole globali -> output

    Sulla base di questo grafico, vengono create due fasi. La regola di creazione scenica si basa sull’idea di canalizzare quante più piccole trasformazioni possibili. Nel tuo esempio, la trasformazione stretta termina con il conteggio delle parole. Pertanto, ottieni due fasi

    1. file -> linee -> parole -> conteggio per parola
    2. conteggio parole globali -> output

    Una volta individuate le fasi, la scintilla genererà compiti da fasi. La prima fase creerà ShuffleMapTasks e l’ultima fase creerà i ResultTasks perché nell’ultima fase è inclusa un’operazione di azione per produrre risultati.

    Il numero di attività da generare dipende da come vengono distribuiti i file. Supponiamo che tu abbia 3 tre file diversi in tre nodes diversi, il primo stadio genererà 3 compiti: un compito per partizione.

    Pertanto, non è necessario mappare direttamente i passaggi alle attività. Un’attività appartiene a uno stage ed è correlata a una partizione.

    Di solito, il numero di attività eseguite per uno stage è esattamente il numero di partizioni ShuffleMapStages finale, ma poiché gli RDD possono essere condivisi (e quindi ShuffleMapStages ) il loro numero varia a seconda della condivisione RDD / stage. Si prega di fare riferimento a Come funziona DAG sotto le copertine in RDD?