Spark: Почему Python значительно превосходит Scala в моем случае использования?

16

Чтобы сравнить производительность Spark при использовании Python и Scala, я создал одно и то же задание на обоих языках и сравнил время выполнения. Я ожидал, что обе работы займут примерно одинаковое количество времени, но работа с Python заняла только одно, а работа с 27minScala - 37minпочти на 40% дольше! Я реализовал ту же работу в Java, и это 37minutesтоже заняло . Как это возможно, что Python намного быстрее?

Минимальный проверяемый пример:

Работа на Python:

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurances of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Скала работа:

// Configuration
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurances of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Просто глядя на код, они кажутся идентичными. Я посмотрел на группы DAG, и они не предоставили никакой информации (или, по крайней мере, у меня нет ноу-хау, чтобы придумать объяснения, основанные на них).

Я был бы очень признателен за любые указатели.

maestromusica
источник
Комментарии не для расширенного обсуждения; этот разговор был перенесен в чат .
Самуэль Лью
1
Я бы начал анализировать, прежде чем что-то спрашивать, синхронизируя соответствующие блоки и операторы, чтобы увидеть, было ли конкретное место, где версия на python быстрее. Тогда вы могли бы уточнить вопрос «почему этот оператор python быстрее».
Терри Ян Риди

Ответы:

11

Ваше базовое предположение, что Scala или Java должны быть быстрее для этой конкретной задачи, просто неверно. Вы можете легко проверить это с минимальными локальными приложениями. Скала первая:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

Питон один

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Результаты (по 300 повторений в каждом, Python 3.7.6, Scala 2.11.12), Posts.xmlполученные из дампа данных hermeneutics.stackexchange.com со смесью совпадающих и не совпадающих шаблонов:

боксы дурарции в миллис для вышеуказанных программ

  • Python 273.50 (258.84, 288.16)
  • Scala 634,13 (533,81, 734,45)

Как видите, Python не только систематически быстрее, но и более последовательный (более низкий спред).

Уберите сообщение - не верьте необоснованным FUD - языки могут быть быстрее или медленнее в определенных задачах или в определенных средах (например, здесь Scala может быть поражен запуском JVM и / или GC и / или JIT), но если вы заявляете например, «XYZ на X4 быстрее» или «XYZ медленнее, чем ZYX (..) Приблизительно в 10 раз медленнее», это обычно означает, что кто-то написал действительно плохой код для тестирования.

Редактировать :

Чтобы решить некоторые проблемы, поднятые в комментариях:

  • В коде OP данные передаются в основном в одном направлении (JVM -> Python), и никакой реальной сериализации не требуется (этот конкретный путь просто проходит тестовую строку как есть и декодируется в UTF-8 на другой стороне). Это так же дешево, как и когда дело доходит до «сериализации».
  • То, что передается обратно, представляет собой одно целое число на раздел, поэтому влияние в этом направлении незначительно.
  • Связь осуществляется через локальные сокеты (все взаимодействие на рабочем месте за пределами первоначального подключения и аутентификации выполняется с использованием дескриптора файла, возвращенного из него local_connect_and_auth, и не более, чем с файлом, связанным с сокетом ). Опять же, настолько дешево, насколько это возможно, когда дело доходит до связи между процессами.
  • Учитывая разницу в производительности, показанную выше (намного выше, чем та, что вы видите в своей программе), есть много возможностей для накладных расходов, перечисленных выше.
  • Этот случай полностью отличается от случаев, когда простые или сложные объекты должны передаваться в интерпретатор Python и обратно в форме, доступной для обеих сторон в виде совместимых с рассолами дампов (наиболее примечательные примеры включают UDF старого стиля, некоторые части старого стиль MLLib).

Изменить 2 :

Поскольку jasper-m беспокоился о стоимости запуска, можно легко доказать, что Python по-прежнему имеет значительное преимущество над Scala, даже если размер ввода значительно увеличен.

Вот результаты для 2003360 строк / 5.6G (один и тот же вход, только дублированный несколько раз, 30 повторений), который превосходит все, что вы можете ожидать в одной задаче Spark.

введите описание изображения здесь

  • Python 22809,57 (21466,26, 24152,87)
  • Scala 27315,28 (24367,24, 30263,31)

Обратите внимание на непересекающиеся доверительные интервалы.

Изменить 3 :

Чтобы ответить на другой комментарий от Jasper-M:

Большая часть всей обработки все еще происходит внутри JVM в случае Spark.

