Я использую клиент Confluent.Kafka .NET версии 1.3.0. Я следую за документами :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Проблема в том, что даже если я закомментирую строку consumer.StoreOffset(consumerResult);
, я получу следующее неиспользованное сообщение при следующем потреблении , то есть смещение будет увеличиваться, что, по-видимому, не соответствует заявленной в документации документации, то есть, по крайней мере, одна доставка .
Даже если я устанавливаю EnableAutoCommit = false
и удаляю «EnableAutoOffsetStore = false» из конфигурации и заменяю consumer.StoreOffset(consumerResult)
на consumer.Commit()
, я все равно вижу то же поведение, т.е. даже если я закомментирую Commit
, я все равно продолжаю получать следующие неиспользованные сообщения.
Я чувствую, что упускаю что-то фундаментальное здесь, но не могу понять, что. Любая помощь приветствуется!
EnableAutoCommit
установлено. Допустим, у нас естьEnableAutoCommit = false
, и когда яConsume
, я получаю сообщение со смещением 11. Я ожидал, что получу одно и то же сообщение со смещением 11 снова и снова, если обработка сообщения продолжит генерироваться и, следовательно, неCommit
будет выполнен вызов .Consume
), используяCommit
после того, как вы ужеSubscribe
в теме. Кафка (как в клиентской lib) позади сцены поддерживает все смещения, которые она отправила приложению в,Consume
и она отправит их линейно. Таким образом, чтобы обработать сообщение, как в случае сбоя, вы должны отследить его в своем коде и попытаться компенсировать и начать обработку сообщения, а также вы должны знать, что пропустить, если оно уже было обработано в предыдущих запросах. Я не знаком с библиотекой .net, но это не должно иметь большого значения, так как это дизайн Кафки.Ответы:
Извините, я не могу добавить комментарий. Потребитель Kafka потребляет сообщения в пакетах, так что, возможно, вы по-прежнему выполняете итерацию пакета, предварительно извлеченного фоновым потоком .
Вы можете проверить, действительно ли ваш потребитель фиксирует смещение или нет, используя kafka util
kafka-consumer-groups.sh
источник
Возможно, вы захотите иметь логику повторной попытки для обработки каждого из ваших сообщений в течение фиксированного числа раз, например, скажем 5. Если это не удастся в течение этих 5 попыток, вы можете добавить это сообщение в другую тему для обработки всех сообщений. ошибочные сообщения, которые имеют приоритет над вашей актуальной темой. Или вы можете добавить сообщение с ошибкой в ту же тему, чтобы оно было забрано позже, когда все эти другие сообщения будут использованы.
Если в течение этих 5 попыток обработка какого-либо сообщения прошла успешно, вы можете перейти к следующему сообщению в очереди.
источник