Come definire lo schema per il tipo personalizzato in Spark SQL?

Il seguente codice di esempio tenta di inserire alcuni oggetti del caso in un dataframe. Il codice include la definizione di una gerarchia di oggetti caso e una class case utilizzando questa caratteristica:

import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.SQLContext sealed trait Some case object AType extends Some case object BType extends Some case class Data( name : String, t: Some) object Example { def main(args: Array[String]) : Unit = { val conf = new SparkConf() .setAppName( "Example" ) .setMaster( "local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF() df.show() } } 

Quando eseguo il codice, sfortunatamente incontro la seguente eccezione:

 java.lang.UnsupportedOperationException: Schema for type Some is not supported 

Domande

  • C’è la possibilità di aggiungere o definire uno schema per alcuni tipi (qui digitare Some )?
  • Esiste un altro approccio per rappresentare questo tipo di enumerazioni?
    • Ho provato a usare Enumeration direttamente, ma anche senza successo. (vedi sotto)

Codice per l’ Enumeration :

 object Some extends Enumeration { type Some = Value val AType, BType = Value } 

Grazie in anticipo. Spero che l’approccio migliore non sia quello di utilizzare le stringhe.

Spark 2.0.0+ :

UserDefinedType è stato reso privato in Spark 2.0.0 e per ora non ha sostituto di Dataset .

Vedi: SPARK-14155 (Nascondi UserDefinedType in Spark 2.0)

La maggior parte delle volte il Dataset tipizzato in modo statico può servire come sostituzione. C’è una Jira SPARK-7768 in sospeso per rendere nuovamente pubblica l’API UDT con la versione di destinazione 2.4.

Vedi anche Come conservare oggetti personalizzati in Dataset?

Spark <2.0.0

C’è la possibilità di aggiungere o definire uno schema per alcuni tipi (qui digitare alcuni)?

Immagino che la risposta dipenda da quanto hai bisogno di questo. Sembra che sia ansible creare un UserDefinedType ma richiede l’accesso a DeveloperApi e non è esattamente semplice o ben documentato.

 import org.apache.spark.sql.types._ @SQLUserDefinedType(udt = classOf[SomeUDT]) sealed trait Some case object AType extends Some case object BType extends Some class SomeUDT extends UserDefinedType[Some] { override def sqlType: DataType = IntegerType override def serialize(obj: Any) = { obj match { case AType => 0 case BType => 1 } } override def deserialize(datum: Any): Some = { datum match { case 0 => AType case 1 => BType } } override def userClass: Class[Some] = classOf[Some] } 

Probabilmente dovresti sovrascrivere hashCode e equals .

La sua controparte di PySpark può assomigliare a questo:

 from enum import Enum, unique from pyspark.sql.types import UserDefinedType, IntegerType class SomeUDT(UserDefinedType): @classmethod def sqlType(self): return IntegerType() @classmethod def module(cls): return cls.__module__ @classmethod def scalaUDT(cls): # Required in Spark < 1.5 return 'net.zero323.enum.SomeUDT' def serialize(self, obj): return obj.value def deserialize(self, datum): return {x.value: x for x in Some}[datum] @unique class Some(Enum): __UDT__ = SomeUDT() AType = 0 BType = 1 

In Spark <1.5 Python UDT richiede un UDT Scala appaiato, ma sembra che non sia più il caso in 1.5.

Per un semplice UDT come è ansible utilizzare tipi semplici (ad esempio IntegerType anziché Struct ).