假设我有两个主题(都有两个分区和无限保留):
my_topic_a
my_topic_b
和一个消费者组:
我的消费者
在某些时候,它同时消耗了两个主题,但由于一些变化,它不再对 my_topic_a
感兴趣,所以它停止消耗它并且现在正在累积滞后:
kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my_topic_a 0 300000 400000 100000 - - -
my_topic_a 1 300000 400000 100000 - - -
my_topic_b 0 500000 500000 0 - - -
my_topic_b 1 500000 500000 0 - - -
这种延迟让我很烦,因为:
因此我想摆脱 my_consumer
的 my_topic_a
的偏移量,以达到 my_consumer
从未消费过的状态 my_topic_a
。
以下尝试失败:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user
有了这个输出:
The consumer does not support topic-specific offset deletion from a consumer group.
我怎样才能实现我的目标? (暂时停止该组的所有消费者在我的用例中是一个可行的选择。)
(我使用的是 Kafka 版本 2.2.0
。)
我的猜测是,可以通过为主题 __consumer_offsets
写一些东西来完成一些事情,但我不知道它会是什么。目前,该主题如下所示(再次简化):
kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
最佳答案
与此同时(Kafka 2.8),kafka-consumer-groups.sh
的新--delete-offsets
参数已成为可能。 :-)
https://stackoverflow.com/questions/65239579/