Спарк производительности для Scala против Python

178

Я предпочитаю Python, а не Scala. Но, поскольку Spark изначально написан на Scala, я ожидал, что мой код будет работать быстрее в Scala, чем версия Python по понятным причинам.

Исходя из этого предположения, я подумал изучить и написать Scala-версию очень распространенного кода предварительной обработки для 1 ГБ данных. Данные взяты из конкурса SpringLeaf на Kaggle . Просто, чтобы дать обзор данных (он содержит 1936 измерений и 145232 строк). Данные состоят из различных типов, например, int, float, string, boolean. Я использую 6 ядер из 8 для обработки Spark; Вот почему я использовал для minPartitions=6того, чтобы каждое ядро ​​было что-то для обработки.

Скала код

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Код Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Скала Performance Stage 0 (38 минут), Этап 1 (18 секунд) введите описание изображения здесь

Стадия исполнения Python 0 (11 минут), стадия 1 (7 секунд) введите описание изображения здесь

Оба дают разные графики визуализации DAG (из-за которых на обоих рисунках показаны разные функции этапа 0 для Scala (map ) и Python ( reduceByKey))

Но, по сути, оба кода пытаются преобразовать данные в (размер_идентификатора, строку списка значений) и сохранить на диск. Выходные данные будут использоваться для вычисления различной статистики для каждого измерения.

С точки зрения производительности, код Scala для таких реальных данных, как этот, кажется, работает в 4 раза медленнее, чем версия Python. Хорошей новостью для меня является то, что это дало мне хорошую мотивацию, чтобы остаться с Python. Плохая новость в том, что я не совсем понял, почему?

Mrityunjay
источник
8
Возможно, это зависит от кода и приложения, так как я получаю другой результат, что Apache Spark Python медленнее, чем Scala, при суммировании миллиарда членов формулы Лейбница для π
Пол
3
Интересный вопрос! Кстати, посмотрите также здесь: emptypipes.org/2015/01/17/python-vs-scala-vs-spark Чем больше у вас ядер, тем меньше вы видите различия между языками.
Маркон
Рассматривали ли вы принять существующий ответ?
10465355 говорит Восстановить Монику

Ответы:

358

Оригинальный ответ, обсуждающий код, можно найти ниже.


Прежде всего, вы должны различать разные типы API, каждый со своими соображениями производительности.

RDD API

(Чистые структуры Python с оркестровкой на основе JVM)

Это компонент, который больше всего будет зависеть от производительности кода Python и деталей реализации PySpark. Хотя производительность Python вряд ли будет проблемой, необходимо учитывать как минимум несколько факторов:

  • Накладные расходы на коммуникацию JVM. Практически все данные, поступающие к исполнителю Python и от него, должны передаваться через сокет и JVM-работника. Хотя это относительно эффективное местное общение, оно все же не является бесплатным.
  • Исполнители на основе процессов (Python) и исполнители на основе потоков (один JVM-несколько потоков) (Scala). Каждый исполнитель Python работает в своем собственном процессе. Как побочный эффект, он обеспечивает более сильную изоляцию, чем его аналог JVM, и некоторый контроль над жизненным циклом исполнителя, но потенциально значительно более высокое использование памяти:

    • след памяти переводчика
    • след загруженных библиотек
    • менее эффективное вещание (каждому процессу требуется собственная копия вещания)
  • Производительность самого кода Python. Вообще говоря, Scala быстрее, чем Python, но зависит от задачи к задаче. Более того, у вас есть несколько вариантов, включая JIT, такие как Numba , расширения C ( Cython ) или специализированные библиотеки, такие как Theano . Наконец, если вы не используете ML / MLlib (или просто стек NumPy) , рассмотрите возможность использования PyPy в качестве альтернативного интерпретатора. Смотрите SPARK-3094 .

  • Конфигурация PySpark обеспечивает spark.python.worker.reuse опцию, которая может использоваться для выбора между разветвлением процесса Python для каждой задачи и повторным использованием существующего процесса. Последний вариант, по-видимому, полезен, чтобы избежать дорогостоящей сборки мусора (это скорее впечатление, чем результат систематических тестов), тогда как первый (по умолчанию) оптимален для дорогостоящих передач и импорта.
  • Подсчет ссылок, используемый в CPython как метод сбора мусора в первой строке, довольно хорошо работает с типичными рабочими нагрузками Spark (потоковая обработка, без циклов ссылок) и снижает риск длительных пауз GC.

MLlib

(смешанное выполнение Python и JVM)

Основные соображения почти такие же, как и раньше, с несколькими дополнительными проблемами. Хотя базовые структуры, используемые с MLlib, являются простыми объектами RDD Python, все алгоритмы выполняются напрямую с использованием Scala.

