代码之家  ›  专栏  ›  技术社区  ›  FBryant87

无法使用关于卡夫卡主题的信息?

  •  2
  • FBryant87  · 技术社区  · 6 年前

    我已经看过几个合流的例子了。卡夫卡的客户( https://github.com/confluentinc/confluent-kafka-dotnet/ ),虽然我可以成功地让制作人将消息推送到卡夫卡,但我无法将任何消息拉回到消费者那里。

    通过UI,我可以看到主题已经创建,消息正在进入这个主题(目前有10个分区和3条消息),但是我的消费者总是报告“分区结束”,而没有任何消息的消耗(3条仍在主题上,“OnMessage”从不触发)。

    不过,使用者肯定正在访问主题,并且可以在其中一个分区上看到3条消息:

    分区结束:dotnet测试主题[6]@3

    它只是不使用消息并触发OnMessage()。有什么想法吗?

    var conf = new Dictionary<string, object> 
    { 
        { "group.id", Guid.NewGuid().ToString() },
        { "bootstrap.servers", "mykafkacluster:9094" },
        { "sasl.mechanisms", "SCRAM-SHA-256" },
        { "security.protocol", "SASL_SSL" },
        { "sasl.username", "myuser" },
        { "sasl.password", "mypass" }
    };
    
    using (var producer = new Producer<string, string>(conf, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
    {
        producer.ProduceAsync("dotnet-test-topic", "some key", "some value")
                .ContinueWith(result => 
                {
                    var msg = result.Result;
                    if (msg.Error.Code != ErrorCode.NoError)
                    {
                        Console.WriteLine($"failed to deliver message: {msg.Error.Reason}");
                    }
                    else 
                    {
                        Console.WriteLine($"delivered to: {result.Result.TopicPartitionOffset}");
                    }
                });
    
        producer.Flush(TimeSpan.FromSeconds(10));
    }
    
    using (var consumer = new Consumer<string, string>(conf, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
    { 
        consumer.Subscribe("dotnet-test-topic");
    
        consumer.OnConsumeError += (_, err)
            => Console.WriteLine($"consume error: {err.Error.Reason}");
    
        consumer.OnMessage += (_, msg)
            => Console.WriteLine($"consumed: {msg.Value}");
    
        consumer.OnPartitionEOF += (_, tpo)
            => Console.WriteLine($"end of partition: {tpo}");
    
        while (true)
        {
            consumer.Poll(TimeSpan.FromMilliseconds(100));
        }  
    }
    
    1 回复  |  直到 6 年前
        1
  •  4
  •   FBryant87    6 年前

    如果不提供以下配置,OnMessage事件将不会触发:

    { "auto.offset.reset", "smallest" }
    

    加上这个,我就可以阅读关于这个主题的信息了。