Come sovrascrivere la directory di output in spark

Ho un’applicazione di streaming spark che produce un set di dati per ogni minuto. Devo salvare / sovrascrivere i risultati dei dati elaborati.

Quando ho provato a sovrascrivere il set di dati org.apache.hadoop.mapred.FileAlreadyExistsException interrompe l’esecuzione.

Ho impostato il set di proprietà Spark set("spark.files.overwrite","true") , ma non c’è fortuna.

Come sovrascrivere o predelettere i file dalla scintilla?

AGGIORNARE: Dataframes uso di Dataframes , oltre a qualcosa come ... .write.mode(SaveMode.Overwrite) ...

Per le versioni precedenti prova

 yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false") val sc = SparkContext(yourSparkConf) 

Nella versione 1.1.0 puoi configurare le impostazioni di conf usando lo script spark-submit con il flag –conf.

ATTENZIONE: Secondo @piggybox c’è un bug in Spark in cui sovrascrive solo i file di cui ha bisogno per scrivere i suoi file di parti, tutti gli altri file rimarranno non rimossi.

La documentazione per il parametro spark.files.overwrite dice questo: “Se sovrascrivere i file aggiunti tramite SparkContext.addFile() quando il file di destinazione esiste e il suo contenuto non corrisponde a quelli dell’origine.” Quindi non ha alcun effetto sul metodo saveAsTextFiles.

Potresti farlo prima di salvare il file:

 val hadoopConf = new org.apache.hadoop.conf.Configuration() val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf) try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } } 

Aas spiegato qui: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

Dalla documentazione pyspark.sql.DataFrame.save (attualmente alla 1.3.1), è ansible specificare mode='overwrite' quando si salva un DataFrame:

 myDataFrame.save(path='myPath', source='parquet', mode='overwrite') 

Ho verificato che questo rimuoverà anche i file di partizione rimasti. Quindi se avevi detto 10 partizioni / file in origine, ma poi hai sovrascritto la cartella con un DataFrame con solo 6 partizioni, la cartella risultante avrà le 6 partizioni / file.

Consultare la documentazione di Spark SQL per ulteriori informazioni sulle opzioni di modalità.

poiché df.save(path, source, mode) è deprecato, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )

usa df.write.format(source).mode("overwrite").save(path)
dove df.write è DataFrameWriter

‘source’ può essere (“com.databricks.spark.avro” | “parquet” | “json”)

df.write.mode (‘overwrite’). parquet (“/ output / folder / path”) funziona se si desidera sovrascrivere un file parquet usando python. Questo è nella scintilla 1.6.2. L’API potrebbe essere diversa nelle versioni successive

  val jobName = "WordCount"; //overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false") val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false"); val sc = new SparkContext(conf) 

Questa versione sovraccaricata della funzione di salvataggio funziona per me:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf (“Sovrascrivi”))

L’esempio precedente sovrascriverà una cartella esistente. Il savemode può anche prendere questi parametri ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Aggiungi : la modalità di aggiunta indica che quando si salva un DataFrame su un’origine dati, se i dati / la tabella esistono già, i contenuti di DataFrame dovrebbero essere aggiunti ai dati esistenti.

ErrorIfExists : La modalità ErrorIfExists indica che quando si salva un DataFrame su un’origine dati, se i dati esistono già, si prevede che venga generata un’eccezione.

Ignora : la modalità Ignora significa che quando si salva un DataFrame su un’origine dati, se i dati esistono già, si prevede che l’operazione di salvataggio non salvi il contenuto di DataFrame e non cambi i dati esistenti.

Se si è disposti a utilizzare il proprio formato di output personalizzato, si sarà in grado di ottenere il comportamento desiderato anche con RDD.

Dai un’occhiata alle seguenti classi: FileOutputFormat , FileOutputCommitter

Nel formato di output del file è presente un metodo denominato checkOutputSpecs, che verifica se la directory di output esiste. In FileOutputCommitter hai il commitJob che di solito trasferisce i dati dalla directory temporanea alla sua posizione finale.

Non ero ancora in grado di verificarlo (lo farei, non appena ho pochi minuti gratuiti) ma in teoria: se estendo FileOutputFormat e sovrascrivi checkOutputSpecs in un metodo che non genera un’eccezione sulla directory esiste già e aggiusta il metodo commitJob del mio commutatore di output personalizzato per eseguire quale logica sempre che voglio (ad esempio, sovrascrivere alcuni dei file, aggiungere altri) di quanto possa essere in grado di ottenere il comportamento desiderato anche con gli RDD.

Il formato di output è passato a: saveAsNewAPIHadoopFile (che è anche il metodo saveAsTextFile chiamato per salvare effettivamente i file). E il commutatore di uscita è configurato a livello di applicazione.