Как я могу отправлять большие сообщения с помощью Kafka (более 15 МБ)?

120

Я отправляю String-сообщения в Kafka V. 0.8 с помощью Java Producer API. Если размер сообщения составляет около 15 МБ, я получаю файл MessageSizeTooLargeException. Я пытался установить message.max.bytesзначение 40 МБ, но все равно получаю исключение. Мелкие сообщения работали без проблем.

(Исключение появляется у производителя, у меня нет потребителя в этом приложении.)

Что я могу сделать, чтобы избавиться от этого исключения?

Мой пример конфигурации производителя

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Журнал ошибок:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Сонсон123
источник
5
Моим первым инстинктом было бы попросить вас разделить это огромное сообщение на несколько более мелких: - / Я предполагаю, что это невозможно по какой-то причине, но вы, тем не менее, можете пересмотреть это: огромные сообщения обычно означают, что есть недостаток дизайна где-то, что действительно нужно исправить.
Аарон Дигулла 09
1
Спасибо, но это значительно усложнит мою логику. Почему использовать Kafka для сообщений размером около 15 МБ - плохая идея? Является ли 1 МБ максимально допустимым размером сообщения? Я не нашел много информации об ограничении размера сообщения в документации Kafka.
Sonson123 09
2
Это совершенно не связано с Kafka или любой другой системой обработки сообщений. Мое рассуждение: если что-то пойдет не так с вашим 15-мегабайтным файлом, то потом убрать беспорядок будет очень дорого. Вот почему я обычно разбиваю большие файлы на множество более мелких заданий (которые обычно также могут выполняться параллельно).
Аарон Дигулла 09
вы использовали какое-либо сжатие? не могли бы вы поделиться некоторыми подробностями, сложно угадать что-то из одного-единственного слова
user2720864 09

Ответы:

182

Вам нужно настроить три (или четыре) свойства:

  • Сторона потребителя: fetch.message.max.bytesопределяет максимальный размер сообщения, которое может получить потребитель.
  • Сторона брокера: replica.fetch.max.bytes- это позволит репликам в брокерах отправлять сообщения в кластере и обеспечивать правильность репликации сообщений. Если это слишком мало, то сообщение никогда не будет реплицировано, и, следовательно, потребитель никогда не увидит сообщение, потому что сообщение никогда не будет зафиксировано (полностью реплицировано).
  • Сторона брокера: message.max.bytes- это наибольший размер сообщения, которое может быть получено брокером от производителя.
  • Сторона брокера (по теме): max.message.bytes- это наибольший размер сообщения, которое брокер разрешает добавлять в тему. Этот размер подтвержден предварительным сжатием. (По умолчанию у брокера message.max.bytes.)

Я узнал о номере 2 на собственном горьком опыте - вы не получаете НИКАКИХ исключений, сообщений или предупреждений от Kafka, поэтому обязательно учитывайте это, когда отправляете большие сообщения.

смеющийся человек
источник
3
Хорошо, вы и user2720864 были правы. Я только установил message.max.bytesв исходном коде. Но мне нужно установить эти значения в конфигурации сервера Kafka config/server.properties. Теперь работают и большие сообщения :).
Sonson123 03
3
Есть ли какие-либо известные недостатки при установке слишком высоких значений?
Иван Балашов
7
Да. Со стороны потребителя вы выделяете fetch.message.max.bytesпамять для КАЖДОГО раздела. Это означает, что если вы используете огромное количество fetch.message.max.bytesразделов в сочетании с большим количеством разделов, это потребует много памяти. Фактически, поскольку процесс репликации между брокерами также является специализированным потребителем, он также потребляет память на брокерах.
смеющийся_man
3
Обратите внимание, что существует также max.message.bytesконфигурация для каждой темы, которая может быть ниже, чем у брокера message.max.bytes.
Питер Дэвис
1
Согласно официальному документу, параметры на стороне потребителя и параметры, касающиеся репликации между брокерами /.*fetch.*bytes/, не кажутся жесткими ограничениями: «Это не абсолютный максимум, если [...] больше, чем это значение, пакет записи будет по-прежнему должны быть возвращены, чтобы гарантировать прогресс ».
Bluu
56

Незначительные изменения, необходимые для Kafka 0.10 и нового потребителя, по сравнению с ответом смеющегося_man :

  • Брокер: Без изменений, вам еще нужно увеличить свойства message.max.bytesи replica.fetch.max.bytes. message.max.bytesдолжно быть равно или меньше (*) replica.fetch.max.bytes.
  • Производитель: Увеличьте, max.request.sizeчтобы отправить сообщение большего размера.
  • Потребитель: увеличьте, max.partition.fetch.bytesчтобы получать сообщения большего размера.

(*) Прочтите комментарии, чтобы узнать больше о message.max.bytes<=replica.fetch.max.bytes

