Funzionalità Spark vs prestazioni UDF?

Spark ora offre funzioni predefinite che possono essere utilizzate nei dataframes e sembra che siano altamente ottimizzate. La mia domanda iniziale stava per essere più veloce, ma ho fatto alcuni test e ho trovato che le funzioni spark erano circa 10 volte più veloci almeno in un’istanza. Qualcuno sa perché è così, e quando sarebbe un udf più veloce (solo per le istanze che esiste una funzione scintilla identica)?

Ecco il mio codice di test (eseguito su Databricks community ed):

# UDF vs Spark function from faker import Factory from pyspark.sql.functions import lit, concat fake = Factory.create() fake.seed(4321) # Each entry consists of last_name, first_name, ssn, job, and age (at least 1) from pyspark.sql import Row def fake_entry(): name = fake.name().split() return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1) # Create a helper function to call a function repeatedly def repeat(times, func, *args, **kwargs): for _ in xrange(times): yield func(*args, **kwargs) data = list(repeat(500000, fake_entry)) print len(data) data[0] dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age')) dataDF.cache() 

Funzione UDF:

 concat_s = udf(lambda s: s+ 's') udfData = dataDF.select(concat_s(dataDF.first_name).alias('name')) udfData.count() 

Spark Function:

 spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name')) spfData.count() 

Effettuato entrambe le volte, l’udf in genere ha impiegato circa 1,1 – 1,4 s, e la funzione spark concat ha sempre richiesto meno di 0,15 s.

quando sarebbe un udf essere più veloce

Se chiedi di Python UDF la risposta è probabilmente mai. Poiché le funzioni SQL sono relativamente semplici e non sono progettate per attività complesse, è praticamente imansible compensare il costo di ripetute serializzazioni, deserializzazione e spostamento di dati tra l’interprete Python e JVM.

Qualcuno sa perché è così

Le ragioni principali sono già elencate sopra e possono essere ridotte a un semplice fatto che Spark DataFrame è nativamente una struttura JVM e metodi di accesso standard sono implementati da semplici chiamate all’API Java. Le UDF dall’altra parte sono implementate in Python e richiedono lo spostamento dei dati avanti e indietro.

Mentre PySpark in generale richiede movimenti di dati tra JVM e Python, in caso di API RDD di basso livello in genere non richiede costose attività serde. Spark SQL aggiunge costi aggiuntivi di serializzazione e serializzazione, nonché il costo di spostamento dei dati da e verso la rappresentazione non sicura su JVM. La successiva è specifica per tutte le UDF (Python, Scala e Java), ma la prima è specifica per le lingue non native.

A differenza delle UDF, le funzioni di Spark SQL operano direttamente su JVM e generalmente sono ben integrate con Catalyst e Tungsten. Significa che questi possono essere ottimizzati nel piano di esecuzione e la maggior parte del tempo può trarre vantaggio da codgen e da altre ottimizzazioni del tungsteno. Inoltre questi possono operare su dati nella sua rappresentazione “nativa”.

Quindi in un certo senso il problema qui è che Python UDF deve portare i dati al codice mentre le espressioni SQL vanno diversamente.

Dal 30 ottobre 2017, Spark ha appena introdotto i pdf vettorizzati per pyspark.

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Il motivo per cui Python UDF è lento, è probabilmente il PySpark UDF non è implementato in un modo ottimizzato:

Secondo il paragrafo dal link.

Spark ha aggiunto un’API Python nella versione 0.7, con supporto per funzioni definite dall’utente. Queste funzioni definite dall’utente operano una riga alla volta e pertanto presentano un sovraccarico elevato di serializzazione e invocazione.

Tuttavia gli udfs appena vettorizzati sembrano migliorare molto le prestazioni:

vanno da 3x a oltre 100x.

inserisci la descrizione dell'immagine qui