Это означает дополнительную стоимость преобразования объектов Python в объекты Scala и наоборот, увеличение использования памяти и некоторые дополнительные ограничения, которые мы рассмотрим позже.

На данный момент (Spark 2.x) API на основе RDD находится в режиме обслуживания и планируется удалить в Spark 3.0 .

DataFrame API и Spark ML

(Выполнение JVM с кодом Python ограничено драйвером)

Это, вероятно, лучший выбор для стандартных задач обработки данных. Поскольку код Python в основном ограничивается высокоуровневыми логическими операциями над драйвером, между Python и Scala не должно быть различий в производительности.

Единственным исключением является использование построчных пользовательских функций Python, которые значительно менее эффективны, чем их эквиваленты в Scala. Хотя есть некоторые возможности для улучшений (в Spark 2.0.0 произошли существенные изменения), самым большим ограничением является полный переход между внутренним представлением (JVM) и интерпретатором Python. Если возможно, вы должны предпочесть композицию встроенных выражений ( например . Поведение Python UDF было улучшено в Spark 2.0.0, но оно все еще неоптимально по сравнению с собственным выполнением.

Это может быть улучшено в будущем, значительно улучшилось с введением векторизованных пользовательских функций (SPARK-21190 и других расширений) , которые используют потоковую передачу по стрелкам для эффективного обмена данными с десериализацией без копирования. Для большинства приложений их вторичные издержки можно просто игнорировать.

Также обязательно избегайте ненужной передачи данных между DataFramesи RDDs. Это требует дорогой сериализации и десериализации, не говоря уже о передаче данных в и из интерпретатора Python.

Стоит отметить, что у вызовов Py4J довольно высокая задержка. Это включает в себя простые звонки, такие как:

from pyspark.sql.functions import col

col("foo")

Обычно это не имеет значения (накладные расходы постоянны и не зависят от объема данных), но в случае мягких приложений реального времени вы можете рассмотреть возможность кэширования / повторного использования оболочек Java.

GraphX ​​и Spark DataSets

На данный момент (Spark 1.6 2.1) ни один из них не предоставляет PySpark API, так что вы можете сказать, что PySpark бесконечно хуже, чем Scala.

Graphx

На практике разработка GraphX ​​остановилась почти полностью, и проект в настоящее время находится в режиме обслуживания с закрытыми билетами JIRA, которые не будут исправлены . Библиотека GraphFrames предоставляет альтернативную библиотеку обработки графов с привязками Python.

Dataset

Субъективно говоря, Datasetsв Python недостаточно места для статической типизации, и даже если текущая реализация Scala слишком упрощена и не дает таких же преимуществ в производительности, как DataFrame.

Потоковый

Из того, что я видел до сих пор, я настоятельно рекомендую использовать Scala поверх Python. Это может измениться в будущем, если PySpark получит поддержку для структурированных потоков, но сейчас Scala API кажется гораздо более надежным, всеобъемлющим и эффективным. Мой опыт довольно ограничен.

Структурированные потоковые потоки в Spark 2.x, похоже, сокращают разрыв между языками, но пока он все еще находится на ранних этапах. Тем не менее, API, основанный на RDD, уже упоминается как «устаревшая потоковая передача» в Документации блоков данных (дата доступа 2017-03-03), поэтому разумно ожидать дальнейших усилий по объединению.

Неэффективные соображения

Характеристика паритета

Не все функции Spark предоставляются через API PySpark. Обязательно проверьте, реализованы ли нужные вам детали, и постарайтесь понять возможные ограничения.

Это особенно важно при использовании MLlib и аналогичных смешанных контекстов (см. Вызов функции Java / Scala из задачи ). Чтобы быть справедливым, некоторые части PySpark API, например mllib.linalg, предоставляют более полный набор методов, чем Scala.

API дизайн

PySpark API близко отражает своего аналога Scala и поэтому не совсем Pythonic. Это означает, что довольно просто отобразить языки, но в то же время код Python может быть значительно сложнее для понимания.

Сложная архитектура

Поток данных PySpark относительно сложен по сравнению с чистым выполнением JVM. Гораздо сложнее рассуждать о программах PySpark или отладке. Более того, по крайней мере, базовое понимание Scala и JVM в целом является обязательным условием.

Spark 2.x и выше

Непрерывный переход к DatasetAPI, с замороженным RDD API, предоставляет как возможности, так и проблемы для пользователей Python. В то время как высокоуровневые части API намного проще раскрыть в Python, более сложные функции практически невозможно использовать напрямую .

Более того, нативные функции Python продолжают оставаться гражданами второго сорта в мире SQL. Надеемся, что в будущем это улучшится благодаря сериализации Apache Arrow ( текущие усилия нацелены на данные,collection но serde UDF - долгосрочная цель ).

Для проектов, сильно зависящих от кодовой базы Python, чистые альтернативы Python (такие как Dask или Ray ) могут быть интересной альтернативой.

Это не должно быть один против другого

API Spark DataFrame (SQL, Dataset) предоставляет элегантный способ интеграции кода Scala / Java в приложение PySpark. Вы можете использовать его DataFramesдля представления данных собственному коду JVM и считывания результатов. Я объяснил некоторые варианты где-то еще, и вы можете найти рабочий пример использования Python-Scala в разделе Как использовать класс Scala внутри Pyspark .

Это может быть дополнительно дополнено введением пользовательских типов (см. Как определить схему для пользовательского типа в Spark SQL? ).


Что не так с кодом, указанным в вопросе

(Отказ от ответственности: точка зрения Pythonista. Скорее всего, я пропустил некоторые трюки Scala)

Прежде всего, в вашем коде есть одна часть, которая вообще не имеет смысла. Если у вас уже есть (key, value)пары, созданные с использованием zipWithIndexили enumerateкакой смысл создавать строку, просто чтобы разделить ее сразу после этого? flatMapне работает рекурсивно, так что вы можете просто выдавать кортежи и пропустить следующие действия map.

Другая часть, которую я нахожу проблематичной, это reduceByKey. Вообще говоря, reduceByKeyполезно, если применение агрегатной функции может уменьшить объем данных, которые необходимо перетасовать. Так как вы просто объединяете строки, здесь нечего получить. Игнорируя низкоуровневые вещи, такие как количество ссылок, объем данных, которые вы должны передать, точно такой же, как и для groupByKey.

Обычно я не буду останавливаться на этом, но, насколько я могу судить, это узкое место в вашем коде Scala. Присоединение строк в JVM - довольно дорогая операция (см., Например: Является ли объединение строк в Scala таким же дорогостоящим, как и в Java? ). Это значит , что - то подобное _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) , что эквивалентно input4.reduceByKey(valsConcat)в коде не является хорошей идеей.

