SparkSQL: applica funzioni di aggregazione a un elenco di colonne

C’è un modo per applicare una funzione di aggregazione a tutte le colonne (o un elenco di) di un dataframe, quando si fa un groupBy ? In altre parole, c’è un modo per evitare di farlo per ogni colonna:

 df.groupBy("col1") .agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...) 

Esistono diversi modi per applicare funzioni di aggregazione a più colonne.

GroupedData class GroupedData fornisce un numero di metodi per le funzioni più comuni, inclusi count , max , min , mean e sum , che possono essere utilizzati direttamente come segue:

  • Pitone:

     df = sqlContext.createDataFrame( [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)], ("col1", "col2", "col3")) df.groupBy("col1").sum() ## +----+---------+-----------------+---------+ ## |col1|sum(col1)| sum(col2)|sum(col3)| ## +----+---------+-----------------+---------+ ## | 1.0| 2.0| 0.8| 1.0| ## |-1.0| -2.0|6.199999999999999| 0.7| ## +----+---------+-----------------+---------+ 
  • Scala

     val df = sc.parallelize(Seq( (1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)) ).toDF("col1", "col2", "col3") df.groupBy($"col1").min().show // +----+---------+---------+---------+ // |col1|min(col1)|min(col2)|min(col3)| // +----+---------+---------+---------+ // | 1.0| 1.0| 0.3| 0.0| // |-1.0| -1.0| 0.6| 0.2| // +----+---------+---------+---------+ 

Opzionalmente puoi passare un elenco di colonne che dovrebbero essere aggregate

 df.groupBy("col1").sum("col2", "col3") 

Puoi anche passare dizionario / mappa con colonne a le chiavi e funziona come i valori:

  • Pitone

     exprs = {x: "sum" for x in df.columns} df.groupBy("col1").agg(exprs).show() ## +----+---------+ ## |col1|avg(col3)| ## +----+---------+ ## | 1.0| 0.5| ## |-1.0| 0.35| ## +----+---------+ 
  • Scala

     val exprs = df.columns.map((_ -> "mean")).toMap df.groupBy($"col1").agg(exprs).show() // +----+---------+------------------+---------+ // |col1|avg(col1)| avg(col2)|avg(col3)| // +----+---------+------------------+---------+ // | 1.0| 1.0| 0.4| 0.5| // |-1.0| -1.0|3.0999999999999996| 0.35| // +----+---------+------------------+---------+ 

Finalmente puoi usare varargs:

  • Pitone

     from pyspark.sql.functions import min exprs = [min(x) for x in df.columns] df.groupBy("col1").agg(*exprs).show() 
  • Scala

     import org.apache.spark.sql.functions.sum val exprs = df.columns.map(sum(_)) df.groupBy($"col1").agg(exprs.head, exprs.tail: _*) 

Ci sono altri modi per ottenere un effetto simile, ma questi dovrebbero essere più che sufficienti il ​​più delle volte.

Un altro esempio dello stesso concetto – ma diciamo – hai 2 colonne diverse – e vuoi applicare diverse funzioni agg ad ognuna di esse, ad es

 f.groupBy("col1").agg(sum("col2").alias("col2"), avg("col3").alias("col3"), ...) 

Ecco il modo per raggiungerlo, anche se non so ancora come aggiungere l’alias in questo caso

Vedi l’esempio qui sotto – Uso di Maps

 val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), StructField("allowed1", IntegerType, true))) val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), ("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", "diag1", 124, 248)) val claimRDD1 = sc.parallelize(claimsData1) val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5)) val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1) val l = List("allowed", "allowed1") val exprs = l.map((_ -> "sum")).toMap claimRDD2DF1.groupBy("pid").agg(exprs) show false val exprs = Map("allowed" -> "sum", "allowed1" -> "avg") claimRDD2DF1.groupBy("pid").agg(exprs) show false