Есть ли способ удалить все данные из темы или удалить тему перед каждым запуском?
Могу ли я изменить файл KafkaConfig.scala, чтобы изменить logRetentionHours
свойство? Есть ли способ удалить сообщения, как только потребитель их прочитает?
Я использую производителей для извлечения данных откуда-то и отправки данных в определенную тему, где потребляет потребитель. Могу ли я удалять все данные из этой темы при каждом запуске? Мне нужны только новые данные каждый раз в теме. Есть ли способ как-нибудь повторно инициализировать тему?
apache-kafka
apache-zookeeper
TommyT
источник
источник
Ответы:
Не думаю, что это поддерживается.Взгляните на этот выпуск JIRA «Добавить поддержку для удаления темы».Чтобы удалить вручную:
log.dir
атрибутом в файле конфигурации kafka), а также данные zookeeperДля любой заданной темы вы можете
/tmp/kafka-logs/MyTopic-0
там, где/tmp/kafka-logs
это указано вlog.dir
атрибутеЭто
NOT
хороший и рекомендуемый подход, но он должен работать. В конфигурационном файле брокера Kafkalog.retention.hours.per.topic
атрибут используется для определенияThe number of hours to keep a log file before deleting it for some specific topic
Из документации Kafka :
Чтобы найти начальное смещение для чтения в примере Kafka 0.8 Simple Consumer, они говорят
Вы также можете найти здесь пример кода для управления смещением на стороне потребителя.
источник
brokers/topics/<topic_to_delete>
Чтобы избавиться от этого, вам придется рекурсивно удалить все, что находится ниже, а также журналы.kafka-run-class.sh kafka.admin.DeleteTopicCommand
.kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Как я уже упоминал здесь, очистить очередь Kafka :
Протестировано в Kafka 0.8.2, для примера быстрого запуска: сначала добавьте одну строку в файл server.properties в папке config:
затем вы можете запустить эту команду:
источник
Протестировано с кафкой 0.10
Примечание: если вы удаляете папки с темами внутри kafka-logs, но не из папки zookeeper-data, вы увидите, что темы все еще там.
источник
В качестве грязного обходного пути вы можете настроить параметры хранения времени выполнения для каждой темы, например
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
( retention.bytes = 0 также может работать)Через некоторое время кафка должен освободить место. Не уверен, имеет ли это какое-либо значение по сравнению с повторным созданием темы.
пс. Лучше вернуть настройки удержания, как только кафка закончил с очисткой.
Вы также можете использовать
retention.ms
для сохранения исторических данныхисточник
Ниже приведены сценарии для очистки и удаления темы Kafka, предполагающей localhost в качестве сервера zookeeper, а Kafka_Home установлен в каталог установки:
Приведенный ниже сценарий очистит тему, установив время хранения на 1 секунду, а затем удалив конфигурацию:
Чтобы полностью удалить темы, вы должны остановить все применимые брокеры kafka и удалить их каталог (ы) из каталога журналов kafka (по умолчанию: / tmp / kafka-logs), а затем запустить этот сценарий, чтобы удалить тему из zookeeper. Чтобы убедиться, что он был удален из zookeeper, вывод ls / brokers / themes больше не должен включать тему:
источник
grep "log.retention.check.interval" $Kafka_Home/config/server.properties
--add config
так--add-config
Мы попробовали в значительной степени то, что описывают другие ответы, с умеренным уровнем успеха. Что действительно сработало для нас (Apache Kafka 0.8.1), так это команда класса
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181
источник
Error: Could not find or load main class kafka.admin.DeleteTopicCommand
Для любителей пива
Если вы используете
brew
как я и потратили много времени на поиск печально известнойkafka-logs
папки, не бойтесь больше. (и, пожалуйста, дайте мне знать, работает ли это для вас и нескольких разных версий Homebrew, Kafka и т. д. :))Вы, вероятно, найдете его в:
Место расположения:
/usr/local/var/lib/kafka-logs
Как на самом деле найти этот путь
(это также полезно практически для каждого приложения, которое вы устанавливаете через brew)
1)
brew services list
2) Откройте и прочтите то, что
plist
вы нашли выше3) Найдите строку, определяющую
server.properties
местоположение, откройте ее, в моем случае:/usr/local/etc/kafka/server.properties
4) Ищите
log.dirs
строку:5) Перейдите в это место и удалите журналы по темам, которые вам нужны.
6) Перезапустите Kafka с помощью
brew services restart kafka
источник
Все данные о темах и их разделах хранятся в
tmp/kafka-logs/
. Более того, они хранятся в форматеtopic-partionNumber
, поэтому, если вы хотите удалить темуnewTopic
, вы можете:rm -rf /tmp/kafka-logs/newTopic-*
источник
log.retention.hours
и добавлятьlog.retention.ms=1000
. Это сохранит запись на Kafka Topic только одну секунду.log.retention.hours
желаемое значение.источник
Начиная с версии kafka 2.3.0, есть альтернативный способ мягкого удаления Kafka (старый подход устарел).
Обновите retention.ms до 1 секунды (1000 мс), затем снова установите его через минуту, до значения по умолчанию, то есть 7 дней (168 часов, 604 800 000 в мс)
Мягкое удаление: - (rentention.ms = 1000) (с использованием kafka-configs.sh)
Установка по умолчанию: - 7 дней (168 часов, retention.ms = 604800000)
источник
При ручном удалении темы из кластера kafka вы можете просто проверить это https://github.com/darrenfu/bigdata/issues/6 . Важный шаг, который часто упускается в большинстве решений, заключается в удалении
/config/topics/<topic_name>
в ZK.источник
Я использую этот скрипт:
источник
Я использую указанную ниже утилиту для очистки после запуска интеграционного теста.
Он использует последнюю
AdminZkClient
версию api. Старый API устарел.Есть возможность удалить тему. Но он отмечает тему для удаления. Позже Zookeeper удалит тему. Поскольку это может быть непредсказуемо долго, я предпочитаю подход retention.ms
источник