unire i file di output dopo la riduzione della fase

In mapreduce ogni task di riduzione scrive il suo output in un file chiamato part-r-nnnnn dove nnnnn è un ID di partizione associato all’attività di riduzione. Mappa / Riduci unisci questi file? Se sì, come?

Anziché eseguire la fusione dei file da soli, è ansible debind l’unione dei file di output riducendo la chiamata:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt 

Nota Questo combina i file HDFS localmente. Assicurati di avere abbastanza spazio sul disco prima di eseguirlo

No, questi file non sono uniti da Hadoop. Il numero di file che ottieni è uguale al numero di attività ridotte.

Se ti serve come input per un lavoro successivo, non preoccuparti di avere file separati. Basta specificare l’intera directory come input per il prossimo lavoro.

Se hai bisogno dei dati al di fuori del cluster, di solito li unisco al lato ricevente quando estrai i dati dal cluster.

Cioè qualcosa del genere:

 hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt 

Questa è la funzione che puoi utilizzare per unire file in HDFS

 public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException { FileSystem fs = FileSystem.get(config); Path srcPath = new Path(src); Path dstPath = new Path(dest); // Check if the path already exists if (!(fs.exists(srcPath))) { logger.info("Path " + src + " does not exists!"); return false; } if (!(fs.exists(dstPath))) { logger.info("Path " + dest + " does not exists!"); return false; } return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null); } 

Solo per i file di testo e HDFS come sorgente e destinazione, utilizzare il comando seguente:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

Questo concatenerà tutti i file in input_hdfs_dir e scriverà l’output su HDFS su output_hdfs_file . Tieni presente che tutti i dati verranno riportati al sistema locale e quindi caricati di nuovo su hdf, sebbene non vengano creati file temporanei e questo avviene al volo utilizzando UNIX pe.

Inoltre, questo non funzionerà con file non di testo come Avro, ORC, ecc.

Per i file binari, potresti fare qualcosa di simile (se hai tabelle Hive mappate nelle directory):

insert overwrite table tbl select * from tbl

A seconda della configurazione, questo potrebbe anche creare più dei file. Per creare un singolo file, impostare il numero di riduttori su 1 in modo esplicito usando mapreduce.job.reduces=1 o impostare la proprietà hive come hive.merge.mapredfiles=true .

È ansible eseguire un’ulteriore mappa / ridurre l’attività, in cui la mappa e la riduzione non cambiano i dati e il partizionatore assegna tutti i dati a un singolo riduttore.

I file part-r-nnnnn vengono generati dopo la fase di riduzione indicata da ‘r’ in mezzo. Ora il fatto è che se si ha un riduttore in esecuzione, si avrà un file di output come part-r-00000. Se il numero di riduttori è 2, allora avrai part-r-00000 e part-r-00001 e così via. Guarda, se il file di output è troppo grande per adattarsi alla memoria della macchina poiché il framework hadoop è stato progettato per essere eseguito su Commodity Machines , il file viene diviso. Come per MRv1, hai un limite di 20 riduttori per lavorare sulla tua logica. Potresti avere di più ma la stessa necessità deve essere personalizzata nei file di configurazione mapred-site.xml . Parlando della tua domanda; è ansible utilizzare getmerge o impostare il numero di riduttori su 1 incorporando la seguente dichiarazione nel codice del driver

 job.setNumReduceTasks(1); 

Spero che questo risponda alla tua domanda.

Oltre alla mia precedente risposta, ho ancora una risposta per te che stavo provando pochi minuti fa. È ansible utilizzare CustomOutputFormat che assomiglia al codice indicato di seguito

 public class VictorOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter( TaskAttemptContext tac) throws IOException, InterruptedException { //step 1: GET THE CURRENT PATH Path currPath=FileOutputFormat.getOutputPath(tac); //Create the full path Path fullPath=new Path(currPath,"Aniruddha.txt"); //create the file in the file system FileSystem fs=currPath.getFileSystem(tac.getConfiguration()); FSDataOutputStream fileOut=fs.create(fullPath,tac); return new VictorRecordWriter(fileOut); } } 

Solo, dai un’occhiata alla quarta riga dall’ultima. Ho usato il mio nome come nome del file di output e ho testato il programma con 15 riduttori. Tuttavia il file rimane lo stesso. Quindi ottenere un singolo file invece di due o più è ancora molto chiaro, la dimensione del file di output non deve superare la dimensione della memoria primaria, ovvero il file di output deve adattarsi alla memoria della macchina di base altrimenti potrebbe esserci un problema con la divisione del file di output. Grazie!!

Perché non utilizzare uno script maiale come questo per unire i file di partizione:

 stuff = load "/path/to/dir/*" store stuff into "/path/to/mergedir" 

Se i file hanno l’intestazione, puoi liberartene facendo ciò:

 hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv 

quindi aggiungi manualmente l’intestazione per output.csv

. Mappa / Riduci unisci questi file?

No. Non si fonde.

Puoi usare IdentityReducer per raggiungere il tuo objective.

Non esegue alcuna riduzione, scrivendo tutti i valori di input direttamente sull’output.

 public void reduce(K key, Iterator values, OutputCollector output, Reporter reporter) throws IOException 

Scrive tutte le chiavi e i valori direttamente sull’output.

Dai un’occhiata ai post correlati di SE:

hadoop: differenza tra 0 riduttore e riduttore di id quadro?