In che modo i record del processo Hadoop si suddividono attraverso i limiti dei blocchi?

Secondo la Hadoop - The Definitive Guide

I record logici definiti da FileInputFormats di solito non si adattano perfettamente ai blocchi HDFS. Ad esempio, i record logici di TextInputFormat sono linee, che attraverseranno i limiti HDFS il più delle volte. Ciò non ha alcun impatto sul funzionamento del tuo programma: le linee non sono perse o interrotte, ad esempio, ma vale la pena saperlo, poiché significa che le mappe dati locali (cioè le mappe che sono in esecuzione sullo stesso host del loro dati di input) eseguirà alcune letture remote. Il leggero sovraccarico che ciò causa non è normalmente significativo.

Supponiamo che una linea di registrazione sia suddivisa su due blocchi (b1 e b2). Il mapper che processa il primo blocco (b1) noterà che l’ultima riga non ha un separatore EOL e recupera il resto della linea dal successivo blocco di dati (b2).

In che modo il mappatore che elabora il secondo blocco (b2) determina che il primo record è incompleto e deve elaborare a partire dal secondo record nel blocco (b2)?

Interessante domanda, ho passato un po ‘di tempo a guardare il codice per i dettagli e qui ci sono i miei pensieri. Le suddivisioni sono gestite dal client da InputFormat.getSplits , quindi uno sguardo a FileInputFormat fornisce le seguenti informazioni:

  • Per ogni file di input, ottenere la lunghezza del file, la dimensione del blocco e calcolare la dimensione max(minSize, min(maxSize, blockSize)) come max(minSize, min(maxSize, blockSize)) dove maxSize corrisponde a mapred.max.split.size e minSize è mapred.min.split.size
  • Dividere il file in diversi FileSplit base alla dimensione parziale calcasting sopra. Ciò che è importante qui è che ogni FileSplit viene inizializzato con un parametro di start corrispondente all’offset nel file di input . Non c’è ancora alcuna gestione delle linee a quel punto. La parte rilevante del codice è simile a questa:

     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } 

Dopodiché, se guardi il LineRecordReader che è definito da TextInputFormat , è qui che vengono gestite le righe:

  • Quando si inizializza LineRecordReader , tenta di LineReader un’istanza di LineReader che è un’astrazione per poter leggere le righe su FSDataInputStream . Ci sono 2 casi:
  • Se è definito un CompressionCodec , questo codec è responsabile della gestione dei limiti. Probabilmente non pertinente alla tua domanda.
  • Se non c’è codec comunque, è qui che le cose sono interessanti: se l’ start di InputSplit è diverso da 0, quindi fai il backtrack di 1 carattere e poi salta la prima riga identificata da \ n o \ r \ n (Windows) ! Il backtrack è importante perché nel caso in cui i limiti della linea siano gli stessi dei confini divisi, questo garantisce di non saltare la riga valida. Ecco il codice rilevante:

     if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; 

Quindi, poiché le divisioni sono calcolate nel client, i mappatori non devono essere eseguiti in sequenza, ogni mappatore sa già se è necessario eliminare la prima riga o meno.

Quindi, in pratica, se hai 2 linee per ogni 100 Mb nello stesso file, e per semplificare diciamo che le dimensioni divise sono 64Mb. Quindi, quando vengono calcolate le divisioni di input, avremo lo scenario seguente:

  • Divisione 1 contenente il percorso e gli host di questo blocco. Inizializzato all’avvio 200-200 = 0Mb, lunghezza 64Mb.
  • Split 2 inizializzato all’inizio 200-200 + 64 = 64Mb, lunghezza 64Mb.
  • Split 3 inizializzato all’inizio 200-200 + 128 = 128 Mb, lunghezza 64 Mb.
  • Split 4 inizializzato all’inizio 200-200 + 192 = 192Mb, lunghezza 8Mb.
  • Il mappatore A elaborerà la divisione 1, l’inizio è 0 quindi non saltare la prima riga e leggere una riga intera che va oltre il limite di 64 Mb, quindi ha bisogno della lettura remota.
  • Mapper B elaborerà split 2, start is! = 0 quindi salta la prima riga dopo 64Mb-1byte, che corrisponde alla fine della riga 1 a 100Mb che è ancora nella divisione 2, abbiamo 28Mb della linea in split 2, quindi il telecomando legge i restanti 72Mb.
  • Mapper C elaborerà split 3, start is! = 0, quindi salta la prima riga dopo 128 Mb-1byte, che corrisponde alla fine della riga 2 a 200 Mb, che è la fine del file, quindi non fare nulla.
  • Il mapper D è lo stesso del mappatore C tranne che cerca una nuova riga dopo 192 Mb-1byte.

