我正在测试卡夫卡的弹性(Apache;Kafka_2.12-1.1.0)。我所期望的是,当节点崩溃时,主题的ISR应该自己增加(即复制到可用节点)。我花了4天时间搜索可能的解决方案,但没有用。
拥有3个节点集群,并使用docker(wurstmeister)创建了3个代理,其中3个动物园管理员(1node=1broker+1个动物园管理员)
已在server.properties中更新以下内容
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
min.insync.replicas=2
default.replication.factor=3
启动了所有代理;等待了一分钟;创建了带有复制3的主题,分钟同步复制2
bin/kafka-topics.sh --create --zookeeper 172.31.31.142:2181,172.31.26.102:2181,172.31.17.252:2181 --config 'min.insync.replicas=2' --replication-factor 3 --partitions 1 --topic test2
当我描述这个主题时,我看到下面的数据
bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2 PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2
Topic: test2 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
到目前为止还不错,现在我开始调查;接下来是制片人。当消费处于全速状态时,我会杀死经纪人。现在,当我描述相同的主题时,我看到下面的内容([编辑-1])
bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test2
Topic:test2 PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2
Topic: test2 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1
bash-4.4# bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic __consumer_offsets
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3
Topic: __consumer_offsets Partition: 1 Leader: 3 Replicas: 2,3,1 Isr: 1,3
.. .. ..
[编辑结束-1]
我让卡夫卡的制片人、消费者持续几分钟;
问题1:当代理2关闭时,为什么副本仍然显示2?
现在我又向集群添加了两个代理。当生产商、消费者继续观察ISR时,ISR复制品的数量不会增加,只会增加到3.1个。
问题2:为什么ISR没有增加,即使有两个以上的经纪人可供选择?
.
然后我停止了生产者、消费者;等了几分钟;再次运行描述命令——仍然是相同的结果。
ISR何时扩展其复制?.如果还有2个可用节点,为什么ISR不复制?
我的制片人如下
props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", new Integer(args[2]));// 60384
props.put("linger.ms", new Integer(args[3]));// 1
props.put("buffer.memory", args[4]);// 33554432
props.put("bootstrap.servers", args[6]);// host:port,host:port,host:port etc
props.put("max.request.size", "10485760");// 1048576
和消费者如下
props.put("group.id", "testgroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", args[2]);// 1000
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("max.partition.fetch.bytes", args[3]);// 52428800
props.put("fetch.max.bytes", args[4]);// 1048576
props.put("fetch.message.max.bytes", args[5]);// 1048576
props.put("bootstrap.servers", args[6]);
props.put("max.poll.records", args[7]);
props.put("max.poll.interval.ms", "30000");
props.put("auto.offset.reset", "latest");
在另一个实验中,当我移除另一个代理时,我开始看到同步复制总数小于所需的最小值的错误。令人惊讶的是,在这种状态下,生产者没有被阻塞;但是我在broker server.log上看到了错误。没有新邮件进入队列。
问题4:不应该阻止生产者吗?而不是在代理端抛出错误?
还是我的理解错误?
有什么帮助吗?