Есть ли способ очистить тему в кафке?
Я поместил сообщение, которое было слишком большим, в тему сообщения kafka на моей локальной машине, и теперь я получаю сообщение об ошибке:
kafka.common.InvalidMessageSizeException: invalid message size
Увеличение fetch.size
не является идеальным здесь, потому что я не хочу принимать такие большие сообщения.
apache-kafka
purge
Питер Клипфель
источник
источник
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Это также позволяет вам проверить текущий срок хранения, например, kafka-configs --zookeeper <zkhost>: 2181 --describe - темы типа Entity - Entity-Name <имя темы>Для очистки очереди вы можете удалить тему:
затем воссоздайте его:
источник
delete.topic.enable=true
в файлconfig/server.properties
, как говорится в предупреждении, напечатанном упомянутой командойNote: This will have no impact if delete.topic.enable is not set to true.
Вот шаги, которые я выполняю, чтобы удалить тему с именем
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Повторите для других разделов и всех репликzkCli.sh
затемrmr /brokers/MyTopic
Если вы пропустите шаг 3, то Apache Kafka продолжит сообщать о теме как о существующей (например, если вы запускаете
kafka-list-topic.sh
).Протестировано с Apache Kafka 0.8.0.
источник
./zookeeper-shell.sh localhost:2181
и./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
вместоzkCli.sh
(попробовал на Cloudera CDH5)Хотя принятый ответ верен, этот метод устарел. Настройка темы теперь должна быть сделана через
kafka-configs
.Конфигурации, установленные с помощью этого метода, можно отобразить с помощью команды
источник
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
Протестировано в Kafka 0.8.2, для примера быстрого запуска: Сначала добавьте одну строку в файл server.properties в папке config:
Затем вы можете запустить эту команду:
источник
Из кафка 1.1
Очистить тему
подождите 1 минуту, чтобы быть уверенным, что кафка очистит тему, удалите конфигурацию, а затем перейдите к значению по умолчанию
источник
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
У kafka нет прямого метода для очистки / очистки темы (Очереди), но это можно сделать, удалив эту тему и воссоздав ее.
сначала убедитесь, что файл sever.properties имеет и если не добавить
delete.topic.enable=true
затем Удалить тему
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
затем создайте его снова.
источник
Иногда, если у вас насыщенный кластер (слишком много разделов, или используются зашифрованные данные раздела, или используется SSL, или контроллер находится на поврежденном узле, или соединение ненадежно, для очистки указанной темы может потребоваться много времени). ,
Я следую этим шагам, особенно если вы используете Avro.
1: Запуск с инструментами Кафки:
2: Запустить на узле реестра схемы:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Установите сохранение темы обратно к первоначальной настройке, когда тема пуста.
Надеюсь, это кому-то поможет, так как это нелегко рекламировать.
источник
kafka-avro-console-consumer
не обязательноОБНОВЛЕНИЕ: Этот ответ актуален для Кафки 0.6. Для Кафки 0,8 и позже смотрите ответ @Patrick.
Да, остановите kafka и вручную удалите все файлы из соответствующего подкаталога (это легко найти в каталоге данных kafka). После перезагрузки кафки тема будет пустой.
источник
Самый простой подход состоит в том, чтобы установить дату отдельных файлов журнала, которая будет старше, чем срок хранения. Затем брокер должен очистить их и удалить их для вас в течение нескольких секунд. Это предлагает несколько преимуществ:
По моему опыту работы с Kafka 0.7.x удаление файлов журнала и перезапуск посредника могут привести к недопустимым исключениям смещения для определенных потребителей. Это может произойти, потому что посредник перезапускает смещения с нуля (при отсутствии каких-либо существующих файлов журнала), а потребитель, который ранее потреблял из этой темы, переподключается, чтобы запросить конкретное [когда-то действительное] смещение. Если это смещение выходит за границы журналов новых тем, то это не повредит, и потребитель возобновляет работу в начале или в конце. Но, если смещение попадает в границы новых журналов тем, брокер пытается получить набор сообщений, но не удается, потому что смещение не совпадает с реальным сообщением.
Это может быть смягчено также очисткой смещения потребителей в zookeeper для этой темы. Но если вам не нужна нетронутая тема и вы просто хотите удалить существующее содержимое, то просто «прикоснуться» к нескольким журналам тем гораздо проще и надежнее, чем останавливать посредников, удалять журналы тем и очищать определенные узлы zookeeper ,
источник
Совет Томаса великолепен, но, к сожалению,
zkCli
в старых версиях Zookeeper (например, 3.3.6), похоже, не поддерживаетсяrmr
. Например, сравните реализацию командной строки в современном Zookeeper с версией 3.3 .Если вы столкнулись со старой версией Zookeeper, одним из решений является использование клиентской библиотеки, такой как zc.zk для Python. Для тех, кто не знаком с Python, вам необходимо установить его с помощью pip или easy_install . Затем запустите оболочку Python (
python
), и вы можете сделать:или даже
если вы хотите удалить все темы из Кафки.
источник
Чтобы очистить все сообщения от определенной темы, используя вашу группу приложений (GroupName должно совпадать с именем группы приложения kafka).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
источник
После ответа @steven appleyard я выполнил следующие команды на Kafka 2.2.0, и они работали для меня.
источник
Здесь много хороших ответов, но среди них я не нашел ни одного о докере. Я потратил некоторое время, чтобы понять, что использование контейнера брокера в этом случае не подходит (очевидно !!!)
и я должен был использовать
zookeeper:2181
вместо того, чтобы--zookeeper localhost:2181
в соответствии с моим файлом составитьправильная команда будет
Надеюсь, это сэкономит кому-то время.
Также помните, что сообщения не будут удалены немедленно, и это произойдет, когда сегмент журнала будет закрыт.
источник
localhost:2181
... Например, вы неправильно понимаете сетевые функции Docker. Кроме того, не все контейнеры Zookeeper имеютkafka-topics
, поэтому лучше не использовать его таким образом. Последние установки Kafka позволяют--bootstrap-servers
изменить тему вместо--zookeeper
you can use
--zookeeper zookeeper: 2181` из контейнера Kafka - моя точка зрения. Или даже извлеките строку Zookeeper из файлаНе удалось добавить в качестве комментария из-за размера: Не уверен, что это правда, кроме обновления retention.ms и retention.bytes, но я заметил, что политика очистки темы должна быть «delete» (по умолчанию), если «compact», она собирается задерживать сообщения дольше, т. е. если оно «компактное», вам также нужно указать delete.retention.ms .
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Также нужно было отслеживать самые ранние / последние смещения, чтобы подтвердить, что это произошло успешно, также можете проверить du -h / tmp / kafka-logs / test-topic-3-100- *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
Другая проблема заключается в том, что вы должны сначала получить текущую конфигурацию, чтобы вы не забыли вернуться после успешного удаления:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
источник
Другой, довольно ручной подход для очистки темы:
в брокерах:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
в зоопарке:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
опять в брокерах:
sudo service kafka start
источник
Это должно дать
retention.ms
настроено. Затем вы можете использовать указанную выше команду alter, чтобы изменить значение на 1 секунду (и позже вернуться к значению по умолчанию).источник
Из Java, используя новое
AdminZkClient
вместо устаревшегоAdminUtils
:источник
AdminClient
илиKafkaAdminClient