In che modo le fasi si dividono in compiti in Spark?

Supponiamo per quanto segue che solo un lavoro Spark è in esecuzione in ogni momento.

Quello che ottengo finora

Ecco cosa capisco cosa succede in Spark:

  1. Quando viene creato uno SparkContext , ciascun nodo di lavoro avvia un executor. Gli esecutori sono processi separati (JVM), che si ricollegano al programma del driver. Ogni esecutore ha il barattolo del programma driver. Chiudendo un autista, arresta gli esecutori. Ogni esecutore può contenere alcune partizioni.
  2. Quando viene eseguito un lavoro, viene creato un piano di esecuzione in base al grafico di derivazione.
  3. Il lavoro di esecuzione è suddiviso in fasi, in cui le fasi contengono altrettante trasformazioni e azioni vicine (nel grafico del lignaggio), ma non mescolate. Quindi gli stadi sono separati da mescolanze.

immagine 1

lo capisco

  • Un’attività è un comando inviato dal driver a un executor serializzando l’object Function.
  • L’esecutore deserializza (con il barattolo del driver) il comando (task) e lo esegue su una partizione.

ma

Domande)

Come divido il palco in questi compiti?

In particolare:

  1. I task determinati dalle trasformazioni e dalle azioni o possono essere più trasformazioni / azioni devono essere in un’attività?
  2. Le attività sono determinate dalla partizione (ad esempio, un’attività per fase per partizione).
  3. Le attività sono determinate dai nodes (ad esempio un’attività per fase per nodo)?

Cosa penso (solo una risposta parziale, anche se giusta)

In https://0x0fff.com/spark-architecture-shuffle , lo shuffle viene spiegato con l’immagine

inserisci la descrizione dell'immagine qui

e ho l’impressione che la regola sia

ogni fase è suddivisa in # numero di partizioni, senza alcun riguardo per il numero di nodes

Per la mia prima immagine, direi che avrei 3 attività sulla mappa e 3 ridurre le attività.

Per l’immagine da 0x0fff, direi che ci sono 8 attività sulla mappa e 3 ridurre le attività (supponendo che ci siano solo tre file arancione e tre file verde scuro).

Domande aperte in ogni caso

È corretto? Ma anche se è corretto, le mie domande di cui sopra non sono tutte risposte, perché è ancora aperta, se più operazioni (ad esempio più mappe) sono all’interno di un’attività o sono separate in una attività per operazione.

Cosa dicono gli altri

Che cosa è un compito in Spark? Come fa il lavoratore Spark a eseguire il file jar? e In che modo lo schedulatore Apache Spark suddivide i file in attività? sono simili, ma non ritenevo che la mia domanda avesse una risposta chiara.

Hai un bel contorno qui. Per rispondere alle tue domande

  • È necessario avviare task separata per ciascuna partizione di dati per ogni stage . Si consideri che ogni partizione risiederà probabilmente in posizioni fisiche distinte, ad esempio blocchi in HDFS o directory / volumi per un file system locale.

Si noti che l’invio di Stage s è guidato dallo DAG Scheduler . Ciò significa che le fasi che non sono interdipendenti possono essere sottoposte al cluster per l’esecuzione in parallelo: questo massimizza la capacità di parallelizzazione sul cluster. Quindi, se le operazioni nel nostro stream di dati possono avvenire contemporaneamente, ci aspettiamo di vedere più fasi lanciate.

Possiamo vederlo in azione nel seguente esempio di giocattolo in cui eseguiamo i seguenti tipi di operazioni:

  • caricare due origini dati
  • eseguire alcune operazioni sulla mappa su entrambe le origini dati separatamente
  • Unisciti a loro
  • eseguire alcune operazioni di mapping e filtro sul risultato
  • salva il risultato

