Снова использовать то же сообщение, если обработка сообщения не удалась

10

Я использую клиент Confluent.Kafka .NET версии 1.3.0. Я следую за документами :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

Проблема в том, что даже если я закомментирую строку consumer.StoreOffset(consumerResult);, я получу следующее неиспользованное сообщение при следующем потреблении , то есть смещение будет увеличиваться, что, по-видимому, не соответствует заявленной в документации документации, то есть, по крайней мере, одна доставка .

Даже если я устанавливаю EnableAutoCommit = falseи удаляю «EnableAutoOffsetStore = false» из конфигурации и заменяю consumer.StoreOffset(consumerResult)на consumer.Commit(), я все равно вижу то же поведение, т.е. даже если я закомментирую Commit, я все равно продолжаю получать следующие неиспользованные сообщения.

Я чувствую, что упускаю что-то фундаментальное здесь, но не могу понять, что. Любая помощь приветствуется!

havij
источник
Сообщения уже были возвращены приложению с точки зрения kafka, поэтому, когда вы фиксируете их, они сохраняются как последние зафиксированные смещения, но будут продолжать возвращать следующие сообщения независимо от того, были ли вы использованы или нет. Каковы ваши ожидания здесь? Не могли бы вы уточнить, что вы ожидаете до / после принятия и потребления?
Сагар Вирам
Сообщения не обновляются, пока вы не воспользуетесь поиском смещения. Это повлияет на потребление, и сообщения будут возвращены из смещения поиска.
Сагар Вирам
@ user2683814 В своем посте я упомянул два сценария в зависимости от того, что EnableAutoCommitустановлено. Допустим, у нас есть EnableAutoCommit = false, и когда я Consume, я получаю сообщение со смещением 11. Я ожидал, что получу одно и то же сообщение со смещением 11 снова и снова, если обработка сообщения продолжит генерироваться и, следовательно, не Commitбудет выполнен вызов .
хавидж
Нет, это не так. Вы не можете управлять тем, что poll ( Consume), используя Commitпосле того, как вы уже Subscribeв теме. Кафка (как в клиентской lib) позади сцены поддерживает все смещения, которые она отправила приложению в, Consumeи она отправит их линейно. Таким образом, чтобы обработать сообщение, как в случае сбоя, вы должны отследить его в своем коде и попытаться компенсировать и начать обработку сообщения, а также вы должны знать, что пропустить, если оно уже было обработано в предыдущих запросах. Я не знаком с библиотекой .net, но это не должно иметь большого значения, так как это дизайн Кафки.
Сагар Вирам
Я думаю, что вы должны использовать комбинацию подписки и назначения, и вам могут потребоваться разные потребители для поддержки вашего варианта использования. В случае сбоев используйте назначение / поиск смещения для тематических разделов с одним потребителем для повторной обработки сообщений, а для обычной обработки используйте другого потребителя с потоком подписки / потребления / принятия.
Сагар Вирам

Ответы:

0

Извините, я не могу добавить комментарий. Потребитель Kafka потребляет сообщения в пакетах, так что, возможно, вы по-прежнему выполняете итерацию пакета, предварительно извлеченного фоновым потоком .

Вы можете проверить, действительно ли ваш потребитель фиксирует смещение или нет, используя kafka util kafka-consumer-groups.sh

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group consumer_group  --describe
Tuyen Luong
источник
0

Возможно, вы захотите иметь логику повторной попытки для обработки каждого из ваших сообщений в течение фиксированного числа раз, например, скажем 5. Если это не удастся в течение этих 5 попыток, вы можете добавить это сообщение в другую тему для обработки всех сообщений. ошибочные сообщения, которые имеют приоритет над вашей актуальной темой. Или вы можете добавить сообщение с ошибкой в ​​ту же тему, чтобы оно было забрано позже, когда все эти другие сообщения будут использованы.

Если в течение этих 5 попыток обработка какого-либо сообщения прошла успешно, вы можете перейти к следующему сообщению в очереди.

Раджу Дасупалли
источник