Искра, оптимально разделяющая один RDD на два

10

У меня есть большой набор данных, который мне нужно разделить на группы в соответствии с конкретными параметрами. Я хочу, чтобы работа выполнялась максимально эффективно. Я могу представить два способа сделать это

Вариант 1 - Создать карту из оригинального СДР и отфильтровать

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Вариант 2 - Фильтр оригинальной СДР напрямую

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Первый метод должен перебирать все записи исходного набора данных 3 раза, тогда как второй должен делать это только дважды, но в обычных условиях спарк делает некоторые закулисные построения графика, так что я могу представить, что они эффективно сделано таким же образом. Мои вопросы: а.) Является ли один метод более эффективным, чем другой, или делает построение искрового графа эквивалентным? Б.) Возможно ли сделать это разделение за один проход?

jagartner
источник
Я также нашел себя с очень похожей проблемой, и не нашел решения. Но что на самом деле происходит, неясно из этого кода, потому что spark имеет «ленивую оценку» и, предположительно, способен выполнять только то, что ему действительно необходимо выполнить, а также комбинировать карты, фильтры и все, что можно сделать вместе. Так что, возможно, то, что вы описываете, может произойти за один проход. Тем не менее, недостаточно знаком с ленивыми механизмами оценки. На самом деле я только что заметил .cache (). Может быть, есть способ сделать только один .cache () и получить полный результат?
user3780968

Ответы:

9

Прежде всего позвольте мне сказать вам, что я не эксперт по Spark; Я использовал его довольно часто в последние несколько месяцев, и я думаю, что теперь я понимаю это, но я могу ошибаться.

Итак, отвечая на ваши вопросы:

а.) они эквивалентны, но не так, как вы это видите; Spark не будет оптимизировать график, если вам интересно, но customMapperон все равно будет выполнен дважды в обоих случаях; это связано с тем, что для искры rdd1и rdd2являются два совершенно разных RDD, и он будет строить график преобразования снизу вверх, начиная с листьев; поэтому вариант 1 будет переводиться на:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Как вы сказали, customMapperвыполняется дважды (более того, также rddInбудет прочитано дважды, что означает, что если оно поступает из базы данных, оно может быть еще медленнее).

б.) есть способ, вам просто нужно двигаться cache()в нужном месте:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Делая это, мы сообщаем искре, что она может хранить частичные результаты mappedRdd; затем он будет использовать эти частичные результаты как для, так rdd1и для rdd2. С искровой точки зрения это эквивалентно:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
StefanoP
источник