Come cambiare la dimensione della partizione in Spark SQL

Ho il requisito di caricare i dati da una tabella Hive usando spark-SQL HiveContext e caricarli in HDFS. Per impostazione predefinita, DataFrame dall’output SQL sta avendo 2 partizioni. Per ottenere più parallelismo ho bisogno di più partizioni fuori dall’SQL. Non esiste un metodo sovraccarico in HiveContext per prendere il numero del parametro delle partizioni.

Ripartizionare l’RDD causa mescolamento e risultati in più tempo di elaborazione.

 val result = sqlContext.sql("select * from bt_st_ent") 

Ha l’output del registro di:

 Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes) Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes) 

Mi piacerebbe sapere c’è un modo per aumentare la dimensione delle partizioni dell’output di SQL.

Spark <2.0 :

Puoi utilizzare le opzioni di configurazione di Hadoop:

  • mapred.min.split.size .
  • mapred.max.split.size

così come la dimensione del blocco HDFS per controllare le dimensioni della partizione per i formati basati su file system *.

 val minSplit: Int = ??? val maxSplit: Int = ??? sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit) sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit) 

Spark 2.0+ :

È ansible utilizzare la configurazione spark.sql.files.maxPartitionBytes :

 spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit) 

In entrambi i casi, questi valori potrebbero non essere utilizzati da un’API di origine dati specifica, pertanto è necessario verificare sempre la documentazione / i dettagli di implementazione del formato utilizzato.


* Altri formati di input possono utilizzare impostazioni diverse. Vedi per esempio

  • Partizionamento in scintilla durante la lettura da RDBMS tramite JDBC
  • Differenza tra paragone di mapreduce e spark paritition

Inoltre, i Datasets creati da RDDs erediteranno il layout delle partizioni dai loro genitori.

Analogamente, le tabelle con bucket utilizzeranno il layout del bucket definito nel metastore con una relazione 1: 1 tra il bucket e la partizione Dataset .

Un problema molto comune e doloroso. Dovresti cercare una chiave che distribuisca i dati in partizioni uniformi. Puoi usare gli operatori DISTRIBUTE BY e CLUSTER BY per dire spark a raggruppare le righe in una partizione. Ciò comporterà un sovraccarico sulla query stessa. Ma si tradurrà in partizioni di dimensioni uguali. Deepsense ha un ottimo tutorial su questo.

Se il tuo SQL esegue un shuffle (ad esempio ha un join, o un qualche tipo di gruppo di), puoi impostare il numero di partizioni impostando la proprietà ‘spark.sql.shuffle.partitions’

  sqlContext.setConf( "spark.sql.shuffle.partitions", 64) 

Seguendo ciò che Fokko suggerisce, potresti usare una variabile casuale da raggruppare.

 val result = sqlContext.sql(""" select * from ( select *,random(64) as rand_part from bt_st_ent ) cluster by rand_part""")