MatchError durante l’accesso alla colonna di vettori in Spark 2.0

Sto cercando di creare un modello LDA su un file JSON.

Creazione di un contesto spark con il file JSON:

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder .master("local") .appName("my-spark-app") .config("spark.some.config.option", "config-value") .getOrCreate() val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt") 

La visualizzazione del df dovrebbe mostrare DataFrame

 display(df) 

Tokenizzare il testo

 import org.apache.spark.ml.feature.RegexTokenizer // Set params for RegexTokenizer val tokenizer = new RegexTokenizer() .setPattern("[\\W_]+") .setMinTokenLength(4) // Filter away tokens with length < 4 .setInputCol("text") .setOutputCol("tokens") // Tokenize document val tokenized_df = tokenizer.transform(df) 

Questo dovrebbe visualizzare tokenized_df

 display(tokenized_df) 

Ottieni le stopwords

 %sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords 

Facoltativo: copia degli stopword nella cartella tmp

 %fs cp file:/tmp/stopwords dbfs:/tmp/stopwords 

Raccolta di tutte le stopwords

 val stopwords = sc.textFile("/tmp/stopwords").collect() 

Filtraggio delle stopwords

  import org.apache.spark.ml.feature.StopWordsRemover // Set params for StopWordsRemover val remover = new StopWordsRemover() .setStopWords(stopwords) // This parameter is optional .setInputCol("tokens") .setOutputCol("filtered") // Create new DF with Stopwords removed val filtered_df = remover.transform(tokenized_df) 

La visualizzazione del df filtrato dovrebbe verificare che le stopwords rimosse

  display(filtered_df) 

Vettorizzazione della frequenza di occorrenza di parole

  import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.Row import org.apache.spark.ml.feature.CountVectorizer // Set params for CountVectorizer val vectorizer = new CountVectorizer() .setInputCol("filtered") .setOutputCol("features") .fit(filtered_df) 

Verifica il vectorizer

  vectorizer.transform(filtered_df) .select("id", "text","features","filtered").show() 

Dopo questo, sto riscontrando un problema nel assembly di questo vectorizer in LDA. Il problema che ritengo sia CountVectorizer stia dando un vettore CountVectorizer ma LDA richiede un vettore denso. Sto ancora cercando di capire il problema.

Ecco l’eccezione in cui la mappa non è in grado di convertire.

 import org.apache.spark.mllib.linalg.Vector val ldaDF = countVectors.map { case Row(id: String, countVector: Vector) => (id, countVector) } display(ldaDF) 

Eccezione :

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 

Esiste un campione funzionante per l’ADL che non genera alcun problema

 import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.Row import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} val a = Vectors.dense(Array(1.0,2.0,3.0)) val b = Vectors.dense(Array(3.0,4.0,5.0)) val df = Seq((1L,a),(2L,b),(2L,a)).toDF val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } val model = new LDA().setK(3).run(ldaDF.javaRDD) display(df) 

L’unica differenza è nel secondo frammento che stiamo avendo una matrice densa.