Sto cercando di creare un modello LDA su un file JSON.
Creazione di un contesto spark con il file JSON:
import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder .master("local") .appName("my-spark-app") .config("spark.some.config.option", "config-value") .getOrCreate() val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")
La visualizzazione del df
dovrebbe mostrare DataFrame
display(df)
Tokenizzare il testo
import org.apache.spark.ml.feature.RegexTokenizer // Set params for RegexTokenizer val tokenizer = new RegexTokenizer() .setPattern("[\\W_]+") .setMinTokenLength(4) // Filter away tokens with length < 4 .setInputCol("text") .setOutputCol("tokens") // Tokenize document val tokenized_df = tokenizer.transform(df)
Questo dovrebbe visualizzare tokenized_df
display(tokenized_df)
Ottieni le stopwords
%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords
Facoltativo: copia degli stopword nella cartella tmp
%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords
Raccolta di tutte le stopwords
val stopwords = sc.textFile("/tmp/stopwords").collect()
Filtraggio delle stopwords
import org.apache.spark.ml.feature.StopWordsRemover // Set params for StopWordsRemover val remover = new StopWordsRemover() .setStopWords(stopwords) // This parameter is optional .setInputCol("tokens") .setOutputCol("filtered") // Create new DF with Stopwords removed val filtered_df = remover.transform(tokenized_df)
La visualizzazione del df
filtrato dovrebbe verificare che le stopwords
rimosse
display(filtered_df)
Vettorizzazione della frequenza di occorrenza di parole
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.Row import org.apache.spark.ml.feature.CountVectorizer // Set params for CountVectorizer val vectorizer = new CountVectorizer() .setInputCol("filtered") .setOutputCol("features") .fit(filtered_df)
Verifica il vectorizer
vectorizer.transform(filtered_df) .select("id", "text","features","filtered").show()
Dopo questo, sto riscontrando un problema nel assembly di questo vectorizer
in LDA. Il problema che ritengo sia CountVectorizer
stia dando un vettore CountVectorizer
ma LDA richiede un vettore denso. Sto ancora cercando di capire il problema.
Ecco l’eccezione in cui la mappa non è in grado di convertire.
import org.apache.spark.mllib.linalg.Vector val ldaDF = countVectors.map { case Row(id: String, countVector: Vector) => (id, countVector) } display(ldaDF)
Eccezione :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
Esiste un campione funzionante per l’ADL che non genera alcun problema
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.sql.Row import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} val a = Vectors.dense(Array(1.0,2.0,3.0)) val b = Vectors.dense(Array(3.0,4.0,5.0)) val df = Seq((1L,a),(2L,b),(2L,a)).toDF val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } val model = new LDA().setK(3).run(ldaDF.javaRDD) display(df)
L’unica differenza è nel secondo frammento che stiamo avendo una matrice densa.