Salva Spark Dataframe in Elasticsearch: imansible gestire l’eccezione di tipo

Ho progettato un lavoro semplice per leggere i dati da MySQL e salvarlo in Elasticsearch con Spark.

Ecco il codice:

JavaSparkContext sc = new JavaSparkContext( new SparkConf().setAppName("MySQLtoEs") .set("es.index.auto.create", "true") .set("es.nodes", "127.0.0.1:9200") .set("es.mapping.id", "id") .set("spark.serializer", KryoSerializer.class.getName())); SQLContext sqlContext = new SQLContext(sc); // Data source options Map options = new HashMap(); options.put("driver", MYSQL_DRIVER); options.put("url", MYSQL_CONNECTION_URL); options.put("dbtable", "OFFERS"); options.put("partitionColumn", "id"); options.put("lowerBound", "10001"); options.put("upperBound", "499999"); options.put("numPartitions", "10"); // Load MySQL query result as DataFrame LOGGER.info("Loading DataFrame"); DataFrame jdbcDF = sqlContext.load("jdbc", options); DataFrame df = jdbcDF.select("id", "title", "description", "merchantId", "price", "keywords", "brandId", "categoryId"); df.show(); LOGGER.info("df.count : " + df.count()); EsSparkSQL.saveToEs(df, "offers/product"); 

Puoi vedere che il codice è molto semplice. Legge i dati in un DataFrame, seleziona alcune colonne e quindi esegue un count come azione di base sul Dataframe. Tutto funziona fino a questo punto.

Quindi cerca di salvare i dati in Elasticsearch, ma fallisce perché non può gestire alcun tipo. Puoi vedere il registro degli errori qui .

Non sono sicuro del motivo per cui non può gestire quel tipo. Qualcuno sa perché questo sta accadendo?

Sto usando Apache Spark 1.5.0, Elasticsearch 1.4.4 e elaticsearch-hadoop 2.1.1

MODIFICARE:

  • Ho aggiornato il link Gist con un set di dati campione insieme al codice sorgente.
  • Ho anche provato a usare le build di elasticsearch-hadoop dev menzionate da @costin sulla mailing list.

La risposta per questo è stata difficile, ma grazie a samklr , sono riuscito a capire quale fosse il problema.

Tuttavia, la soluzione non è semplice e potrebbe prendere in considerazione alcune trasformazioni “non necessarie”.

Per prima cosa parliamo di serializzazione .

Ci sono due aspetti della serializzazione da considerare nella serializzazione Spark dei dati e nella serializzazione delle funzioni. In questo caso, si tratta della serializzazione dei dati e quindi della deserializzazione.

Dal punto di vista di Spark, l’unica cosa necessaria è l’impostazione della serializzazione: Spark si affida di default alla serializzazione Java, che è comoda ma abbastanza inefficiente. Questo è il motivo per cui Hadoop stesso ha introdotto il proprio meccanismo di serializzazione e i propri tipi, vale a dire Writables . Come tale, InputFormat e OutputFormats sono obbligati a restituire Writables che, OutputFormats , Spark non capisce.

Con il connettore elasticsearch-spark è necessario abilitare una diversa serializzazione (Kryo) che gestisce la conversione automaticamente e lo fa anche in modo abbastanza efficiente.

 conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 

Anche dal momento che Kryo non richiede che una class implementa una particolare interfaccia per essere serializzata, il che significa che i POJO possono essere usati negli RDD senza ulteriori interventi oltre alla serializzazione di Kryo.

Detto questo, @samklr mi ha fatto notare che Kryo ha bisogno di registrare le classi prima di usarle.

Questo perché Kryo scrive un riferimento alla class dell’object che viene serializzato (un riferimento è scritto per ogni object scritto), che è solo un identificatore intero se la class è stata registrata, ma è altrimenti il ​​nome completo della class. Spark registra per te le classi Scala e molte altre classi di framework (come le classi Avro Generic o Thrift).

Registrare le lezioni con Kryo è semplice. Crea una sottoclass di KryoRegistrator e sostituisci il metodo registerClasses() :

 public class MyKryoRegistrator implements KryoRegistrator, Serializable { @Override public void registerClasses(Kryo kryo) { // Product POJO associated to a product Row from the DataFrame kryo.register(Product.class); } } 

Infine, nel tuo programma driver, imposta la proprietà spark.kryo.registrator sul nome class completo della tua implementazione di KryoRegistrator:

 conf.set("spark.kryo.registrator", "MyKryoRegistrator") 

In secondo luogo, anche il serializzatore Kryo è stato impostato e la class registrata, con le modifiche apportate a Spark 1.5, e per qualche motivo Elasticsearch non ha potuto de-serializzare il Dataframe perché non può inferire lo SchemaType del Dataframe nel connettore.

Quindi ho dovuto convertire il Dataframe in un JavaRDD

 JavaRDD products = df.javaRDD().map(new Function() { public Product call(Row row) throws Exception { long id = row.getLong(0); String title = row.getString(1); String description = row.getString(2); int merchantId = row.getInt(3); double price = row.getDecimal(4).doubleValue(); String keywords = row.getString(5); long brandId = row.getLong(6); int categoryId = row.getInt(7); return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId); } }); 

Ora i dati sono pronti per essere scritti in elasticsearch:

 JavaEsSpark.saveToEs(products, "test/test"); 

Riferimenti:

  • Documentazione di supporto Apache Spark di Elasticsearch.
  • Guida definitiva di Hadoop, Capitolo 19. Spark, ed. 4 – Tom White.
  • Utente samklr .