Я использую apache kafka для обмена сообщениями. Я реализовал производителя и потребителя на Java. Как узнать количество сообщений в теме?
источник
Я использую apache kafka для обмена сообщениями. Я реализовал производителя и потребителя на Java. Как узнать количество сообщений в теме?
Единственный способ, который приходит в голову для этого с точки зрения потребителя, - это фактически потреблять сообщения и затем пересчитывать их.
Брокер Kafka предоставляет счетчики JMX для количества сообщений, полученных с момента запуска, но вы не можете знать, сколько из них уже было очищено.
В большинстве распространенных сценариев сообщения в Kafka лучше всего рассматривать как бесконечный поток, и получение дискретного значения того, сколько сообщений в настоящее время хранится на диске, не имеет значения. Более того, все усложняется при работе с кластером брокеров, у каждого из которых есть подмножество сообщений в теме.
Это не java, но может быть полезно
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
источник
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
И тогда разница возвращает фактические ожидающие сообщения в теме? Я прав?Я фактически использую это для тестирования своего POC. Элемент, который вы хотите использовать ConsumerOffsetChecker. Вы можете запустить его с помощью сценария bash, как показано ниже.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
И вот результат: Как вы можете видеть в красном поле, 999 - это номер сообщения, находящегося в данный момент в теме.
Обновление: ConsumerOffsetChecker устарел с 0.10.0, вы можете начать использовать ConsumerGroupCommand.
источник
Иногда интересно знать количество сообщений в каждом разделе, например, при тестировании специального разделителя. Следующие шаги были протестированы для работы с Kafka 0.10.2.1-2 из Confluent 3.2. Учитывая тему Kafka
kt
и следующую командную строку:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt
Будет напечатан образец вывода, показывающий количество сообщений в трех разделах:
kt:2:6138 kt:1:6123 kt:0:6137
Количество строк может быть больше или меньше в зависимости от количества разделов в теме.
источник
Поскольку
ConsumerOffsetChecker
эта функция больше не поддерживается, вы можете использовать эту команду для проверки всех сообщений в теме:bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \ --group my-group \ --bootstrap-server localhost:9092 \ --describe
Где
LAG
количество сообщений в разделе темы:Также вы можете попробовать использовать kafkacat . Это проект с открытым исходным кодом, который может помочь вам читать сообщения из темы и раздела и выводить их на стандартный вывод. Вот образец, который читает последние 10 сообщений из
sample-kafka-topic
темы, а затем выходит:kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
источник
Используйте https://prestodb.io/docs/current/connector/kafka-tutorial.html
Механизм super SQL, предоставляемый Facebook, который подключается к нескольким источникам данных (Cassandra, Kafka, JMX, Redis ...).
PrestoDB работает как сервер с необязательными рабочими (есть автономный режим без дополнительных рабочих), затем вы используете небольшой исполняемый файл JAR (называемый presto CLI) для выполнения запросов.
После того, как вы правильно настроили сервер Presto, вы можете использовать традиционный SQL:
SELECT count(*) FROM TOPIC_NAME;
источник
Команда Apache Kafka для получения необработанных сообщений на всех разделах темы:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group
Печать:
Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none
Столбец 6 - это необработанные сообщения. Сложите их так:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'
awk читает строки, пропускает строку заголовка, складывает шестой столбец и в конце выводит сумму.
Печать
5
источник
Чтобы получить все сообщения, сохраненные для темы, вы можете найти потребителя в начале и конце потока для каждого раздела и просуммировать результаты.
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
источник
Выполните следующее (при условии, что
kafka-console-consumer.sh
это путь):kafka-console-consumer.sh --from-beginning \ --bootstrap-server yourbroker:9092 --property print.key=true \ --property print.value=false --property print.partition \ --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
источник
--new-consumer
этот параметр, поскольку этот параметр больше недоступен (или, по-видимому, необходим)Используя Java-клиент Kafka 2.11-1.0.0, вы можете сделать следующее:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }
Вывод выглядит примерно так:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
источник
seekToEnd(..)
иseekToBeginning(..)
методами , которые изменяют состояние изconsumer
.В последних версиях Kafka Manager есть столбец Суммированные недавние смещения .
источник
У меня был такой же вопрос, и вот как я это делаю, от KafkaConsumer в Котлине:
Очень грубый код, так как я только что заставил это работать, но в основном вы хотите вычесть начальное смещение темы из конечного смещения, и это будет текущее количество сообщений для темы.
Вы не можете просто полагаться на конечное смещение из-за других конфигураций (политика очистки, retention-ms и т. Д.), Которые могут в конечном итоге привести к удалению старых сообщений из вашей темы. Смещения только «перемещаются» вперед, так что это начальное смещение, которое будет двигаться вперед ближе к конечному смещению (или, в конечном итоге, к тому же значению, если в теме сейчас нет сообщения).
В основном конечное смещение представляет собой общее количество сообщений, прошедших через эту тему, а разница между ними представляет количество сообщений, которые тема содержит прямо сейчас.
источник
Выдержки из документов Kafka
Устаревшие в 0.9.0.0
Kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) устарел. В дальнейшем для этой функции используйте kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand).
Я использую брокер Kafka с включенным SSL как для сервера, так и для клиента. Ниже я использую команду
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
где / tmp / ssl_config, как показано ниже
источник
Если у вас есть доступ к интерфейсу JMX сервера, начальные и конечные смещения присутствуют в:
(вам нужно заменить
TOPICNAME
&PARTITIONNUMBER
). Имейте в виду, что вам нужно проверить каждую реплику данного раздела, или вам нужно выяснить, какой из брокеров является лидером для данного раздела (и это может измениться со временем).В качестве альтернативы вы можете использовать методы Kafka Consumer
beginningOffsets
иendOffsets
.источник
Я не пробовал это сам, но это , кажется, имеет смысл.
Вы также можете использовать
kafka.tools.ConsumerOffsetChecker
( источник ).источник
Самый простой способ, который я нашел, - использовать REST API Kafdrop
/topic/topicName
и указать заголовок key:"Accept"
/ value:,"application/json"
чтобы получить ответ JSON.Это описано здесь .
источник
Вы можете использовать kafkatool . Пожалуйста, проверьте эту ссылку -> http://www.kafkatool.com/download.html
источник