Come funziona HashPartitioner?

Ho letto sulla documentazione di HashPartitioner . Sfortunatamente non è stato spiegato molto, tranne per le chiamate API. Sono sotto il presupposto che HashPartitioner partiziona il set distribuito in base all’hash delle chiavi. Ad esempio se i miei dati sono simili

 (1,1), (1,2), (1,3), (2,1), (2,2), (2,3) 

Quindi il partizionatore lo metterebbe in diverse partizioni con le stesse chiavi che cadono nella stessa partizione. Tuttavia non capisco il significato dell’argomento costruttore

 new HashPartitoner(numPartitions) //What does numPartitions do? 

Per il set di dati sopra come i risultati sarebbero diversi se lo facessi

 new HashPartitoner(1) new HashPartitoner(2) new HashPartitoner(10) 

Quindi come funziona HashPartitioner ?

Bene, rendiamo il tuo set di dati marginalmente più interessante:

 val rdd = sc.parallelize(for { x <- 1 to 3 y <- 1 to 2 } yield (x, None), 8) 

Abbiamo sei elementi:

 rdd.count 
 Long = 6 

nessun partizionatore:

 rdd.partitioner 
 Option[org.apache.spark.Partitioner] = None 

e otto partizioni:

 rdd.partitions.length 
 Int = 8 

Ora consente di definire un piccolo helper per contare il numero di elementi per partizione:

 import org.apache.spark.rdd.RDD def countByPartition(rdd: RDD[(Int, None.type)]) = { rdd.mapPartitions(iter => Iterator(iter.length)) } 

Poiché non abbiamo il partizionatore, il nostro set di dati viene distribuito in modo uniforms tra le partizioni ( schema di partizionamento predefinito in Spark ):

 countByPartition(rdd).collect() 
 Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1) 

inital distribuzione

Ora lascia la ripartizione del nostro set di dati:

 import org.apache.spark.HashPartitioner val rddOneP = rdd.partitionBy(new HashPartitioner(1)) 

Poiché il parametro passato a HashPartitioner definisce il numero di partizioni ci si aspetta una partizione:

 rddOneP.partitions.length 
 Int = 1 

Dato che abbiamo una sola partizione contiene tutti gli elementi:

 countByPartition(rddOneP).collect 
 Array[Int] = Array(6) 

hash-partizionamento-1

Nota che l'ordine dei valori dopo lo shuffle non è deterministico.

Stesso modo se usiamo HashPartitioner(2)

 val rddTwoP = rdd.partitionBy(new HashPartitioner(2)) 

otterremo 2 partizioni:

 rddTwoP.partitions.length 
 Int = 2 

Dato che rdd è partizionato da dati chiave, non sarà più distribuito uniformsmente:

 countByPartition(rddTwoP).collect() 
 Array[Int] = Array(2, 4) 

Perché con tre chiavi e solo due diversi valori di hashCode mod numPartitions non c'è nulla di inaspettato qui:

 (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) 
 scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1)) 

Solo per confermare quanto sopra:

 rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect() 
 Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3)) 

hash-partizionamento-2

Infine con HashPartitioner(7) otteniamo sette partizioni, tre non vuote con 2 elementi ciascuna:

 val rddSevenP = rdd.partitionBy(new HashPartitioner(7)) rddSevenP.partitions.length 
 Int = 7 
 countByPartition(rddTenP).collect() 
 Array[Int] = Array(0, 2, 2, 2, 0, 0, 0) 

hash-partizionamento-7

Riepilogo e note

  • HashPartitioner accetta un singolo argomento che definisce il numero di partizioni
  • i valori sono assegnati alle partizioni usando l' hash delle chiavi. hash funzione di hash può variare a seconda della lingua (Scala RDD può usare hashCode , DataSets usa MurmurHash 3, PySpark, portable_hash ).

    In un caso semplice come questo, dove key è un numero intero piccolo, puoi assumere che l' hash sia un'identity framework ( i = hash(i) ).

    L'API Scala usa nonNegativeMod per determinare la partizione basata nonNegativeMod calcolato,

  • se la distribuzione delle chiavi non è uniforms, si può finire in situazioni in cui parte del cluster è intriggers

  • le chiavi devono essere lavabili. Puoi controllare la mia risposta per un elenco come chiave per ridurreByKey di PySpark per leggere i problemi specifici di PySpark. Un altro ansible problema è evidenziato dalla documentazione di HashPartitioner :

    Gli array Java hanno hashCode basati sulle id quadro degli array piuttosto che sul loro contenuto, quindi il tentativo di partizionare un RDD [Array [ ]] o RDD [(Array [ ], _)] usando un HashPartitioner produrrà un risultato inaspettato o errato.

  • In Python 3 devi assicurarti che l'hashing sia coerente. Vedi cosa fa eccezione: la casualità dell'hash di stringa dovrebbe essere disabilitata tramite PYTHONHASHSEED in pyspark?

  • Il partizionatore di hash non è né iniettivo né suriettivo. È ansible assegnare più chiavi a una singola partizione e alcune partizioni possono rimanere vuote.

  • Si noti che attualmente i metodi basati su hash non funzionano in Scala se combinati con classi di casi definite da REPL ( uguaglianza della class Case in Apache Spark ).

  • HashPartitioner (o qualsiasi altro Partitioner ) mischia i dati. A meno che il partizionamento non venga riutilizzato tra più operazioni, non riduce la quantità di dati da mescolare.

RDD è distribuito questo significa che è diviso su un certo numero di parti. Ciascuna di queste partizioni è potenzialmente su una macchina diversa. Il partizionatore di hash con arument numPartitions sceglie quale partizione posizionare la coppia (key, value) nel modo seguente:

  1. Crea partizioni esattamente numPartitions .
  2. Luoghi (key, value) nella partizione con il numero Hash(key) % numPartitions

Il metodo HashPartitioner.getPartition accetta una chiave come argomento e restituisce l’ indice della partizione a cui appartiene la chiave. Il partizionatore deve sapere quali sono gli indici validi, quindi restituisce numeri nell’intervallo corretto. Il numero di partizioni è specificato tramite l’argomento del costruttore di numPartitions .

L’implementazione restituisce circa key.hashCode() % numPartitions . Vedi Partitioner.scala per maggiori dettagli.