Appiattisci automaticamente ed elegante DataFrame in Spark SQL

Tutti,

Esiste un modo elegante e accettato per appiattire una tabella Spark SQL (Parquet) con colonne che sono di tipo StructType nidificato

Per esempio

Se il mio schema è:

 foo |_bar |_baz x y z 

Come selezionarlo in un modulo tabulare appiattito senza ricorrere all’esecuzione manuale

 df.select("foo.bar","foo.baz","x","y","z") 

In altre parole, come ottengo il risultato del codice sopra programmaticamente dato solo uno StructType e un DataFrame

La risposta breve è che non esiste un modo “accettato” per farlo, ma puoi farlo in modo molto elegante con una funzione ricorsiva che genera la tua select(...) passando attraverso il DataFrame.schema .

La funzione ricorsiva dovrebbe restituire una Array[Column] . Ogni volta che la funzione raggiunge un StructType , si chiama e aggiunge la Array[Column] restituita alla propria Array[Column] .

Qualcosa di simile a:

 def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { schema.fields.flatMap(f => { val colName = if (prefix == null) f.name else (prefix + "." + f.name) f.dataType match { case st: StructType => flattenSchema(st, colName) case _ => Array(col(colName)) } }) } 

Dovresti quindi usarlo in questo modo:

 df.select(flattenSchema(df.schema):_*) 

Sto migliorando la mia risposta precedente e offrendo una soluzione al mio problema indicato nei commenti della risposta accettata.

Questa soluzione accettata crea una matrice di oggetti Column e la usa per selezionare queste colonne. In Spark, se si dispone di un DataFrame nidificato, è ansible selezionare la colonna figlio in questo modo: df.select("Parent.Child") e restituisce un DataFrame con i valori della colonna figlio ed è denominato Child . Ma se si hanno nomi identici per attributi di diverse strutture genitore, si perdono le informazioni sul genitore e possono finire con nomi di colonne identici e non possono più accedervi per nome poiché sono non ambigui.

Questo era il mio problema

Ho trovato una soluzione al mio problema, forse può aiutare qualcun altro. Ho chiamato separatamente il flattenSchema :

 val flattenedSchema = flattenSchema(df.schema) 

e questo ha restituito un object Array of Column. Invece di usare questo nella select() , che restituirebbe un DataFrame con colonne nominate dal child dell’ultimo livello, ho mappato i nomi delle colonne originali come stringhe, quindi dopo aver selezionato Parent.Child column, lo rinomina come Parent.Child invece di Child (Ho anche sostituito punti con caratteri di sottolineatura per mia comodità):

 val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_"))) 

E poi puoi usare la funzione select come mostrato nella risposta originale:

 var newDf = df.select(renamedCols:_*) 

Volevo solo condividere la mia soluzione per Pyspark – è più o meno una traduzione della soluzione di @David Griffin, quindi supporta qualsiasi livello di oggetti nidificati.

 from pyspark.sql.types import StructType, ArrayType def flatten(schema, prefix=None): fields = [] for field in schema.fields: name = prefix + '.' + field.name if prefix else field.name dtype = field.dataType if isinstance(dtype, ArrayType): dtype = dtype.elementType if isinstance(dtype, StructType): fields += flatten(dtype, prefix=name) else: fields.append(name) return fields df.select(flattenSchema(df.schema)).show() 

È anche ansible utilizzare SQL per selezionare colonne come piatte.

  1. Ottieni lo schema del frame dei dati originale
  2. Genera stringa SQL, sfogliando lo schema
  3. Richiedi il tuo frame dati originale

Ho fatto un’implementazione in Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(usa anche il metodo ricorsivo, preferisco il modo SQL, quindi puoi testarlo facilmente tramite Spark-shell).

Ho utilizzato un liner che si traduce in uno schema appiattito con 5 colonne di barre, baz, x, y, z:

 df.select("foo.*", "x", "y", "z") 

Per quanto riguarda l’ explode : in genere mi riservo di explode per appiattire una lista. Ad esempio se hai una idList colonne che è una lista di stringhe, puoi fare:

 df.withColumn("flattenedId", functions.explode(col("idList"))) .drop("idList") 

Ciò si tradurrà in un nuovo Dataframe con una colonna denominata flattenedId (non più una lista)

Ecco una funzione che sta facendo ciò che vuoi e che può trattare con più colonne annidate contenenti colonne con lo stesso nome, con un prefisso:

 from pyspark.sql import functions as F def flatten_df(nested_df): flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'] nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'] flat_df = nested_df.select(flat_cols + [F.col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols for c in nested_df.select(nc+'.*').columns]) return flat_df 

Prima:

 root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true) |-- bar: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true) 

Dopo:

 root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo_a: float (nullable = true) |-- foo_b: float (nullable = true) |-- foo_c: integer (nullable = true) |-- bar_a: float (nullable = true) |-- bar_b: float (nullable = true) |-- bar_c: integer (nullable = true) 

Ho aggiunto un metodo DataFrame#flattenSchema al progetto spark-daria open source.

Ecco come puoi utilizzare la funzione con il tuo codice.

 import com.github.mrpowers.spark.daria.sql.DataFrameExt._ df.flattenSchema().show() +-------+-------+---------+----+---+ |foo.bar|foo.baz| x| y| z| +-------+-------+---------+----+---+ | this| is|something|cool| ;)| +-------+-------+---------+----+---+ 

È inoltre ansible specificare diversi delimitatori di nomi di colonne con il metodo flattenSchema() .

 df.flattenSchema(delimiter = "_").show() +-------+-------+---------+----+---+ |foo_bar|foo_baz| x| y| z| +-------+-------+---------+----+---+ | this| is|something|cool| ;)| +-------+-------+---------+----+---+ 

Questo parametro delimitatore è sorprendentemente importante. Se stai appiattendo lo schema per caricare la tabella in Redshift, non sarai in grado di utilizzare i periodi come delimitatore.

Ecco lo snippet di codice completo per generare questo output.

 val data = Seq( Row(Row("this", "is"), "something", "cool", ";)") ) val schema = StructType( Seq( StructField( "foo", StructType( Seq( StructField("bar", StringType, true), StructField("baz", StringType, true) ) ), true ), StructField("x", StringType, true), StructField("y", StringType, true), StructField("z", StringType, true) ) ) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), StructType(schema) ) df.flattenSchema().show() 

Il codice sottostante è simile al codice di David Griffin (nel caso in cui non si voglia aggiungere la dipendenza spark-daria al proprio progetto).

 object StructTypeHelpers { def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = { schema.fields.flatMap(structField => { val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name structField.dataType match { case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName) case _ => Array(col(codeColName).alias(colName)) } }) } } object DataFrameExt { implicit class DataFrameMethods(df: DataFrame) { def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = { df.select( StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _* ) } } }