Я предпочитаю Python, а не Scala. Но, поскольку Spark изначально написан на Scala, я ожидал, что мой код будет работать быстрее в Scala, чем версия Python по понятным причинам.
Исходя из этого предположения, я подумал изучить и написать Scala-версию очень распространенного кода предварительной обработки для 1 ГБ данных. Данные взяты из конкурса SpringLeaf на Kaggle . Просто, чтобы дать обзор данных (он содержит 1936 измерений и 145232 строк). Данные состоят из различных типов, например, int, float, string, boolean. Я использую 6 ядер из 8 для обработки Spark; Вот почему я использовал для minPartitions=6
того, чтобы каждое ядро было что-то для обработки.
Скала код
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Код Python
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Скала Performance Stage 0 (38 минут), Этап 1 (18 секунд)
Стадия исполнения Python 0 (11 минут), стадия 1 (7 секунд)
Оба дают разные графики визуализации DAG (из-за которых на обоих рисунках показаны разные функции этапа 0 для Scala (map
) и Python ( reduceByKey
))
Но, по сути, оба кода пытаются преобразовать данные в (размер_идентификатора, строку списка значений) и сохранить на диск. Выходные данные будут использоваться для вычисления различной статистики для каждого измерения.
С точки зрения производительности, код Scala для таких реальных данных, как этот, кажется, работает в 4 раза медленнее, чем версия Python. Хорошей новостью для меня является то, что это дало мне хорошую мотивацию, чтобы остаться с Python. Плохая новость в том, что я не совсем понял, почему?
источник
Ответы:
Оригинальный ответ, обсуждающий код, можно найти ниже.
Прежде всего, вы должны различать разные типы API, каждый со своими соображениями производительности.
RDD API
(Чистые структуры Python с оркестровкой на основе JVM)
Это компонент, который больше всего будет зависеть от производительности кода Python и деталей реализации PySpark. Хотя производительность Python вряд ли будет проблемой, необходимо учитывать как минимум несколько факторов:
Исполнители на основе процессов (Python) и исполнители на основе потоков (один JVM-несколько потоков) (Scala). Каждый исполнитель Python работает в своем собственном процессе. Как побочный эффект, он обеспечивает более сильную изоляцию, чем его аналог JVM, и некоторый контроль над жизненным циклом исполнителя, но потенциально значительно более высокое использование памяти:
Производительность самого кода Python. Вообще говоря, Scala быстрее, чем Python, но зависит от задачи к задаче. Более того, у вас есть несколько вариантов, включая JIT, такие как Numba , расширения C ( Cython ) или специализированные библиотеки, такие как Theano . Наконец,
если вы не используете ML / MLlib (или просто стек NumPy), рассмотрите возможность использования PyPy в качестве альтернативного интерпретатора. Смотрите SPARK-3094 .spark.python.worker.reuse
опцию, которая может использоваться для выбора между разветвлением процесса Python для каждой задачи и повторным использованием существующего процесса. Последний вариант, по-видимому, полезен, чтобы избежать дорогостоящей сборки мусора (это скорее впечатление, чем результат систематических тестов), тогда как первый (по умолчанию) оптимален для дорогостоящих передач и импорта.MLlib
(смешанное выполнение Python и JVM)
Основные соображения почти такие же, как и раньше, с несколькими дополнительными проблемами. Хотя базовые структуры, используемые с MLlib, являются простыми объектами RDD Python, все алгоритмы выполняются напрямую с использованием Scala.
Это означает дополнительную стоимость преобразования объектов Python в объекты Scala и наоборот, увеличение использования памяти и некоторые дополнительные ограничения, которые мы рассмотрим позже.
На данный момент (Spark 2.x) API на основе RDD находится в режиме обслуживания и планируется удалить в Spark 3.0 .
DataFrame API и Spark ML
(Выполнение JVM с кодом Python ограничено драйвером)
Это, вероятно, лучший выбор для стандартных задач обработки данных. Поскольку код Python в основном ограничивается высокоуровневыми логическими операциями над драйвером, между Python и Scala не должно быть различий в производительности.
Единственным исключением является использование построчных пользовательских функций Python, которые значительно менее эффективны, чем их эквиваленты в Scala. Хотя есть некоторые возможности для улучшений (в Spark 2.0.0 произошли существенные изменения), самым большим ограничением является полный переход между внутренним представлением (JVM) и интерпретатором Python. Если возможно, вы должны предпочесть композицию встроенных выражений ( например . Поведение Python UDF было улучшено в Spark 2.0.0, но оно все еще неоптимально по сравнению с собственным выполнением.
Это
может быть улучшено в будущем,значительно улучшилось с введением векторизованных пользовательских функций (SPARK-21190 и других расширений) , которые используют потоковую передачу по стрелкам для эффективного обмена данными с десериализацией без копирования. Для большинства приложений их вторичные издержки можно просто игнорировать.Также обязательно избегайте ненужной передачи данных между
DataFrames
иRDDs
. Это требует дорогой сериализации и десериализации, не говоря уже о передаче данных в и из интерпретатора Python.Стоит отметить, что у вызовов Py4J довольно высокая задержка. Это включает в себя простые звонки, такие как:
Обычно это не имеет значения (накладные расходы постоянны и не зависят от объема данных), но в случае мягких приложений реального времени вы можете рассмотреть возможность кэширования / повторного использования оболочек Java.
GraphX и Spark DataSets
На данный момент (Spark
Graphx1.62.1) ни один из них не предоставляет PySpark API, так что вы можете сказать, что PySpark бесконечно хуже, чем Scala.На практике разработка GraphX остановилась почти полностью, и проект в настоящее время находится в режиме обслуживания с закрытыми билетами JIRA, которые не будут исправлены . Библиотека GraphFrames предоставляет альтернативную библиотеку обработки графов с привязками Python.
DatasetСубъективно говоря,
Datasets
в Python недостаточно места для статической типизации, и даже если текущая реализация Scala слишком упрощена и не дает таких же преимуществ в производительности, какDataFrame
.Потоковый
Из того, что я видел до сих пор, я настоятельно рекомендую использовать Scala поверх Python. Это может измениться в будущем, если PySpark получит поддержку для структурированных потоков, но сейчас Scala API кажется гораздо более надежным, всеобъемлющим и эффективным. Мой опыт довольно ограничен.
Структурированные потоковые потоки в Spark 2.x, похоже, сокращают разрыв между языками, но пока он все еще находится на ранних этапах. Тем не менее, API, основанный на RDD, уже упоминается как «устаревшая потоковая передача» в Документации блоков данных (дата доступа 2017-03-03), поэтому разумно ожидать дальнейших усилий по объединению.
Неэффективные соображения
Характеристика паритетаНе все функции Spark предоставляются через API PySpark. Обязательно проверьте, реализованы ли нужные вам детали, и постарайтесь понять возможные ограничения.
Это особенно важно при использовании MLlib и аналогичных смешанных контекстов (см. Вызов функции Java / Scala из задачи ). Чтобы быть справедливым, некоторые части PySpark API, например
API дизайнmllib.linalg
, предоставляют более полный набор методов, чем Scala.PySpark API близко отражает своего аналога Scala и поэтому не совсем Pythonic. Это означает, что довольно просто отобразить языки, но в то же время код Python может быть значительно сложнее для понимания.
Сложная архитектураПоток данных PySpark относительно сложен по сравнению с чистым выполнением JVM. Гораздо сложнее рассуждать о программах PySpark или отладке. Более того, по крайней мере, базовое понимание Scala и JVM в целом является обязательным условием.
Spark 2.x и вышеНепрерывный переход к
Dataset
API, с замороженным RDD API, предоставляет как возможности, так и проблемы для пользователей Python. В то время как высокоуровневые части API намного проще раскрыть в Python, более сложные функции практически невозможно использовать напрямую .Более того, нативные функции Python продолжают оставаться гражданами второго сорта в мире SQL. Надеемся, что в будущем это улучшится благодаря сериализации Apache Arrow ( текущие усилия нацелены на данные,
collection
но serde UDF - долгосрочная цель ).Для проектов, сильно зависящих от кодовой базы Python, чистые альтернативы Python (такие как Dask или Ray ) могут быть интересной альтернативой.
Это не должно быть один против другого
API Spark DataFrame (SQL, Dataset) предоставляет элегантный способ интеграции кода Scala / Java в приложение PySpark. Вы можете использовать его
DataFrames
для представления данных собственному коду JVM и считывания результатов. Я объяснил некоторые варианты где-то еще, и вы можете найти рабочий пример использования Python-Scala в разделе Как использовать класс Scala внутри Pyspark .Это может быть дополнительно дополнено введением пользовательских типов (см. Как определить схему для пользовательского типа в Spark SQL? ).
Что не так с кодом, указанным в вопросе
(Отказ от ответственности: точка зрения Pythonista. Скорее всего, я пропустил некоторые трюки Scala)
Прежде всего, в вашем коде есть одна часть, которая вообще не имеет смысла. Если у вас уже есть
(key, value)
пары, созданные с использованиемzipWithIndex
илиenumerate
какой смысл создавать строку, просто чтобы разделить ее сразу после этого?flatMap
не работает рекурсивно, так что вы можете просто выдавать кортежи и пропустить следующие действияmap
.Другая часть, которую я нахожу проблематичной, это
reduceByKey
. Вообще говоря,reduceByKey
полезно, если применение агрегатной функции может уменьшить объем данных, которые необходимо перетасовать. Так как вы просто объединяете строки, здесь нечего получить. Игнорируя низкоуровневые вещи, такие как количество ссылок, объем данных, которые вы должны передать, точно такой же, как и дляgroupByKey
.Обычно я не буду останавливаться на этом, но, насколько я могу судить, это узкое место в вашем коде Scala. Присоединение строк в JVM - довольно дорогая операция (см., Например: Является ли объединение строк в Scala таким же дорогостоящим, как и в Java? ). Это значит , что - то подобное
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
, что эквивалентноinput4.reduceByKey(valsConcat)
в коде не является хорошей идеей.Если вы хотите избежать,
groupByKey
вы можете попробовать использоватьaggregateByKey
сStringBuilder
. Нечто подобное должно сделать свое дело:но я сомневаюсь, что это стоит всей суеты.
Учитывая вышесказанное, я переписал ваш код следующим образом:
Скала :
Python :
Полученные результаты
В
local[6]
режиме (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3,40 ГГц) с 4 ГБ памяти на исполнителя требуется (n = 3):Я почти уверен, что большую часть этого времени тратится на перемешивание, сериализацию, десериализацию и другие второстепенные задачи. Просто для удовольствия, вот наивный однопоточный код на Python, который выполняет ту же задачу на этом компьютере менее чем за минуту:
источник
Расширение вышеупомянутых ответов -
Scala быстрее во многих отношениях по сравнению с Python, но есть несколько веских причин, почему Python становится все более популярным, чем Scala, давайте рассмотрим их немного -
Python для Apache Spark довольно прост в освоении и использовании. Однако это не единственная причина, по которой Pyspark является лучшим выбором, чем Scala. Есть еще кое-что.
Python API для Spark может быть медленнее в кластере, но, в конце концов, исследователи данных могут сделать с ним гораздо больше по сравнению со Scala. Сложность Скалы отсутствует. Интерфейс прост и понятен.
Говорить о читабельности кода, обслуживании и знакомстве с Python API для Apache Spark гораздо лучше, чем Scala.
Python поставляется с несколькими библиотеками, связанными с машинным обучением и обработкой естественного языка. Это помогает в анализе данных, а также имеет статистику, которая является достаточно зрелой и проверенной временем. Например, numpy, pandas, scikit-learn, seaborn и matplotlib.
Примечание. Большинство исследователей данных используют гибридный подход, в котором используются лучшие из обоих API.
Наконец, сообщество Scala часто оказывается гораздо менее полезным для программистов. Это делает Python очень ценным обучением. Если у вас достаточно опыта работы с любым статически типизированным языком программирования, таким как Java, вы можете перестать беспокоиться о том, чтобы вообще не использовать Scala.
источник