Саша Веттер
источник
2
Вы знаете, почему message.max.bytesдолжно быть меньше чем replica.fetch.max.bytes?
Костас
2
" replica.fetch.max.bytes (по умолчанию: 1 МБ) - максимальный размер данных, которые может реплицировать брокер. Он должен быть больше, чем message.max.bytes , иначе брокер примет сообщения и не сможет их реплицировать. Приводит к потенциальная потеря данных ". Источник: handle-large-messages-kafka
Саша Веттер
2
Спасибо, что связались со мной. Это похоже на то, что предлагает руководство Cloudera . Однако оба они неверны - обратите внимание, что они не предлагают никаких технических причин, почему replica.fetch.max.bytes должно быть строго больше message.max.bytes. Ранее сегодня один из сотрудников Confluent подтвердил то, что я подозревал: на самом деле эти две величины могут быть равными.
Костас
2
Есть ли какие-нибудь обновления относительно message.max.bytes<replica.fetch.max.bytesили message.max.bytes=replica.fetch.max.bytes@Kostas?
Саша Веттер
2
Да, они могут быть равными: mail-archive.com/users@kafka.apache.org/msg25494.html (Исмаэль работает на Confluent)
Костас
13

Вам необходимо переопределить следующие свойства:

Конфигурации брокера ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Конфигурации потребителей ($ KAFKA_HOME / config / consumer.properties)
Этот шаг у меня не сработал. Я добавляю его в потребительское приложение, и оно работает нормально

  • fetch.message.max.bytes

Перезагрузите сервер.

посмотрите эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html

user2550587
источник
1
для потребителя командной строки мне нужно использовать флаг --fetch-size = <bytes>. Кажется, он не читает файл consumer.properties (kafka 0.8.1). Я бы также порекомендовал включить сжатие со стороны производителя, используя параметр compress.codec.
Ziggy Eunicien
Комментарий Зигги работал у меня kafka 0.8.1.1. Спасибо!
Джеймс
может быть, fetch.message.max.bytes заменен на max.partition.fetch.bytes в ConsumerConfig?
s_bei
12

Идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем полученного Kafka Consumer, т.е.

Производитель Kafka -> Брокер Kafka -> Потребитель Kafka

Предположим, что если требуется отправить 15 МБ сообщения, тогда производитель , брокер и потребитель , все трое, должны быть синхронизированы.

Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает / сохраняет 15 МБ -> Kafka Consumer получает 15 МБ

Следовательно, настройка должна быть:

а) о Брокере:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

б) на Потребителе:

fetch.message.max.bytes=15728640
Ravi
источник
2
может быть, fetch.message.max.bytes заменен на max.partition.fetch.bytes в ConsumerConfig?
s_bei
7

Важно помнить, что message.max.bytesатрибут должен быть синхронизирован со fetch.message.max.bytesсвойством потребителя . размер выборки должен быть не меньше максимального размера сообщения, в противном случае может возникнуть ситуация, когда производители могут отправлять сообщения большего размера, чем потребитель может принять / получить. Возможно, стоит взглянуть на это.
Какую версию Kafka вы используете? Также предоставьте более подробную информацию о трассировке, которую вы получаете. что-то вроде ... payload size of xxxx larger than 1000000появляется в журнале?

user2720864
источник
1
Я обновил свой вопрос, добавив дополнительную информацию: Kafka Version 2.8.0-0.8.0; теперь мне нужен только продюсер.
Sonson123
7

Ответ от @laughing_man довольно точен. Но все же я хотел дать рекомендацию, которую я узнал от эксперта Kafka Стефана Маарека из Quora.

Kafka не предназначен для обработки больших сообщений.

Ваш API должен использовать облачное хранилище (Ex AWS S3) и просто отправить в Kafka или любой брокер сообщений ссылку на S3. Вы должны найти место для хранения ваших данных, возможно, это сетевой диск, может быть, что угодно, но это не должен быть брокер сообщений.

Теперь, если вы не хотите использовать вышеуказанное решение

Максимальный размер сообщения составляет 1 МБ (настройка в ваших брокерах называется message.max.bytes) Apache Kafka . Если вам это действительно нужно, вы можете увеличить этот размер и обязательно увеличить сетевые буферы для ваших производителей и потребителей.

И если вы действительно заботитесь о разделении своего сообщения, убедитесь, что каждое разделение сообщения имеет один и тот же ключ, чтобы оно было перенесено в один и тот же раздел, а содержимое вашего сообщения должно сообщать «идентификатор части», чтобы ваш потребитель мог полностью восстановить сообщение. .

Вы также можете изучить сжатие, если ваше сообщение основано на тексте (сжатие gzip, snappy, lz4), что может уменьшить размер данных, но не волшебным образом.

Опять же, вам нужно использовать внешнюю систему для хранения этих данных и просто отправить внешнюю ссылку на Kafka. Это очень распространенная архитектура, и вам следует придерживаться ее, и она будет широко распространена.

Имейте в виду, что Kafka работает лучше всего, только если сообщения огромны по объему, но не по размеру.

Источник: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Бхану Хойсала
источник
4
Вы можете отметить, что «ваша» рекомендация - это почти дословная копия рекомендации Стефана Маарека с Quora на сайте quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
Mike
Kafka работает с большими сообщениями, абсолютно без проблем. На начальной странице домашней страницы Kafka она даже упоминается как система хранения.
calloc_org
3

Для людей, использующих landoop kafka: вы можете передавать значения конфигурации в переменных среды, например:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

И если вы используете rdkafka, то передайте message.max.bytes в конфигурации производителя, например:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Аналогичным образом для потребителя

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
информатор
источник