Articles of apache spark

Quali operazioni preservano l’ordine RDD?

RDD ha un ordine significativo (al contrario di un ordine casuale imposto dal modello di archiviazione) se è stato elaborato da sortBy() , come spiegato in questa risposta . Ora, quali operazioni preservano quell’ordine? Ad esempio, è garantito che (dopo a.sortBy() ) a.map(f).zip(a) === a.map(x => (f(x),x)) Che ne dite di a.filter(f).map(g) === a.map(x => […]

Come creare un DataFrame vuoto con uno schema specificato?

Voglio creare su DataFrame con uno schema specificato in Scala. Ho provato ad usare JSON read (intendo leggere file vuoto) ma non penso che sia la migliore pratica.

Rinominare i nomi delle colonne di un DataFrame in Spark Scala

Sto cercando di convertire tutti i nomi delle intestazioni / colonne di un DataFrame in Spark-Scala. a partire da ora mi viene in mente il seguente codice che sostituisce solo un nome di singola colonna. for( i <- 0 to origCols.length – 1) { df.withColumnRenamed( df.columns(i), df.columns(i).toLowerCase ); }

Come utilizzare COGROUP per set di dati di grandi dimensioni

Ho due rdd’s vale a dire val tab_a: RDD[(String, String)] e val tab_b: RDD[(String, String)] Sto usando cogroup per quei set di dati come: val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } Sto usando i valori tab_c cogrouped per la funzione map e funziona bene per piccoli set […]

Spark perde println () su stdout

Ho il codice seguente: val blueCount = sc.accumulator[Long](0) val output = input.map { data => for (value <- data.getValues()) { if (record.getEnum() == DataEnum.BLUE) { blueCount += 1 println("Enum = BLUE : " + value.toString() } } data }.persist(StorageLevel.MEMORY_ONLY_SER) output.saveAsTextFile("myOutput") Quindi blueCount non è zero, ma non ho ricevuto output println ()! Mi sto perdendo […]

NullPointerException in Scala Spark, sembra essere causato da tipo di raccolta?

sessionIdList è di tipo: scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] distinto a: 30 Quando provo a eseguire il codice seguente: val x = sc.parallelize(List(1,2,3)) val cartesianComp = x.cartesian(x).map(x => (x)) val kDistanceNeighbourhood = sessionIdList.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) Ricevo un’eccezione 14/05/21 16:20:46 ERROR Executor: Exception in task ID […]

Come ottenere l’ID di un’attività sulla mappa in Spark?

C’è un modo per ottenere l’ID di un’attività sulla mappa in Spark? Ad esempio, se ogni attività della mappa chiama una funzione definita dall’utente, posso ottenere l’ID di tale attività della mappa da quella funzione definita dall’utente?

Come forzare la valutazione di DataFrame in Spark

A volte (ad es. Per test e bechmarking) voglio forzare l’esecuzione delle trasformazioni definite su un DataFrame. AFAIK che chiama un’azione come count non garantisce che tutte le Columns siano effettivamente calcolate, lo show può solo calcolare un sottoinsieme di tutte le Rows (vedi esempi sotto) La mia soluzione è scrivere il DataFrame su HDFS […]

Perché il parametro di partizione di SparkContext.textFile non ha effetto?

scala> val p=sc.textFile(“file:///c:/_home/so-posts.xml”, 8) //i’ve 8 cores p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at :21 scala> p.partitions.size res33: Int = 729 Mi aspettavo di stampare 8 e vedo 729 compiti nell’interfaccia utente di Spark MODIFICARE: Dopo aver chiamato la repartition() come suggerito da @ zero323 scala> p1 = p.repartition(8) scala> p1.partitions.size res60: Int = 8 […]

Spark sql query vs funzioni dataframe

Per ottenere buone prestazioni con Spark. Mi chiedo se è bene usare query SQL tramite SQLContext o se è meglio fare query tramite funzioni df.select() come df.select() . Qualche idea? 🙂