Come posso passare parametri extra alle UDF in SparkSql?

Voglio analizzare le colonne della data in un DataFrame e per ciascuna colonna della data, la risoluzione per la data potrebbe cambiare (ovvero 2011/01/10 => DataFrame se la risoluzione è impostata su “Mese”).

Ho scritto il seguente codice:

 def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = { import org.apache.spark.sql.functions._ val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)} val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)} val allColNames = dataframe.columns val allCols = allColNames.map(name => dataframe.col(name)) val mappedCols = { for(i  convertDateFunc(allCols(i), resolution(i))) case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i)) case _ => allCols(i) } } } dataframe.select(mappedCols:_*) }} 

Tuttavia non funziona. Sembra che io possa passare solo da Column a UDF. E mi chiedo se sarà molto lento se converto il DataFrame in RDD e applichi la funzione su ogni riga.

Qualcuno conosce la soluzione corretta? Grazie!

Basta usare un po ‘di curry:

 def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => SparkDateTimeConverter.convertDate(x, resolution)) 

e usarlo come segue:

 case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i)) 

Una nota a sql.functions.trunc si dovrebbe dare un’occhiata a sql.functions.trunc e sql.functions.date_format . Questi dovrebbero almeno parte del lavoro senza utilizzare le UDF.

Nota :

In Spark 2.2 o typedLit successive è ansible utilizzare la funzione typedLit :

 import org.apache.spark.sql.functions.typedLit 

che supportano una gamma più ampia di letterali come Seq o Map .

È ansible creare una Column letterale per passare a un udf utilizzando la funzione lit(...) definita in org.apache.spark.sql.functions

Per esempio:

 val takeRight = udf((s: String, i: Int) => s.takeRight(i)) df.select(takeRight($"stringCol", lit(1)))