Come modificare i tipi di colonna in DataFrame di Spark SQL?

Supponiamo che io stia facendo qualcosa del tipo:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |-- year: string (nullable = true) |-- make: string (nullable = true) |-- model: string (nullable = true) |-- comment: string (nullable = true) |-- blank: string (nullable = true) df.show() year make model comment blank 2012 Tesla S No comment 1997 Ford E350 Go get one now th... 

ma volevo davvero che l’ year Int (e forse trasformassi qualche altra colonna).

Il meglio che potrei inventare è

 df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank) org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string] 

che è un po ‘contorto.

Vengo da R e sono abituato a scrivere, ad es

 df2 % mutate(year = year %>% as.integer, make = make %>% toupper) 

Probabilmente mi manchi qualcosa, poiché dovrebbe esserci un modo migliore per farlo in spark / scala …

Dalla versione 1.4 di Spark è ansible applicare il metodo di cast con DataType sulla colonna:

 import org.apache.spark.sql.types.IntegerType val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType)) .drop("year") .withColumnRenamed("yearTmp", "year") 

Se stai usando espressioni sql puoi anche fare:

 val df2 = df.selectExpr("cast(year as int) year", "make", "model", "comment", "blank") 

Per maggiori informazioni consulta i documenti: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

[EDIT: marzo 2016: grazie per i voti! Anche se in realtà, questa non è la migliore risposta, penso che le soluzioni basate su withColumn , withColumnRenamed e cast proposte da msemelman, Martin Senne e altre siano più semplici e pulite].

Penso che il tuo approccio sia ok, ricorda che Spark DataFrame è un RDD di righe (immutabile), quindi non sostituiremo mai una colonna, semplicemente creando ogni volta un nuovo DataFrame con un nuovo schema.

Supponendo di avere un file df originale con il seguente schema:

 scala> df.printSchema root |-- Year: string (nullable = true) |-- Month: string (nullable = true) |-- DayofMonth: string (nullable = true) |-- DayOfWeek: string (nullable = true) |-- DepDelay: string (nullable = true) |-- Distance: string (nullable = true) |-- CRSDepTime: string (nullable = true) 

E alcune UDF definite su una o più colonne:

 import org.apache.spark.sql.functions._ val toInt = udf[Int, String]( _.toInt) val toDouble = udf[Double, String]( _.toDouble) val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) val days_since_nearest_holidays = udf( (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12 ) 

Cambiare i tipi di colonna o anche build un nuovo DataFrame da un altro può essere scritto in questo modo:

 val featureDf = df .withColumn("departureDelay", toDouble(df("DepDelay"))) .withColumn("departureHour", toHour(df("CRSDepTime"))) .withColumn("dayOfWeek", toInt(df("DayOfWeek"))) .withColumn("dayOfMonth", toInt(df("DayofMonth"))) .withColumn("month", toInt(df("Month"))) .withColumn("distance", toDouble(df("Distance"))) .withColumn("nearestHoliday", days_since_nearest_holidays( df("Year"), df("Month"), df("DayofMonth")) ) .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", "month", "distance", "nearestHoliday") 

che produce:

 scala> df.printSchema root |-- departureDelay: double (nullable = true) |-- departureHour: integer (nullable = true) |-- dayOfWeek: integer (nullable = true) |-- dayOfMonth: integer (nullable = true) |-- month: integer (nullable = true) |-- distance: double (nullable = true) |-- nearestHoliday: integer (nullable = true) 

Questo è abbastanza vicino alla tua soluzione. Semplicemente, mantenendo le modifiche di tipo e le altre trasformazioni come udf val separati, il codice diventa più leggibile e riutilizzabile.

Poiché l’operazione di cast è disponibile per Spark Column (e personalmente non udf come proposto da @ Svend a questo punto), che ne dici di:

 df.select( df("year").cast(IntegerType).as("year"), ... ) 

trasmettere al tipo richiesto? Come effetto collaterale, i valori non calcolabili / “convertibili” in questo senso, diventeranno null .

Se hai bisogno di questo come metodo di supporto , usa:

 object DFHelper{ def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = { df.withColumn( cn, df(cn).cast(tpe) ) } } 

che viene utilizzato come:

 import DFHelper._ val df2 = castColumnTo( df, "year", IntegerType ) 

Innanzitutto se vuoi digitare il tipo

 import org.apache.spark.sql df.withColumn("year", $"year".cast(sql.types.IntegerType)) 

Con lo stesso nome di colonna, la colonna verrà sostituita con una nuova, non è necessario aggiungere ed eliminare.

Secondo, su Scala vs R. il codice Scala più simile a R che posso ottenere:

 val df2 = df.select( df.columns.map { case year @ "year" => df(year).cast(IntegerType).as(year) case make @ "make" => functions.upper(df(make)).as(make) case other => df(other) }: _* ) 

Anche se la lunghezza è un po ‘più lunga di quella di R. Si noti che il mutate è una funzione per il frame di dati R, quindi Scala è abbastanza buono in termini di potenza espressiva senza usare una funzione speciale.

( df.columns è sorprendentemente un array [String] invece di Array [Column], forse lo vogliono come il dataframe dei panda di Python.)

Puoi usare selectExpr per renderlo un po ‘più pulito:

 df.selectExpr("cast(year as int) as year", "upper(make) as make", "model", "comment", "blank") 

Per convertire l’anno da stringa a int, è ansible aggiungere la seguente opzione al lettore csv: “inferSchema” -> “true”, consultare la documentazione di DataBricks

Quindi questo funziona davvero solo se hai problemi di salvataggio su un driver jdbc come sqlserver, ma è davvero utile per gli errori che potresti incontrare con syntax e tipi.

 import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} import org.apache.spark.sql.jdbc.JdbcType val SQLServerDialect = new JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver") override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT)) case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT)) case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE)) case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL)) case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC") } } JdbcDialects.registerDialect(SQLServerDialect) 