Allora, con quanti palcoscenici ci ritroveremo?

  • 1 stadio ciascuno per caricare le due origini dati in parallelo = 2 fasi
  • Una terza fase che rappresenta l’ join che dipende dalle altre due fasi
  • Nota: tutte le operazioni successive che lavorano sui dati uniti possono essere eseguite nello stesso stadio perché devono avvenire in sequenza. Non vi è alcun vantaggio per il lancio di fasi aggiuntive perché non possono iniziare il lavoro fino al completamento dell’operazione precedente.

Ecco quel programma di giocattoli

 val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) } val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }} val spj = sfi.join(sp) val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }} val sf = sm.filter{ case (k,v) => v % 10 == 0 } sf.saveAsTextFile("/data/blah/out") 

Ed ecco il DAG del risultato

inserisci la descrizione dell'immagine qui

Ora: quanti compiti ? Il numero di compiti dovrebbe essere uguale a

Somma di ( Stage * #Partitions in the stage )

Nel mio caso le #Partitions in the stage equivalgono al number of processors sulle macchine del mio cluster.

Se capisco correttamente ci sono 2 (correlati) cose che ti confondono:

1) Cosa determina il contenuto di un’attività?

2) Cosa determina il numero di compiti da eseguire?

Il motore di Spark “incolla” insieme semplici operazioni su file rdds consecutivi, ad esempio:

 rdd1 = sc.textFile( ... ) rdd2 = rdd1.filter( ... ) rdd3 = rdd2.map( ... ) rdd3RowCount = rdd3.count 

quindi quando rdd3 è (pigramente) calcolato, spark genererà un task per partizione di rdd1 e ogni task eseguirà sia il filtro che la mappa per riga per produrre rdd3.

Il numero di attività è determinato dal numero di partizioni. Ogni RDD ha un numero definito di partizioni. Per un RDD di origine letto da HDFS (utilizzando sc.textFile (…) per esempio) il numero di partizioni è il numero di divisioni generate dal formato di input. Alcune operazioni su RDD possono generare un RDD con un numero diverso di partizioni:

 rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ). 

Un altro esempio è join:

 rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ). 

(La maggior parte) operazioni che cambiano il numero di partizioni implicano un shuffle, quando facciamo per esempio:

 rdd2 = rdd1.repartition( 1000 ) 

ciò che accade in realtà è che l’attività su ogni partizione di rdd1 deve produrre un output finale che può essere letto dallo stage successivo in modo da fare in modo che rdd2 abbia esattamente 1000 partizioni (come fanno? hash o sort ). Le attività su questo lato sono a volte denominate “attività della mappa (lato)”. Un’attività che verrà eseguita in seguito su rdd2 agirà su una partizione (di rdd2!) E dovrebbe capire come leggere / combinare gli output della mappa rilevanti per quella partizione. Le attività su questo lato sono talvolta denominate “Riduci (lato) attività”.

Le 2 domande sono correlate: il numero di compiti in una fase è il numero di partizioni (comune ai rdds consecutivi “incollati” insieme) e il numero di partizioni di un rdd può cambiare tra le fasi (specificando il numero di partizioni per alcuni shuffle che causa l’operazione, ad esempio).

Una volta iniziata l’esecuzione di una fase, i suoi compiti possono occupare le aree di attività. Il numero di task-slot simultanee è numExecutors * ExecutorCores. In generale, questi possono essere occupati da compiti provenienti da fasi diverse e non dipendenti.

Questo potrebbe aiutarti a capire meglio diversi pezzi:

  • Stage: è una raccolta di attività. Lo stesso processo in esecuzione su diversi sottoinsiemi di dati (partizioni).
  • Attività: rappresenta un’unità di lavoro su una partizione di un set di dati distribuito. Quindi in ogni fase, numero di compiti = numero di partizioni, o come hai detto “un compito per livello per partizione”.
  • Ogni executer gira su un contenitore di filo e ogni contenitore risiede su un nodo.
  • Ogni fase utilizza più esecutori, a ciascun utente vengono assegnati più vcores.
  • Ogni vcore può eseguire esattamente un compito alla volta
  • Quindi in qualsiasi momento, più attività potrebbero essere eseguite in parallelo. numero di attività in esecuzione = numero di vcores in uso.