От чего зависит потребительское смещение Кафки?

170

Я относительно новичок в Кафке. Я немного поэкспериментировал с этим, но некоторые вещи мне неясны в отношении компенсации потребителю. Из того, что я понял до сих пор, когда потребитель начинает, смещение, с которого он начнет читать, определяется настройкой конфигурации auto.offset.reset(поправьте меня, если я ошибаюсь).

Теперь скажем, например, что в теме 10 сообщений (смещений от 0 до 9), и потребитель получал 5 из них до того, как оно вышло из строя (или до того, как я убил потребителя). Затем скажите, что я перезапускаю этот процесс. Мои вопросы:

  1. Если auto.offset.resetустановлено значение smallest, всегда ли оно начнет потреблять со смещения 0?

  2. Если auto.offset.resetустановлено значение largest, оно начнет потреблять со смещения 5?

  3. Всегда ли поведение в отношении такого сценария детерминировано?

Пожалуйста, не стесняйтесь комментировать, если что-то в моем вопросе неясно. Заранее спасибо.

Асиф Икбал
источник

Ответы:

260

Это немного сложнее, чем вы описали.
В auto.offset.resetконфигурации пинки в только если ваша группа потребителей не имеет действительное смещение покончило где - то (2 поддерживаются офсетные хранилища теперь Кафка и Zookeeper), и это также зависит от того , какого потребителя вы используете.

Если вы используете высокоуровневого Java-потребителя, представьте следующие сценарии:

  1. У вас есть потребитель в группе потребителей, group1который принял 5 сообщений и умер. В следующий раз, когда вы запустите этот потребитель, он даже не будет использовать этот auto.offset.resetконфиг и продолжит работу с того места, где он умер, потому что он просто извлечет сохраненное смещение из хранилища смещений (Kafka или ZK, как я уже упоминал).

  2. У вас есть сообщения в теме (как вы описали), и вы начинаете потребителя в новой группе потребителей group2. Смещение не сохраняется нигде, и на этот раз auto.offset.resetконфиг решит, начинать ли с начала темы ( earliest) или с конца темы ( latest)

Еще одна вещь, которая влияет на то, какому значению будет соответствовать смещение, earliestи его latestконфигурации - это политика хранения журнала. Представьте, что у вас есть тема с задержкой, настроенной на 1 час. Вы создаете 5 сообщений, а через час вы публикуете еще 5 сообщений. latestСмещение будет по- прежнему остаются такими же , как и в предыдущем примере , но earliestодин не сможет быть 0потому , что Кафка уже удалить эти сообщения и , таким образом , самый ранний доступный офсетных будет 5.

Все, что упомянуто выше, не имеет отношения к нему, SimpleConsumerи каждый раз, когда вы его запускаете, оно решает, с чего начать, используя auto.offset.resetконфигурацию.

Если вы используете Кафка версии старше 0.9, вы должны заменить earliest, latestс smallest, largest.

serejja
источник
3
Большое спасибо за ответ. Так что для потребителя высокого уровня, когда потребитель что-то совершил (в ZK или Kafka), auto.offset.resetэто не имеет никакого значения после этого? Единственное значение этого параметра - когда ничего не совершено (и в идеале это будет при первом запуске потребителя)?
Асиф Икбал
2
Точно так же, как вы описали
serejja
1
@serejja Привет. Как насчет того, если у меня всегда есть 1 потребитель на группу, и сценарий № 1 вашего ответа происходит для меня? Будет ли это так же?
ha9u63ar
1
@ ha9u63ar не совсем понял ваш вопрос. Если вы перезапустите своего потребителя в той же группе, то да, он не будет использовать auto.offset.resetи продолжать с зафиксированного смещения. Если вы всегда используете другую группу потребителей (например, генерировать ее при запуске потребителя), то потребитель всегда будет уважатьauto.offset.reset
serejja
@serejja да, и это не работает для меня. не могли бы вы взглянуть на это - это моя проблема
ha9u63ar
83

Просто обновление: начиная с Kafka 0.9 и далее, Kafka использует новую Java-версию потребителя, и имена параметров auto.offset.reset изменились; Из руководства:

Что делать, если в Kafka нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены):

Самый ранний : автоматически сбрасывает смещение на самое раннее смещение

последний : автоматически сбрасывает смещение на последнее смещение

none : выдать исключение потребителю, если для группы потребителя не найдено предыдущего смещения

что-нибудь еще: бросить исключение потребителю.

Я потратил некоторое время, чтобы найти это после проверки принятого ответа, поэтому подумал, что сообществу может быть полезно опубликовать его.

Израиль Цинк
источник
9

Более того, есть offsets.retention.minutes. Если время с момента последнего коммита> offsets.retention.minutes, то auto.offset.resetтакже

Саша Нинкович
источник
1
не кажется ли это избыточным с сохранением журнала? срок хранения офсет должен быть основан на сохранении журнала?
mike01010
@ mike01010 это верно. Это должно быть основано на сохранении журнала, это одно из предложенных решений в заявке. Prolong default value of offsets.retention.minutes to be at least twice larger than log.retention.hours. questions.apache.org/jira/browse/KAFKA-3806
сахеб
Этот ответ напугал меня на некоторое время, пока я не проверить документацию по offsets.retention.minutes:. <Б> После того, как группа потребителей теряет все свои потребитель (т.е. становится пустой) его смещения будут храниться в течение срока хранения перед тем , как отбрасывается </ b> Для автономного потребителям (используя ручное назначение), смещения будут истекать после времени последнего принятия плюс этот период хранения. (Это для Kafka 2.3)
jumping_monkey