Я отправляю String-сообщения в Kafka V. 0.8 с помощью Java Producer API. Если размер сообщения составляет около 15 МБ, я получаю файл MessageSizeTooLargeException
. Я пытался установить message.max.bytes
значение 40 МБ, но все равно получаю исключение. Мелкие сообщения работали без проблем.
(Исключение появляется у производителя, у меня нет потребителя в этом приложении.)
Что я могу сделать, чтобы избавиться от этого исключения?
Мой пример конфигурации производителя
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Журнал ошибок:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Сонсон123
источник
источник
Ответы:
Вам нужно настроить три (или четыре) свойства:
fetch.message.max.bytes
определяет максимальный размер сообщения, которое может получить потребитель.replica.fetch.max.bytes
- это позволит репликам в брокерах отправлять сообщения в кластере и обеспечивать правильность репликации сообщений. Если это слишком мало, то сообщение никогда не будет реплицировано, и, следовательно, потребитель никогда не увидит сообщение, потому что сообщение никогда не будет зафиксировано (полностью реплицировано).message.max.bytes
- это наибольший размер сообщения, которое может быть получено брокером от производителя.max.message.bytes
- это наибольший размер сообщения, которое брокер разрешает добавлять в тему. Этот размер подтвержден предварительным сжатием. (По умолчанию у брокераmessage.max.bytes
.)Я узнал о номере 2 на собственном горьком опыте - вы не получаете НИКАКИХ исключений, сообщений или предупреждений от Kafka, поэтому обязательно учитывайте это, когда отправляете большие сообщения.
источник
message.max.bytes
в исходном коде. Но мне нужно установить эти значения в конфигурации сервера Kafkaconfig/server.properties
. Теперь работают и большие сообщения :).fetch.message.max.bytes
память для КАЖДОГО раздела. Это означает, что если вы используете огромное количествоfetch.message.max.bytes
разделов в сочетании с большим количеством разделов, это потребует много памяти. Фактически, поскольку процесс репликации между брокерами также является специализированным потребителем, он также потребляет память на брокерах.max.message.bytes
конфигурация для каждой темы, которая может быть ниже, чем у брокераmessage.max.bytes
./.*fetch.*bytes/
, не кажутся жесткими ограничениями: «Это не абсолютный максимум, если [...] больше, чем это значение, пакет записи будет по-прежнему должны быть возвращены, чтобы гарантировать прогресс ».Незначительные изменения, необходимые для Kafka 0.10 и нового потребителя, по сравнению с ответом смеющегося_man :
message.max.bytes
иreplica.fetch.max.bytes
.message.max.bytes
должно быть равно или меньше (*)replica.fetch.max.bytes
.max.request.size
чтобы отправить сообщение большего размера.max.partition.fetch.bytes
чтобы получать сообщения большего размера.(*) Прочтите комментарии, чтобы узнать больше о
message.max.bytes
<=replica.fetch.max.bytes
источник
message.max.bytes
должно быть меньше чемreplica.fetch.max.bytes
?replica.fetch.max.bytes
должно быть строго большеmessage.max.bytes
. Ранее сегодня один из сотрудников Confluent подтвердил то, что я подозревал: на самом деле эти две величины могут быть равными.message.max.bytes<replica.fetch.max.bytes
илиmessage.max.bytes=replica.fetch.max.bytes
@Kostas?Вам необходимо переопределить следующие свойства:
Конфигурации брокера ($ KAFKA_HOME / config / server.properties)
Конфигурации потребителей ($ KAFKA_HOME / config / consumer.properties)
Этот шаг у меня не сработал. Я добавляю его в потребительское приложение, и оно работает нормально
Перезагрузите сервер.
посмотрите эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html
источник
Идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем полученного Kafka Consumer, т.е.
Производитель Kafka -> Брокер Kafka -> Потребитель Kafka
Предположим, что если требуется отправить 15 МБ сообщения, тогда производитель , брокер и потребитель , все трое, должны быть синхронизированы.
Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает / сохраняет 15 МБ -> Kafka Consumer получает 15 МБ
Следовательно, настройка должна быть:
а) о Брокере:
б) на Потребителе:
источник
Важно помнить, что
message.max.bytes
атрибут должен быть синхронизирован соfetch.message.max.bytes
свойством потребителя . размер выборки должен быть не меньше максимального размера сообщения, в противном случае может возникнуть ситуация, когда производители могут отправлять сообщения большего размера, чем потребитель может принять / получить. Возможно, стоит взглянуть на это.Какую версию Kafka вы используете? Также предоставьте более подробную информацию о трассировке, которую вы получаете. что-то вроде ...
payload size of xxxx larger than 1000000
появляется в журнале?источник
Ответ от @laughing_man довольно точен. Но все же я хотел дать рекомендацию, которую я узнал от эксперта Kafka Стефана Маарека из Quora.
Kafka не предназначен для обработки больших сообщений.
Ваш API должен использовать облачное хранилище (Ex AWS S3) и просто отправить в Kafka или любой брокер сообщений ссылку на S3. Вы должны найти место для хранения ваших данных, возможно, это сетевой диск, может быть, что угодно, но это не должен быть брокер сообщений.
Теперь, если вы не хотите использовать вышеуказанное решение
Максимальный размер сообщения составляет 1 МБ (настройка в ваших брокерах называется
message.max.bytes
) Apache Kafka . Если вам это действительно нужно, вы можете увеличить этот размер и обязательно увеличить сетевые буферы для ваших производителей и потребителей.И если вы действительно заботитесь о разделении своего сообщения, убедитесь, что каждое разделение сообщения имеет один и тот же ключ, чтобы оно было перенесено в один и тот же раздел, а содержимое вашего сообщения должно сообщать «идентификатор части», чтобы ваш потребитель мог полностью восстановить сообщение. .
Вы также можете изучить сжатие, если ваше сообщение основано на тексте (сжатие gzip, snappy, lz4), что может уменьшить размер данных, но не волшебным образом.
Опять же, вам нужно использовать внешнюю систему для хранения этих данных и просто отправить внешнюю ссылку на Kafka. Это очень распространенная архитектура, и вам следует придерживаться ее, и она будет широко распространена.
Имейте в виду, что Kafka работает лучше всего, только если сообщения огромны по объему, но не по размеру.
Источник: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
источник
Для людей, использующих landoop kafka: вы можете передавать значения конфигурации в переменных среды, например:
И если вы используете rdkafka, то передайте message.max.bytes в конфигурации производителя, например:
Аналогичным образом для потребителя
источник