Codice Java per la modifica del tipo di dati di DataFrame da String a Integer

 df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType)) 

Trasmetterà semplicemente l’esistente (tipo di dati String) a Integer.

le risposte che suggeriscono di usare il cast, FYI, il metodo cast nella scintilla 1.4.1 è rotto.

ad esempio, un dataframe con una colonna di stringhe con valore “8182175552014127960” quando è stato lanciato su bigint ha valore “8182175552014128100”

  df.show +-------------------+ | a| +-------------------+ |8182175552014127960| +-------------------+ df.selectExpr("cast(a as bigint) a").show +-------------------+ | a| +-------------------+ |8182175552014128100| +-------------------+ 

Abbiamo dovuto affrontare un sacco di problemi prima di trovare questo bug perché avevamo colonne bigint in produzione.

 df.select($"long_col".cast(IntegerType).as("int_col")) 

Puoi usare sotto il codice.

 df.withColumn("year", df("year").cast(IntegerType)) 

Che convertirà la colonna dell’anno nella colonna IntegerType .

Questo metodo eliminerà la vecchia colonna e creerà nuove colonne con gli stessi valori e un nuovo tipo di dati. I miei tipi di dati originali quando è stato creato DataFrame erano: –

 root |-- id: integer (nullable = true) |-- flag1: string (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag3: string (nullable = true) 

Dopo questo ho eseguito il codice seguente per cambiare il tipo di dati: –

 df=df.withColumnRenamed(,) // This was done for both flag1 and flag3 df=df.withColumn(,df.col().cast()).drop() 

Dopo questo il mio risultato è risultato essere: –

 root |-- id: integer (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag1: boolean (nullable = true) |-- flag3: boolean (nullable = true) 

Genera un semplice set di dati contenente cinque valori e converti int in string type:

 val df = spark.range(5).select( col("id").cast("string") ) 
  val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd //Schema to be applied to the table val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType) val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates() 

Si può cambiare il tipo di dati di una colonna usando cast in spark sql. il nome della tabella è tabella e ha solo due colonne, colonna1 e colonna2 e tipo di dati column1 devono essere modificati. ex-spark.sql (“seleziona cast (column1 come Double) column1NewName, column2 dalla tabella”) Al posto di double scrivi il tuo tipo di dati.

Un altro modo:

 // Generate a simple dataset containing five values and convert int to string type val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")