Articles of spark dataframe

Come posso aggiungere una colonna persistente di ID di riga a Spark DataFrame?

Questa domanda non è nuova, tuttavia trovo un comportamento sorprendente in Spark. Devo aggiungere una colonna di ID di riga a un DataFrame. Ho usato il metodo DataFrame monotonically_increasing_id () e mi fornisce un ulteriore col di ID di riga univoci (che NON sono consecutivi a proposito, ma sono unici). Il problema che sto avendo […]

Come importare più file CSV in un singolo caricamento?

Considera di avere uno schema definito per il caricamento di 10 file CSV in una cartella. C’è un modo per caricare automaticamente le tabelle usando Spark SQL. So che questo può essere eseguito utilizzando un singolo dataframe per ogni file [indicato di seguito], ma può essere automatizzato con un singolo comando piuttosto che puntare un […]

Come trovare la dimensione RDD / Dataframe della scintilla?

So come trovare la dimensione del file in scala.Ma come trovare una dimensione RDD / dataframe in scintilla? Scala: object Main extends App { val file = new java.io.File(“hdfs://localhost:9000/samplefile.txt”).toString() println(file.length) } Scintilla: val distFile = sc.textFile(file) println(distFile.length) ma se lo elaboro non ottenendo la dimensione del file. Come trovare la dimensione RDD?

Spark DataFrame Schema Nullable Fields

Ho scritto il seguente codice sia in scala che in python, tuttavia il DataFrame restituito non sembra applicare i campi non annullabili nel mio schema che sto applicando. italianVotes.csv è un file csv con ‘~’ come separatore e quattro campi. Sto usando la scintilla 2.1.0 . italianVotes.csv 2657~135~2~2013-11-22 00:00:00.0 2658~142~2~2013-11-22 00:00:00.0 2659~142~1~2013-11-22 00:00:00.0 2660~140~2~2013-11-22 00:00:00.0 […]

Spark UDAF con ArrayType come problemi di prestazioni bufferSchema

Sto lavorando su un UDAF che restituisce una serie di elementi. L’input per ogni aggiornamento è una tupla di indice e valore. Quello che fa l’UDAF è sumre tutti i valori sotto lo stesso indice. Esempio: Per input (indice, valore): (2,1), (3,1), (2,3) dovrebbe tornare (0,0,4,1, …, 0) La logica funziona bene, ma ho un […]

Spark Sql UDF con parametri di input complessi

Sto cercando di usare UDF con il tipo di input Array of struct. Ho la seguente struttura di dati questa è solo una parte rilevante di una struttura più grande |–investments: array (nullable = true) | |– element: struct (containsNull = true) | | |– funding_round: struct (nullable = true) | | | |– company: […]

spark.ml StringIndexer genera l’etichetta ‘Unseen’ su fit ()

Sto preparando un esempio di spark.ml giocattolo. Spark version 1.6.0 , in esecuzione su Oracle JDK version 1.8.0_65 , pyspark, notebook ipython. Innanzitutto, non ha nulla a che fare con Spark, ML, StringIndexer: gestione di etichette invisibili . L’eccezione viene generata mentre si adatta una pipeline a un set di dati, non a trasformarla. E […]

Spark, DataFrame: applica il trasformatore / stimatore sui gruppi

Ho un DataFrame che sembra seguire: +———–+—–+————+ | userID|group| features| +———–+—–+————+ |12462563356| 1| [5.0,43.0]| |12462563701| 2| [1.0,8.0]| |12462563701| 1| [2.0,12.0]| |12462564356| 1| [1.0,1.0]| |12462565487| 3| [2.0,3.0]| |12462565698| 2| [1.0,1.0]| |12462565698| 1| [1.0,1.0]| |12462566081| 2| [1.0,2.0]| |12462566081| 1| [1.0,15.0]| |12462566225| 2| [1.0,1.0]| |12462566225| 1| [9.0,85.0]| |12462566526| 2| [1.0,1.0]| |12462566526| 1| [3.0,79.0]| |12462567006| 2| [11.0,15.0]| |12462567006| 1| […]

Funzionalità Spark vs prestazioni UDF?

Spark ora offre funzioni predefinite che possono essere utilizzate nei dataframes e sembra che siano altamente ottimizzate. La mia domanda iniziale stava per essere più veloce, ma ho fatto alcuni test e ho trovato che le funzioni spark erano circa 10 volte più veloci almeno in un’istanza. Qualcuno sa perché è così, e quando sarebbe […]

Spark Dataframes Tabella da UPSERT a Postgres

Sto usando Apache Spark DataFrames per unire due origini dati e ottenere il risultato come un altro DataFrame. Voglio scrivere il risultato su un altro tavolo Postgres. Vedo questa opzione: myDataFrame.write.jdbc(url, table, connectionProperties) Ma, quello che voglio fare è UPSERT il dataframe nella tabella basata sulla chiave primaria della tabella. come va fatto? Sto usando […]