У меня есть большой набор данных, который мне нужно разделить на группы в соответствии с конкретными параметрами. Я хочу, чтобы работа выполнялась максимально эффективно. Я могу представить два способа сделать это
Вариант 1 - Создать карту из оригинального СДР и отфильтровать
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Вариант 2 - Фильтр оригинальной СДР напрямую
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
Первый метод должен перебирать все записи исходного набора данных 3 раза, тогда как второй должен делать это только дважды, но в обычных условиях спарк делает некоторые закулисные построения графика, так что я могу представить, что они эффективно сделано таким же образом. Мои вопросы: а.) Является ли один метод более эффективным, чем другой, или делает построение искрового графа эквивалентным? Б.) Возможно ли сделать это разделение за один проход?
источник
Ответы:
Прежде всего позвольте мне сказать вам, что я не эксперт по Spark; Я использовал его довольно часто в последние несколько месяцев, и я думаю, что теперь я понимаю это, но я могу ошибаться.
Итак, отвечая на ваши вопросы:
а.) они эквивалентны, но не так, как вы это видите; Spark не будет оптимизировать график, если вам интересно, но
customMapper
он все равно будет выполнен дважды в обоих случаях; это связано с тем, что для искрыrdd1
иrdd2
являются два совершенно разных RDD, и он будет строить график преобразования снизу вверх, начиная с листьев; поэтому вариант 1 будет переводиться на:Как вы сказали,
customMapper
выполняется дважды (более того, такжеrddIn
будет прочитано дважды, что означает, что если оно поступает из базы данных, оно может быть еще медленнее).б.) есть способ, вам просто нужно двигаться
cache()
в нужном месте:Делая это, мы сообщаем искре, что она может хранить частичные результаты
mappedRdd
; затем он будет использовать эти частичные результаты как для, такrdd1
и дляrdd2
. С искровой точки зрения это эквивалентно:источник