Spark Sql UDF con parametri di input complessi

Sto cercando di usare UDF con il tipo di input Array of struct. Ho la seguente struttura di dati questa è solo una parte rilevante di una struttura più grande

|--investments: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- funding_round: struct (nullable = true) | | | |-- company: struct (nullable = true) | | | | |-- name: string (nullable = true) | | | | |-- permalink: string (nullable = true) | | | |-- funded_day: long (nullable = true) | | | |-- funded_month: long (nullable = true) | | | |-- funded_year: long (nullable = true) | | | |-- raised_amount: long (nullable = true) | | | |-- raised_currency_code: string (nullable = true) | | | |-- round_code: string (nullable = true) | | | |-- source_description: string (nullable = true) | | | |-- source_url: string (nullable = true) 

Ho dichiarato le classi di casi:

 case class Company(name: String, permalink: String) case class FundingRound(company: Company, funded_day: Long, funded_month: Long, funded_year: Long, raised_amount: Long, raised_currency_code: String, round_code: String, source_description: String, source_url: String) case class Investments(funding_round: FundingRound) 

Dichiarazione UDF:

 sqlContext.udf.register("total_funding", (investments:Seq[Investments]) => { val totals = investments.map(r => r.funding_round.raised_amount) totals.sum }) 

Quando eseguo la seguente trasformazione, il risultato è come previsto

 scala> sqlContext.sql("""select total_funding(investments) from companies""") res11: org.apache.spark.sql.DataFrame = [_c0: bigint] 

Ma quando un’azione eseguita come raccolta ha un errore:

 Executor: Exception in task 0.0 in stage 4.0 (TID 10) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments 

Grazie per tutto l’aiuto.

L’errore che vedi dovrebbe essere praticamente auto-esplicativo. Esiste una mapping rigorosa tra i tipi Catalyst / SQL e i tipi Scala che è ansible trovare nella sezione pertinente di Spark SQL, DataFrames e Dataset Guide .

In particolare i tipi di struct vengono convertiti in oassql.Row (nel tuo caso particolare i dati saranno esposti come Seq[Row] ).

Esistono diversi metodi che possono essere utilizzati per esporre i dati come tipi specifici:

  • Definire UDT (tipo definito dall’utente) che è stato rimosso in 2.0.0 e non ha sostituto per ora .
  • Conversione di DataFrame in Dataset[T] dove T è un tipo locale desiderato.

con solo il precedente approccio potrebbe essere applicabile in questo particolare scenario.

Se vuoi accedere a investments.funding_round.raised_amount usando UDF avrai bisogno di qualcosa del genere:

 val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try( investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount")) ).toOption) 

ma la select semplice dovrebbe essere molto più sicura e più pulita:

 df.select($"investments.funding_round.raised_amount") 

Ho creato una semplice libreria che ricava gli encoder necessari per tipi di prodotti complessi in base ai parametri del tipo di input.

https://github.com/lesbroot/typedudf

 import typedudf.TypedUdf import typedudf.ParamEncoder._ case class Foo(x: Int, y: String) val fooUdf = TypedUdf((foo: Foo) => foo.x + foo.y.length) df.withColumn("sum", fooUdf($"foo"))