apache-kafka - 如何删除一个特定主题的组的消费者偏移量

假设我有两个主题(都有两个分区和无限保留):

  • my_topic_a
  • my_topic_b

和一个消费者组:

  • 我的消费者

在某些时候,它同时消耗了两个主题,但由于一些变化,它不再对 my_topic_a 感兴趣,所以它停止消耗它并且现在正在累积滞后:

kafka-consumer-groups.sh --bootstrap-server=kafka.core-kafka.svc.cluster.local:9092 --group my_consumer --describe
TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                  HOST            CLIENT-ID
my_topic_a                           0          300000          400000          100000          -                                                            -               -
my_topic_a                           1          300000          400000          100000          -                                                            -               -
my_topic_b                           0          500000          500000          0               -                                                            -               -
my_topic_b                           1          500000          500000          0               -                                                            -               -

这种延迟让我很烦,因为:

  • 我在 Grafana 中的消费者滞后图被污染了。
  • 触发了自动警报,提醒我消费者滞后太多。

因此我想摆脱 my_consumermy_topic_a 的偏移量,以达到 my_consumer 从未消费过的状态 my_topic_a

以下尝试失败:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group my_consumer_group --delete --topic domain.user

有了这个输出:

The consumer does not support topic-specific offset deletion from a consumer group.

我怎样才能实现我的目标? (暂时停止该组的所有消费者在我的用例中是一个可行的选择。)

(我使用的是 Kafka 版本 2.2.0。)


我的猜测是,可以通过为主题 __consumer_offsets 写一些东西来完成一些事情,但我不知道它会是什么。目前,该主题如下所示(再次简化):

kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server kafka:9092 --topic __consumer_offsets --from-beginning
...
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,0]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=299999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000000000, expireTimestamp=None)
[my_consumer_group,my_topic_a,1]::OffsetAndMetadata(offset=300000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1605000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,0]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)
...
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=499999, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000000000, expireTimestamp=None)
[my_consumer_group,my_topic_b,1]::OffsetAndMetadata(offset=500000, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1607000100000, expireTimestamp=None)

最佳答案

与此同时(Kafka 2.8),kafka-consumer-groups.sh 的新--delete-offsets 参数已成为可能。 :-)

  • https://stackoverflow.com/a/66644574/1866775
  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets

https://stackoverflow.com/questions/65239579/

相关文章:

c# - EFCore5,使用 FromSqlRaw 和 [Owned] 属性

c# - 使用 Lamar 注入(inject)运行时对象

c# - HttpClient : This instance has already starte

python - 如何将路径参数转发到 VPC 链路端点?

git - 如何在 "git checkout"的输出中着色分支名称?

ansible - 我怎样才能访问另一台服务器的ansible facts?

docker - 如何在 VSCode devcontainer 中使用 minikube?

android - 在 Kotlin 中检查两个对象的某些(不是全部)属性是否相等的惯用方法

python - 子进程无法使用 Pandas 执行文件

ruby-on-rails - 将复杂的哈希传递给 Sidekiq 作业