本文共 2071 字,大约阅读时间需要 6 分钟。
在使用kafka-consumer-groups.sh脚本调整消费者组位移之前,需确保消费者组状态为inactive,即不处于运行状态。现有版本的新功能仅适用于新版本的消费者。
调整消费者组位移的操作分为三大步骤:
确定主题作用域
支持以下三种指定方式:--all-topics:为所有主题的所有分区调整位移。--topic t1 --topic t2:指定多个主题的所有分区。--topic t1:0,1,2:为指定主题的特定分区调整位移。确定位移重设策略
支持8种调整策略:--to-earliest:调整至分区当前最小位移。--to-latest:调整至分区当前最新位移。--to-current:调整至分区当前位移(无变化)。--to-offset <offset>:指定特定位移值。--shift-by N:位移调整量,可正负值(负值表示向前移动)。--to-datetime <yyyy-MM-ddTHH:mm:ss.xxx>:调整至指定时间点的最早位移。--by-duration <PnDTnHnMnS>:调整至距离当前时间指定时间段的位移。--from-file <file>:从CSV文件读取调整策略。确定执行方案
支持三种执行方式:--execute:执行位移调整。--export:将调整方案生成CSV文件,便于后续使用。bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
所有分区位移重置为0。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
所有分区位移调整至最新位置(如1000000)。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
所有分区位移调整至500000。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
位移保持不变(此操作无实际效果)。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
所有分区位移向前移动100000。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000 --execute
所有分区位移调整至2017年8月4日14:30之后的最早位移。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S --execute
所有分区位移调整至30分钟前的最早位移(如0)。
通过kafka-consumer-groups.sh脚本,用户可以轻松完成消费者组位移调整。在实际操作中,可根据具体需求选择合适的策略和参数,实现高效的消费者管理。
转载地址:http://kmefk.baihongyu.com/