Как работает HashPartitioner?

82

Я прочитал документацию по HashPartitioner. К сожалению, ничего не было объяснено, кроме вызовов API. Я исхожу из предположения, что HashPartitionerраспределенный набор разбивается на разделы на основе хэша ключей. Например, если мои данные похожи на

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

Таким образом, разделитель поместит это в разные разделы с одинаковыми ключами, попадающими в один и тот же раздел. Однако я не понимаю значения аргумента конструктора

new HashPartitoner(numPartitions) //What does numPartitions do?

Для приведенного выше набора данных, как бы изменились результаты, если бы я сделал

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

Так как же на HashPartitionerсамом деле работает?

Сохаиб
источник

Ответы:

162

Что ж, давайте сделаем ваш набор данных немного интереснее:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

У нас есть шесть элементов:

rdd.count
Long = 6

нет разделителя:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

и восемь разделов:

rdd.partitions.length
Int = 8

Теперь давайте определим небольшой помощник для подсчета количества элементов на раздел:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Поскольку у нас нет секционера, наш набор данных равномерно распределяется между секциями ( схема секционирования по умолчанию в Spark ):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

начальное распределение

Теперь давайте перераспределим наш набор данных:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Поскольку переданный параметр HashPartitionerопределяет количество разделов, мы ожидаем один раздел:

rddOneP.partitions.length
Int = 1

Поскольку у нас только один раздел, он содержит все элементы:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

хеш-разделитель-1

Обратите внимание, что порядок значений после перемешивания не является детерминированным.

Таким же образом, если мы используем HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

мы получим 2 раздела:

rddTwoP.partitions.length
Int = 2

Поскольку rddразделен по ключевым данным, данные больше не будут распределяться равномерно:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Поскольку с тремя ключами и только двумя разными значениями hashCodeмода numPartitionsздесь нет ничего неожиданного:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Просто чтобы подтвердить вышесказанное:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

хеш-разделитель-2

Наконец, HashPartitioner(7)мы получаем семь разделов, три непустых по 2 элемента в каждом:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

хеш-разделитель-7

Резюме и примечания

  • HashPartitioner принимает единственный аргумент, который определяет количество разделов
  • значения присваиваются разделам с помощью hashключей. hashфункция может отличаться в зависимости от языка (может использовать Scala RDD hashCode, DataSetsиспользуйте MurmurHash 3, PySpark, portable_hash).

    В таком простом случае, как этот, где ключ - это небольшое целое число, вы можете предположить, что hashэто идентификатор ( i = hash(i)).

    Scala API использует nonNegativeModдля определения раздела на основе вычисленного хеша,

  • если распределение ключей неравномерно, вы можете оказаться в ситуации, когда часть вашего кластера простаивает

  • ключи должны быть хешируемыми. Вы можете проверить мой ответ для списка A как ключа для функции reduceByKey PySpark, чтобы узнать о конкретных проблемах PySpark. Другая возможная проблема отмечена в документации HashPartitioner :

    Массивы Java имеют хэш-коды, которые основаны на идентификаторах массивов, а не на их содержимом, поэтому попытка разбить RDD [Array [ ]] или RDD [(Array [ ], _)] с помощью HashPartitioner приведет к неожиданному или неправильному результату.

  • В Python 3 вы должны убедиться, что хеширование согласовано. См. Что означает исключение: случайность хэша строки должна быть отключена с помощью PYTHONHASHSEED означает в pyspark?

  • Разделитель хэша не является ни инъективным, ни сюръективным. Одному разделу можно назначить несколько ключей, а некоторые разделы могут оставаться пустыми.

  • Обратите внимание, что в настоящее время методы на основе хешей не работают в Scala в сочетании с классами case, определяемыми REPL ( равенство классов Case в Apache Spark ).

  • HashPartitioner(или любой другой Partitioner) перемешивает данные. Если разделение не используется повторно между несколькими операциями, оно не уменьшает объем данных, которые необходимо перемешать.

ноль323
источник
Отличное написание, спасибо. Тем не менее, я заметил в изображениях , которые вы имеете (1, None)с hash(2) % Pгде P является разделом. Не должно быть hash(1) % P?
javamonkey79
Я использую Spark 2.2, а partitionByв rdd нет API. в dataframe.write есть partitionBy, но он не принимает Partitioner в качестве аргумента.
hakunami
хороший ответ ... перемешивание основано на партионе, хороший партионатор может уменьшить количество перемешанных данных.
Леон
1
Отличный ответ. У меня вопрос, на который я не могу дать однозначного ответа в Интернете. Когда мы используем df.repartition (n), то есть когда мы не указываем какой-либо столбец в качестве ключа, тогда как хеш-функция работает внутри, так как нечего хешировать?
dsk
@dsk, если вы не укажете ключ, я считаю, что при перераспределении используется RoundRobinPartitioning. Некоторое обсуждение здесь .
Майк Соудер,
6

RDDраспространяется, это означает, что он разделен на некоторое количество частей. Каждый из этих разделов потенциально находится на разных машинах. Разделитель хеша с аргументом numPartitionsвыбирает, на каком разделе разместить пару (key, value)следующим образом:

  1. Создает именно numPartitionsперегородки.
  2. Места (key, value)в разделе с номеромHash(key) % numPartitions
Abalcerek
источник
3

HashPartitioner.getPartitionМетод принимает ключ в качестве аргумента и возвращает индекс раздела , который ключ принадлежит. Разделитель должен знать действительные индексы, поэтому он возвращает числа в правильном диапазоне. Количество разделов указывается в numPartitionsаргументе конструктора.

Реализация возвращается примерно key.hashCode() % numPartitions. См. Partitioner.scala для более подробной информации.

Даниэль Дарабос
источник