Что ж, давайте сделаем ваш набор данных немного интереснее:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
У нас есть шесть элементов:
rdd.count
Long = 6
нет разделителя:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
и восемь разделов:
rdd.partitions.length
Int = 8
Теперь давайте определим небольшой помощник для подсчета количества элементов на раздел:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Поскольку у нас нет секционера, наш набор данных равномерно распределяется между секциями ( схема секционирования по умолчанию в Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Теперь давайте перераспределим наш набор данных:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Поскольку переданный параметр HashPartitioner
определяет количество разделов, мы ожидаем один раздел:
rddOneP.partitions.length
Int = 1
Поскольку у нас только один раздел, он содержит все элементы:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Обратите внимание, что порядок значений после перемешивания не является детерминированным.
Таким же образом, если мы используем HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
мы получим 2 раздела:
rddTwoP.partitions.length
Int = 2
Поскольку rdd
разделен по ключевым данным, данные больше не будут распределяться равномерно:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Поскольку с тремя ключами и только двумя разными значениями hashCode
мода numPartitions
здесь нет ничего неожиданного:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Просто чтобы подтвердить вышесказанное:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Наконец, HashPartitioner(7)
мы получаем семь разделов, три непустых по 2 элемента в каждом:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Резюме и примечания
HashPartitioner
принимает единственный аргумент, который определяет количество разделов
значения присваиваются разделам с помощью hash
ключей. hash
функция может отличаться в зависимости от языка (может использовать Scala RDD hashCode
, DataSets
используйте MurmurHash 3, PySpark, portable_hash
).
В таком простом случае, как этот, где ключ - это небольшое целое число, вы можете предположить, что hash
это идентификатор ( i = hash(i)
).
Scala API использует nonNegativeMod
для определения раздела на основе вычисленного хеша,
если распределение ключей неравномерно, вы можете оказаться в ситуации, когда часть вашего кластера простаивает
ключи должны быть хешируемыми. Вы можете проверить мой ответ для списка A как ключа для функции reduceByKey PySpark, чтобы узнать о конкретных проблемах PySpark. Другая возможная проблема отмечена в документации HashPartitioner :
Массивы Java имеют хэш-коды, которые основаны на идентификаторах массивов, а не на их содержимом, поэтому попытка разбить RDD [Array [ ]] или RDD [(Array [ ], _)] с помощью HashPartitioner приведет к неожиданному или неправильному результату.
В Python 3 вы должны убедиться, что хеширование согласовано. См. Что означает исключение: случайность хэша строки должна быть отключена с помощью PYTHONHASHSEED означает в pyspark?
Разделитель хэша не является ни инъективным, ни сюръективным. Одному разделу можно назначить несколько ключей, а некоторые разделы могут оставаться пустыми.
Обратите внимание, что в настоящее время методы на основе хешей не работают в Scala в сочетании с классами case, определяемыми REPL ( равенство классов Case в Apache Spark ).
HashPartitioner
(или любой другой Partitioner
) перемешивает данные. Если разделение не используется повторно между несколькими операциями, оно не уменьшает объем данных, которые необходимо перемешать.
(1, None)
сhash(2) % P
где P является разделом. Не должно бытьhash(1) % P
?partitionBy
в rdd нет API. в dataframe.write есть partitionBy, но он не принимает Partitioner в качестве аргумента.RDD
распространяется, это означает, что он разделен на некоторое количество частей. Каждый из этих разделов потенциально находится на разных машинах. Разделитель хеша с аргументомnumPartitions
выбирает, на каком разделе разместить пару(key, value)
следующим образом:numPartitions
перегородки.(key, value)
в разделе с номеромHash(key) % numPartitions
источник
HashPartitioner.getPartition
Метод принимает ключ в качестве аргумента и возвращает индекс раздела , который ключ принадлежит. Разделитель должен знать действительные индексы, поэтому он возвращает числа в правильном диапазоне. Количество разделов указывается вnumPartitions
аргументе конструктора.Реализация возвращается примерно
key.hashCode() % numPartitions
. См. Partitioner.scala для более подробной информации.источник