Articles of apache spark

Come definire una funzione di aggregazione personalizzata per sumre una colonna di Vettori?

Ho un DataFrame di due colonne, ID di tipo Int e Vec di tipo Vector ( org.apache.spark.mllib.linalg.Vector ). Il DataFrame si presenta come segue: ID,Vec 1,[0,0,5] 1,[4,0,1] 1,[1,2,1] 2,[7,5,0] 2,[3,3,4] 3,[0,8,1] 3,[0,0,1] 3,[7,7,7] …. Mi piacerebbe fare un groupBy($”ID”) quindi applicare un’aggregazione sulle righe all’interno di ciascun gruppo sumndo i vettori. L’output desiderato dell’esempio precedente […]

Come posso passare parametri extra alle UDF in SparkSql?

Voglio analizzare le colonne della data in un DataFrame e per ciascuna colonna della data, la risoluzione per la data potrebbe cambiare (ovvero 2011/01/10 => DataFrame se la risoluzione è impostata su “Mese”). Ho scritto il seguente codice: def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = { import org.apache.spark.sql.functions._ val convertDateFunc = […]

Spark UDF con vararg

È una sola opzione per elencare tutti gli argomenti fino a 22 come mostrato nella documentazione? https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.UDFRegistration Qualcuno ha capito come fare qualcosa di simile a questo? sc.udf.register(“func”, (s: String*) => s…… (scrivendo la funzione di concat personalizzata che salta i null, ha avuto 2 argomenti al momento) Grazie

Moltiplicazione della matrice in Apache Spark

Sto cercando di eseguire la moltiplicazione della matrice usando Apache Spark e Java. Ho 2 domande principali: Come creare RDD che può rappresentare la matrice in Apache Spark? Come moltiplicare due di questi RDD?

Quali operazioni preservano l’ordine RDD?

RDD ha un ordine significativo (al contrario di un ordine casuale imposto dal modello di archiviazione) se è stato elaborato da sortBy() , come spiegato in questa risposta . Ora, quali operazioni preservano quell’ordine? Ad esempio, è garantito che (dopo a.sortBy() ) a.map(f).zip(a) === a.map(x => (f(x),x)) Che ne dite di a.filter(f).map(g) === a.map(x => […]

Come creare un DataFrame vuoto con uno schema specificato?

Voglio creare su DataFrame con uno schema specificato in Scala. Ho provato ad usare JSON read (intendo leggere file vuoto) ma non penso che sia la migliore pratica.

Rinominare i nomi delle colonne di un DataFrame in Spark Scala

Sto cercando di convertire tutti i nomi delle intestazioni / colonne di un DataFrame in Spark-Scala. a partire da ora mi viene in mente il seguente codice che sostituisce solo un nome di singola colonna. for( i <- 0 to origCols.length – 1) { df.withColumnRenamed( df.columns(i), df.columns(i).toLowerCase ); }

Come utilizzare COGROUP per set di dati di grandi dimensioni

Ho due rdd’s vale a dire val tab_a: RDD[(String, String)] e val tab_b: RDD[(String, String)] Sto usando cogroup per quei set di dati come: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } Sto usando i valori tab_c cogrouped per la funzione map e funziona bene per piccoli set […]

Spark perde println () su stdout

Ho il codice seguente: val blueCount = sc.accumulator[Long](0) val output = input.map { data => for (value <- data.getValues()) { if (record.getEnum() == DataEnum.BLUE) { blueCount += 1 println("Enum = BLUE : " + value.toString() } } data }.persist(StorageLevel.MEMORY_ONLY_SER) output.saveAsTextFile("myOutput") Quindi blueCount non è zero, ma non ho ricevuto output println ()! Mi sto perdendo […]

NullPointerException in Scala Spark, sembra essere causato da tipo di raccolta?

sessionIdList è di tipo: scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] distinto a: 30 Quando provo a eseguire il codice seguente: val x = sc.parallelize(List(1,2,3)) val cartesianComp = x.cartesian(x).map(x => (x)) val kDistanceNeighbourhood = sessionIdList.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) Ricevo un’eccezione 14/05/21 16:20:46 ERROR Executor: Exception in task ID […]