Cosa c’è di sbagliato in `unionAll` di Spark` DataFrame`?

Usando Spark 1.5.0 e dato il seguente codice, mi aspetto unionAll to union DataFrame s in base al nome della colonna. Nel codice, sto usando alcuni FunSuite per passare in SparkContext sc :

 object Entities { case class A (a: Int, b: Int) case class B (b: Int, a: Int) val as = Seq( A(1,3), A(2,4) ) val bs = Seq( B(5,3), B(6,4) ) } class UnsortedTestSuite extends SparkFunSuite { configuredUnitTest("The truth test.") { sc => val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val aDF = sc.parallelize(Entities.as, 4).toDF val bDF = sc.parallelize(Entities.bs, 4).toDF aDF.show() bDF.show() aDF.unionAll(bDF).show } } 

Produzione:

 +---+---+ | a| b| +---+---+ | 1| 3| | 2| 4| +---+---+ +---+---+ | b| a| +---+---+ | 5| 3| | 6| 4| +---+---+ +---+---+ | a| b| +---+---+ | 1| 3| | 2| 4| | 5| 3| | 6| 4| +---+---+ 

Perché il risultato contiene colonne “b” e “a” unite, invece di allineare le basi delle colonne sui nomi delle colonne? Sembra un insetto serio !?

Non sembra affatto un bug. Quello che vedi è un comportamento SQL standard e tutti i principali RDMBS, inclusi PostgreSQL , MySQL , Oracle e MS SQL si comportano esattamente allo stesso modo. Troverai esempi di Fiddle SQL collegati con i nomi.

Per citare il manuale di PostgreSQL :

Per calcolare l’unione, l’intersezione o la differenza di due query, le due query devono essere “union compatibili”, il che significa che restituiscono lo stesso numero di colonne e le colonne corrispondenti hanno tipi di dati compatibili

I nomi delle colonne, esclusa la prima tabella nell’operazione set, vengono semplicemente ignorati.

Questo comportamento deriva direttamente dall’algebra relazionale in cui il blocco elementare di base è una tupla. Poiché le tuple sono ordinate, l’unione di due serie di tuple equivale (ignorando la gestione dei duplicati) all’output che si ottiene qui.

Se vuoi abbinare usando i nomi puoi fare qualcosa di simile

 import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.col def unionByName(a: DataFrame, b: DataFrame): DataFrame = { val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq a.select(columns: _*).unionAll(b.select(columns: _*)) } 

Per controllare sia i nomi che i tipi dovrebbe essere sufficiente sostituire le columns con:

 a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq 

Questo problema si sta risolvendo in spark2.3. Stanno aggiungendo il supporto di unionByName nel set di dati.

 https://issues.apache.org/jira/browse/SPARK-21043 

Come discusso in SPARK-9813 , sembra che finchè i tipi di dati e il numero di colonne siano uguali tra i frame, l’operazione unionAll dovrebbe funzionare. Si prega di consultare i commenti per ulteriori discussioni.