Это просто неверно в данном конкретном случае:

  • Речь идет о задании карты с одним глобальным сокращением с использованием PyDpark RDD.
  • PySpark RDD (в отличие от, скажем так DataFrame) реализует брутто-функциональность изначально в Python, за исключением ввода, вывода и связи между узлами.
  • Поскольку это одноэтапное задание, а конечный вывод достаточно мал, чтобы его можно было проигнорировать, основная ответственность JVM (если не придираться, это реализовано в основном в Java, а не в Scala) - это запускать формат ввода Hadoop и передавать данные через сокет. файл в Python.
  • Часть чтения идентична для JVM и Python API, поэтому ее можно рассматривать как постоянные издержки. Это также не считается основной частью обработки , даже для такой простой работы, как эта.
user10938362
источник
3
отличный подход к проблеме. Спасибо, что поделились этим
Александрос Бирацис
1
@egordoe Александрос сказал, что «здесь не вызывается UDF», а не «Python не вызывается» - в этом вся разница. Затраты на сериализацию важны, когда данные обмениваются между системами (т.е. когда вы хотите передать данные в UDF и обратно).
user10938362
1
@egordoe Вы явно путаете две вещи - накладные расходы на сериализацию, которая является проблемой, когда нетривиальные объекты передаются взад и вперед. И накладные расходы на общение. Серийные издержки здесь незначительны или вообще отсутствуют, потому что вы просто передаете и декодируете строки байтов, и это происходит в основном в направлении, так как назад вы получаете одно целое число на раздел. Коммуникация вызывает некоторую обеспокоенность, но передача данных через локальные сокеты эффективна, поскольку она действительно достигается, когда речь идет о межпроцессном взаимодействии. Если это не ясно, я рекомендую прочитать источник - это не сложно и будет поучительно.
user10938362
1
Кроме того, методы сериализации просто не равны. Как показывает случай Spark, хорошие методы сериализации могут снизить стоимость до уровня, на котором это больше не имеет значения (см. UDF Pandas со стрелкой), и когда это происходит, другие факторы могут доминировать (см., Например, сравнение производительности между оконными функциями Scala и их эквивалентами с Pandas). UDFs - Python выигрывает там с гораздо большим отрывом, чем в этом вопросе).
user10938362
1
И ваша точка зрения @ Джаспер-М? Отдельные задачи Spark обычно достаточно малы, чтобы иметь сравнимую с ними рабочую нагрузку. Не поймите меня неправильно, но если у вас есть какой-либо реальный контрпример, который лишает законной силы этот или весь вопрос, пожалуйста, опубликуйте его. Я уже отметил, что вторичные действия в некоторой степени способствуют этому значению, но они не влияют на стоимость. Мы все инженеры (какого-то рода) здесь - давайте говорить цифры и код, а не убеждения, не так ли?
user10938362
4

Задание Scala занимает больше времени, поскольку имеет неверную конфигурацию, и поэтому задания Python и Scala были предоставлены с неравными ресурсами.

В коде есть две ошибки:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. ЛИНИЯ 1. После того, как линия была выполнена, конфигурация ресурса задания Spark уже установлена ​​и исправлена. С этого момента нет возможности что-либо настроить. Ни количество исполнителей, ни количество ядер на одного исполнителя.
  2. ЛИНИЯ 4-5. sc.hadoopConfigurationэто неправильное место для установки любой конфигурации Spark. Это должно быть установлено в configэкземпляре, который вы передаете new SparkContext(config).

[ДОБАВЛЕНО] Учитывая вышесказанное, я бы предложил изменить код задания Scala на

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

и перепроверьте это снова. Могу поспорить, что версия Scala будет в X раз быстрее.

egordoe
источник
Я проверил, что обе задачи выполняют 32 задачи параллельно, поэтому я не думаю, что это виновник?
Maestromusica
спасибо за редактирование, попробую проверить прямо сейчас
maestromusica
Привет @maestromusica это должно быть что-то в конфигурации ресурса, потому что, по сути, Python не может превзойти Scala в этом конкретном случае использования. Другой причиной могут быть некоторые некоррелированные случайные факторы, то есть нагрузка на кластер в определенный момент и тому подобное. Кстати, какой режим вы используете? автономный, местный, пряжа?
egordoe
Да, я подтвердил, что этот ответ неверен. Время выполнения такое же. Я также напечатал конфигурацию в обоих случаях, и она идентична.
Maestromusica
1
Я думаю, что вы можете быть правы. Я задал этот вопрос, чтобы исследовать все другие возможности, такие как ошибка в коде или, может быть, я что-то неправильно понял. Спасибо за ваш вклад.
Maestromusica