Если вы хотите избежать, groupByKeyвы можете попробовать использовать aggregateByKeyс StringBuilder. Нечто подобное должно сделать свое дело:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

но я сомневаюсь, что это стоит всей суеты.

Учитывая вышесказанное, я переписал ваш код следующим образом:

Скала :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Полученные результаты

В local[6]режиме (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3,40 ГГц) с 4 ГБ памяти на исполнителя требуется (n = 3):

  • Scala - среднее значение: 250,00 с, стандартное значение 12,49
  • Python - среднее значение: 246,66 с, стандартное равенство: 1,15

Я почти уверен, что большую часть этого времени тратится на перемешивание, сериализацию, десериализацию и другие второстепенные задачи. Просто для удовольствия, вот наивный однопоточный код на Python, который выполняет ту же задачу на этом компьютере менее чем за минуту:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
zero323
источник
23
Один из самых ясных, исчерпывающих и полезных ответов, с которыми я столкнулся некоторое время. Спасибо!
etov
Какой ты великолепный парень!
DennisLi
-4

Расширение вышеупомянутых ответов -

Scala быстрее во многих отношениях по сравнению с Python, но есть несколько веских причин, почему Python становится все более популярным, чем Scala, давайте рассмотрим их немного -

Python для Apache Spark довольно прост в освоении и использовании. Однако это не единственная причина, по которой Pyspark является лучшим выбором, чем Scala. Есть еще кое-что.

Python API для Spark может быть медленнее в кластере, но, в конце концов, исследователи данных могут сделать с ним гораздо больше по сравнению со Scala. Сложность Скалы отсутствует. Интерфейс прост и понятен.

Говорить о читабельности кода, обслуживании и знакомстве с Python API для Apache Spark гораздо лучше, чем Scala.

Python поставляется с несколькими библиотеками, связанными с машинным обучением и обработкой естественного языка. Это помогает в анализе данных, а также имеет статистику, которая является достаточно зрелой и проверенной временем. Например, numpy, pandas, scikit-learn, seaborn и matplotlib.

Примечание. Большинство исследователей данных используют гибридный подход, в котором используются лучшие из обоих API.

Наконец, сообщество Scala часто оказывается гораздо менее полезным для программистов. Это делает Python очень ценным обучением. Если у вас достаточно опыта работы с любым статически типизированным языком программирования, таким как Java, вы можете перестать беспокоиться о том, чтобы вообще не использовать Scala.


источник