
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) { // ➌
    String topic = e.getKey().topic();
    int partition = e.getKey().partition();
    long committedOffset = e.getValue().offset();
    long latestOffset = latestOffsets.get(e.getKey()).offset();
    System.out.println("Consumer group " + CONSUMER_GROUP
            + " has committed offset " + committedOffset
            + " to topic " + topic + " partition " + partition
            + ". The latest offset in the partition is "
            + latestOffset + " so consumer group is "
            + (latestOffset - committedOffset) + " records behind");
}
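The lag arithmetic in the loop above can be tried in isolation, without a running cluster. The following is a minimal sketch under stated assumptions, not the book's code: the class name LagSketch and the helper computeLag are hypothetical, and plain "topic-partition" strings stand in for Kafka's TopicPartition keys. It also skips partitions with no known latest offset and clamps negative results to zero, two precautions the listing above does not take.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LagSketch {
    // Hypothetical helper, not part of Kafka's API. Keys are "topic-partition"
    // strings standing in for TopicPartition; values are offsets.
    public static Map<String, Long> computeLag(Map<String, Long> committed,
                                               Map<String, Long> latest) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = latest.get(e.getKey());
            if (end == null) {
                continue; // no latest offset known for this partition; skip it
            }
            // Lag is latest minus committed; clamp at zero in case the two
            // snapshots were taken at slightly different moments.
            lag.put(e.getKey(), Math.max(0L, end - e.getValue()));
        }
        return lag;
    }

    public static void main(String[] args) {
        // Made-up sample offsets, for illustration only.
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 40L);
        committed.put("orders-1", 100L);

        Map<String, Long> latest = new LinkedHashMap<>();
        latest.put("orders-0", 42L);
        latest.put("orders-1", 100L);

        computeLag(committed, latest).forEach((tp, n) ->
                System.out.println(tp + " is " + n + " records behind"));
    }
}
```

Keeping the calculation in a pure function like this makes it easy to unit test separately from the AdminClient calls that fetch the two offset maps.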
➊ Retrieve all the topics and partitions that the consumer group reads, along with the latest committed offset for each partition. Unlike describeConsumerGroups, listConsumerGroupOffsets ...