L’ algoritmo Map Reduece non funziona sui blocchi fisici del file. Funziona su split di input logici. La divisione dell’input dipende da dove è stato scritto il record. Un record può estendersi su due mapper.

Il modo in cui HDFS è stato impostato, suddivide i file molto grandi in blocchi di grandi dimensioni (ad esempio, misurando 128 MB) e memorizza tre copie di questi blocchi su diversi nodes nel cluster.

HDFS non ha consapevolezza del contenuto di questi file. Un record potrebbe essere stato avviato in Block-a ma la fine di quel record potrebbe essere presente nel blocco-b .

Per risolvere questo problema, Hadoop utilizza una rappresentazione logica dei dati memorizzati nei blocchi di file, noti come split di input. Quando un client di lavoro MapReduce calcola le divisioni di input , determina dove inizia il primo intero record in un blocco e dove finisce l’ultimo record nel blocco .

Il punto chiave:

Nei casi in cui l’ultimo record di un blocco è incompleto, lo split dell’input include le informazioni sulla posizione per il blocco successivo e lo scostamento di byte dei dati necessari per completare il record.

Dai uno sguardo al diagramma sottostante.

inserisci la descrizione dell'immagine qui

Dai un’occhiata a questo articolo e alla relativa domanda SE: Informazioni sulla divisione di file Hadoop / HDFS

Maggiori dettagli possono essere letti dalla documentazione

Il framework Map-Reduce si basa sul InputFormat del lavoro per:

  1. Convalidare le specifiche di input del lavoro.
  2. Suddividi i file di input in InputSplits logici, ciascuno dei quali viene quindi assegnato a un singolo Mapper.
  3. Ogni InputSplit viene quindi assegnato a un singolo Mapper per l’elaborazione. Split potrebbe essere una tupla . InputSplit[] getSplits(JobConf job,int numSplits ) è l’API per prendersi cura di queste cose.

FileInputFormat , che estende il InputFormat getSplits () implementato da getSplits . Dai un’occhiata agli interni di questo metodo su grepcode

Lo vedo come segue: InputFormat è responsabile di suddividere i dati in suddivisioni logiche tenendo conto della natura dei dati.
Niente gli impedisce di farlo, anche se può aggiungere una significativa latenza al lavoro – tutta la logica e la lettura intorno ai confini della dimensione split desiderata avverranno nel jobtracker.
Il formato di input più sensibile alla registrazione è TextInputFormat. Funziona come segue (per quanto ho capito dal codice) – il formato di input crea divisioni per dimensione, indipendentemente dalle linee, ma LineRecordReader sempre:
a) Salta la prima riga nella divisione (o parte di essa), se non è la prima divisione
b) Leggere una riga dopo il confine della divisione alla fine (se i dati sono disponibili, quindi non è l’ultima divisione).

Da quello che ho capito, quando il FileSplit viene inizializzato per il primo blocco, viene chiamato il costruttore predefinito. Pertanto, i valori di inizio e lunghezza sono inizialmente pari a zero. Alla fine dell’elaborazione del blocco pugno, se l’ultima riga è incompleta, il valore della lunghezza sarà maggiore della lunghezza della divisione e leggerà anche la prima riga del blocco successivo. A causa di ciò il valore di start per il primo blocco sarà maggiore di zero e in questa condizione, LineRecordReader salterà la prima riga del secondo blocco. (Vedi sorgente )

Se l’ultima riga del primo blocco è completa, il valore della lunghezza sarà uguale alla lunghezza del primo blocco e il valore dell’inizio per il secondo blocco sarà zero. In tal caso, LineRecordReader non salterà la prima riga e leggerà il secondo blocco dall’inizio.

Ha senso?

I mappatori non devono comunicare. I blocchi di file sono in HDFS e il mapper corrente (RecordReader) può leggere il blocco che ha la parte rimanente della linea. Questo succede dietro le quinte.

Dal codice sorgente di hadoop di LineRecordReader.java il costruttore: trovo alcuni commenti:

 // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; 

da questo credo che hadoop leggerà una linea in più per ogni spaccatura (alla fine della divisione corrente, leggi la riga successiva nella successiva divisione), e se non la prima divisione, la prima riga verrà eliminata. in modo che nessun record di linea sarà perso e incompleto