消费端出现offset重置为latest, earliest现象,类似log:
(org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse:595) - Fetch offset 2261734 is out of range, resetting offset
原因:该consumer消费的topic的leader和followers的状态不一致时,发生leader切换,会发生offset out of range,此时consumer进行消费时发现offset非法,会进行offset重置
在测试环境中创建一个topic test,3个分区,2个副本,broker 1为leader,broker 2为follower。
写入一些数据消费端进行正常消费,消费至最新状态A处之后,将test的follower的broker2停掉,继续向test写入数据,并让consumer再次消费至最新状态B处。此时停掉的follower broker 2的状态已和leader broker 1的状态不一样,已滞后leader的状态。
现在将follower broker 2启动,此时follower和leader的状态不一样,follower需要和leader进行同步,但当follower与leader未同步成功之前将leader broker 1停掉,然后follower broker 2经过leader的选举机制被迫选为leader(unclean.leader.election.enable为true时,选择第一个启动的副本为leader),在被选举为leader之前broker 2的状态并没有和broker 1的状态一致,也就是说broker 2上的LEO并没有同步到B处,而broker 2被选举为了leader,此时producer继续向topic中写数据,写入之后consumer会进行消费,consumer需要消费的offset的B,而broker 2的LEO并没有同步到B处,此时就发生了out of ranger,offset被重置。
优化方案
针对以上情况我们进行了一下修改:
1. 将topic的副本数设置为3(原先为2),减少ISR列表只有一个leader的几率 2. 调整min.insync.replicas
为2(默认是1),此参数的意思是当ISR中的个数小于此值时,producer无法写入数据,会抛出异常。此参数还需要结合acks来使用,需要将acks设置为all或者-1(flume中kafkaChannel默认是all)。 3. 调整unclean.leader.election.enable
参数为false(默认为true),此参数标识当ISR中没有副本时,选举最早启动的broker为leader。 版本0.11中已经修改默认值为false 4. 集群因为维护需要重启时,先停掉一台broker,然后重启该broker,等到该broker已加入到ISR中之后,再对其它broker进行如上操作,切勿单个broker依次重启。