我已经看过几个合流的例子了。卡夫卡的客户(
https://github.com/confluentinc/confluent-kafka-dotnet/
),虽然我可以成功地让制作人将消息推送到卡夫卡,但我无法将任何消息拉回到消费者那里。
通过UI,我可以看到主题已经创建,消息正在进入这个主题(目前有10个分区和3条消息),但是我的消费者总是报告“分区结束”,而没有任何消息的消耗(3条仍在主题上,“OnMessage”从不触发)。
不过,使用者肯定正在访问主题,并且可以在其中一个分区上看到3条消息:
分区结束:dotnet测试主题[6]@3
它只是不使用消息并触发OnMessage()。有什么想法吗?
var conf = new Dictionary<string, object>
{
{ "group.id", Guid.NewGuid().ToString() },
{ "bootstrap.servers", "mykafkacluster:9094" },
{ "sasl.mechanisms", "SCRAM-SHA-256" },
{ "security.protocol", "SASL_SSL" },
{ "sasl.username", "myuser" },
{ "sasl.password", "mypass" }
};
using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
.ContinueWith(result =>
{
var msg = result.Result;
if (msg.Error.Code != ErrorCode.NoError)
{
Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
}
else
{
Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
}
});
producer.Flush(TimeSpan.FromSeconds(10));
}
using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe("dotnet-test-topic");
consumer.OnConsumeError += (_, err)
=> Console.WriteLine($"consume error: {err.Error.Reason}");
consumer.OnMessage += (_, msg)
=> Console.WriteLine($"consumed: {msg.Value}");
consumer.OnPartitionEOF += (_, tpo)
=> Console.WriteLine($"end of partition: {tpo}");
while (true)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}