Spark – carica il file CSV come DataFrame?

Vorrei leggere un CSV in spark e convertirlo come DataFrame e memorizzarlo in HDFS con df.registerTempTable("table_name")

Ho provato:

 scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv") 

Errore che ho ottenuto:

 java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10] at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Qual è il comando giusto per caricare il file CSV come DataFrame in Apache Spark?

spark-csv fa parte della funzionalità di base di Spark e non richiede una libreria separata. Quindi potresti solo fare per esempio

 df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

analizzare CSV come DataFrame / DataSet con Spark 2.x

Prima inizializza SparkSession object SparkSession per impostazione predefinita sarà disponibile nelle shell come spark

 val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Spark CSV Reader") .getOrCreate; 

Utilizzare uno qualsiasi dei seguenti modi per caricare CSV come DataFrame/DataSet

1. Fatelo in modo programmatico

  val df = spark.read .format("csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("hdfs:///csv/file/dir/file.csv") 

2. Puoi anche fare questo modo SQL

  val df = spark.sql("SELECT * FROM csv.`csv/file/path/in/hdfs`") 

Dipendenze :

  "org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0, 


Spark versione <2.0

 val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("csv/file/path"); 

dipendenze:

 "org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST, 

È per il quale Hadoop è 2.6 e Spark è 1.6 e senza pacchetto “databricks”.

 import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; import org.apache.spark.sql.Row; val csv = sc.textFile("/path/to/file.csv") val rows = csv.map(line => line.split(",").map(_.trim)) val header = rows.first val data = rows.filter(_(0) != header(0)) val rdd = data.map(row => Row(row(0),row(1).toInt)) val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val", IntegerType, true)) val df = sqlContext.createDataFrame(rdd, schema) 

Con Spark 2.0, di seguito è come è ansible leggere CSV

 val conf = new SparkConf().setMaster("local[2]").setAppName("my app") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder .config(conf = conf) .appName("spark session example") .getOrCreate() val path = "/Users/xxx/Downloads/usermsg.csv" val base_df = sparkSession.read.option("header","true"). csv(path) 

In Java 1.8 Questo frammento di codice funziona perfettamente per leggere i file CSV

pom.xml

  org.apache.spark spark-core_2.11 2.0.0    org.apache.spark spark-sql_2.10 2.0.0    org.scala-lang scala-library 2.11.8   com.databricks spark-csv_2.10 1.4.0  

Giava

 SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show(); 

L’esempio di Penny’s Spark 2 è il modo di farlo in spark2. C’è un altro trucco: avere quell’intestazione generata per te facendo una scansione iniziale dei dati, impostando l’opzione inferSchema su true

Qui, quindi, assumendo che la spark è una sessione scintilla che hai impostato, è l’operazione da caricare nel file indice CSV di tutte le immagini Landsat che ospita Amazon su S3.

  /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz") 

Le cattive notizie sono: questo fa scattare una scansione attraverso il file; per qualcosa di grande come questo file CSV zippato da 20 + MB, che può richiedere 30 secondi su una connessione a lungo raggio. Tienilo a mente: stai meglio codificando manualmente lo schema una volta arrivato.

(snippet di codice Apache Software License 2.0 concesso in licenza per evitare ogni ambiguità, cosa che ho fatto come test di dimostrazione / integrazione dell’integrazione S3)

Il formato file predefinito è Parquet con spark.read .. e il file che legge csv è il motivo per cui stai ricevendo l’eccezione. Specifica il formato csv con l’API che stai tentando di utilizzare

Ci sono molte sfide per analizzare un file CSV, continua a sumrsi se la dimensione del file è maggiore, se ci sono caratteri non-english / escape / separator / altri nei valori delle colonne, ciò potrebbe causare errori di parsing.

La magia quindi è nelle opzioni che vengono utilizzate. Quelli che hanno funzionato per me e la speranza dovrebbe coprire la maggior parte dei casi limite sono nel codice qui sotto:

 ### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep=',', quote='"', escape='"', maxColumns=2, inferSchema=True) 

Spero possa aiutare. Per ulteriori informazioni: Utilizzo di PySpark 2 per leggere CSV con codice sorgente HTML

Nota: il codice sopra riportato proviene dall’API di Spark 2, in cui l’API di lettura del file CSV viene fornita in bundle con pacchetti integrati di Spark installabili.

Nota: PySpark è un wrapper Python per Spark e condivide la stessa API di Scala / Java.