Articles of apache spark sql

Ricava più colonne da una singola colonna in Spark DataFrame

Ho un DF con un enorme metadata parseable come una singola colonna di stringa in un Dataframe, consente di chiamarlo DFA, con ColmnA. Vorrei rompere questa colonna, ColmnA in più colonne attraverso una funzione, ClassXYZ = Func1 (ColmnA). Questa funzione restituisce una class ClassXYZ, con più variabili, e ognuna di queste variabili deve ora essere […]

Partizionamento in scintilla durante la lettura da RDBMS tramite JDBC

Sto facendo scintilla in modalità cluster e leggendo i dati da RDBMS tramite JDBC. Come da Spark doc , questi parametri di partizionamento descrivono come partizionare la tabella quando si legge in parallelo da più worker: partitionColumn, lowerBound, upperBound, numPartitions Questi sono parametri opzionali. Cosa accadrebbe se non specificassi questi: Solo 1 lavoratore ha letto […]

MatchError durante l’accesso alla colonna di vettori in Spark 2.0

Sto cercando di creare un modello LDA su un file JSON. Creazione di un contesto spark con il file JSON: import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder .master(“local”) .appName(“my-spark-app”) .config(“spark.some.config.option”, “config-value”) .getOrCreate() val df = spark.read.json(“dbfs:/mnt/JSON6/JSON/sampleDoc.txt”) La visualizzazione del df dovrebbe mostrare DataFrame display(df) Tokenizzare il testo import org.apache.spark.ml.feature.RegexTokenizer // Set params for RegexTokenizer val tokenizer […]

Come aggregare i valori in collezione dopo groupBy?

Ho un dataframe con schema in quanto tale: [visitorId: string, trackingIds: array, emailIds: array] Alla ricerca di un modo per raggruppare (o forse rollup?) Questo dataframe da visitorid dove le colonne trackingIds e emailIds si aggiungerebbero insieme. Ad esempio, se il mio file df iniziale ha il seguente aspetto: visitorId |trackingIds|emailIds +———–+————+——– |a158| [666b] | […]

DataWrame-ified zipWithIndex

Sto cercando di risolvere l’annoso problema di aggiungere un numero di sequenza a un set di dati. Sto lavorando con DataFrames e sembra che non ci sia nessun DataFrame equivalente a RDD.zipWithIndex . D’altra parte, il seguente funziona più o meno nel modo in cui lo voglio: val origDF = sqlContext.load(…) val seqDF= sqlContext.createDataFrame( origDF.rdd.zipWithIndex.map(ln […]

Utilizza collect_list e collect_set in Spark SQL

Secondo i documenti , le funzioni collect_set e collect_list dovrebbero essere disponibili in Spark SQL. Tuttavia, non riesco a farlo funzionare. Sto usando Spark 1.6.0 usando un’immagine Docker . Sto provando a farlo in Scala: import org.apache.spark.sql.functions._ df.groupBy(“column1”) .agg(collect_set(“column2”)) .show() E ricevi il seguente errore in fase di runtime: Exception in thread “main” org.apache.spark.sql.AnalysisException: undefined […]

Come salvare / inserire ogni DStream in una tabella permanente

Ho riscontrato un problema con “Spark Streaming” sull’inserimento dell’output Dstream in una tabella SQL permanente . Mi piacerebbe inserire ogni output DStream (proveniente da un singolo batch che accenda i processi) in una tabella univoca. Sto usando Python con Spark versione 1.6.2. In questa parte del mio codice ho un Dstream costituito da uno o […]

Come comprimere due (o più) DataFrame in Spark

Ho due DataFrame a e b . è come Column 1 | Column 2 abc | 123 cde | 23 b è come Column 1 1 2 Voglio comprimere a e b (o anche più) DataFrames che diventa qualcosa come: Column 1 | Column 2 | Column 3 abc | 123 | 1 cde | […]

Ottimizzazione del join DataFrame – Broadcast Hash Join

Sto cercando di unire efficacemente due DataFrame, uno dei quali è grande e il secondo è un po ‘più piccolo. C’è un modo per evitare tutto questo mischiare? Non riesco a impostare autoBroadCastJoinThreshold , perché supporta solo autoBroadCastJoinThreshold interi – e la tabella che sto tentando di trasmettere è leggermente più grande del numero intero […]

Come connettersi a un metastore Hive in modo programmatico in SparkSQL?

Sto usando HiveContext con SparkSQL e sto provando a connettermi a un metastore Hive remoto, l’unico modo per impostare l’hive metastore è attraverso l’hive-site.xml sul classpath (o copiarlo su / etc / spark / conf /). C’è un modo per impostare questo parametro a livello di codice in un codice java senza includere hive-site.xml? In […]