代码之家  ›  专栏  ›  技术社区  ›  DerM

Spring Kafka制作人未发送到Kafka 1.0.0(Magic v1不支持记录头)

  •  12
  • DerM  · 技术社区  · 8 年前

    我正在使用此docker compose设置在本地设置Kafka: https://github.com/wurstmeister/kafka-docker/

    docker-compose up 很好,通过shell创建主题也很好。

    现在我试着通过 spring-kafka:2.1.0.RELEASE

    启动Spring应用程序时,它会打印卡夫卡的正确版本:

    o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
    o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
    

    我试着发出这样的信息

    kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
    

    客户端发送失败

    UnknownServerException: The server experienced an unexpected error when processing the request
    

    在服务器控制台中,我得到消息 Magic v1不支持记录头

    Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
    java.lang.IllegalArgumentException: Magic v1 does not support record headers
    

    谷歌搜索表明存在版本冲突,但版本似乎合适( org.apache.kafka:kafka-clients:1.0.0 在类路径中)。

    有什么线索吗?谢谢

    编辑: 我缩小了问题的范围。发送普通字符串是可行的,但通过JsonSerializer发送Json会导致给定的问题。以下是我的生产者配置的内容:

    @Value("\${kafka.bootstrap-servers}")
    lateinit var bootstrapServers: String
    
    @Bean
    fun producerConfigs(): Map<String, Any> =
            HashMap<String, Any>().apply {
                // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
                put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
                put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
                put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
            }
    
    @Bean
    fun producerFactory(): ProducerFactory<String, MyClass> =
            DefaultKafkaProducerFactory(producerConfigs())
    
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
            KafkaTemplate(producerFactory())
    
    4 回复  |  直到 8 年前
        1
  •  23
  •   Vasyl Sarzhynskyi    6 年前

    我也有类似的问题。如果我们使用 JsonSerializer JsonSerde 对于值。 为了防止此问题,我们需要禁用添加信息头。

    如果您对默认json序列化很满意,那么可以使用以下命令(这里的关键点是 ADD_TYPE_INFO_HEADERS ):

    Map<String, Object> props = new HashMap<>(defaultSettings);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
    

    但如果你需要定制 JsonSerializer公司 具有特定 ObjectMapper (类似于 PropertyNamingStrategy.SNAKE_CASE ),应在上显式禁用添加信息标题 JsonSerializer公司 ,就像卡夫卡忽视的那样 DefaultKafkaProducerFactory 的属性 添加\u TYPE\u INFO\u标题 (对我来说,这是一个糟糕的春季卡夫卡设计)

    JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
    valueSerializer.setAddTypeInfo(false);
    ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
    

    或者如果我们使用 杰森塞德 ,然后:

    Map<String, Object> jsonSerdeProperties = new HashMap<>();
    jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
    jsonSerde.configure(jsonSerdeProperties, false);
    
        2
  •  2
  •   DerM    8 年前

    解决了的。问题既不在于代理、一些docker缓存,也不在于Spring应用程序。

    问题是一个控制台使用者,我并行使用它进行调试。这是一个“老”消费者 kafka-console-consumer.sh --topic=topic --zookeeper=...

    启动时,它实际上会打印一条警告: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

    “新”消费者 --bootstrap-server 应使用选项(尤其是在将Kafka 1.0与JsonSerializer一起使用时)。 注意:在这里使用旧的消费者确实会影响生产者。

        3
  •  1
  •   Gary Russell    8 年前

    我刚刚对docker的图像进行了测试,没有任何问题。。。

    $docker ps
    
    CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
    f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
    319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1
    

    .

    @SpringBootApplication
    public class So47953901Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So47953901Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
            return args -> template.send("foo", "bar", "baz");
        }
    
        @KafkaListener(id = "foo", topics = "foo")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    .

    spring.kafka.bootstrap-servers=192.168.177.135:32768
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    

    .

    2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
    baz
    

    编辑

    对我来说仍然有效。。。

    spring.kafka.bootstrap-servers=192.168.177.135:32768
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    

    .

    2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
        acks = 1
        ...
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
    
    ...
    
    2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
    baz
    
        4
  •  0
  •   Kshitiz Agarwal    5 年前

    您正在使用卡夫卡版本<=0.10.x、 x个 一旦使用它,就必须设置JsonSerializer。将\u TYPE\u INFO\u标题添加为false,如下所示。

    Map<String, Object> props = new HashMap<>(defaultSettings);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
    

    为您的生产者工厂属性。

    如果您使用的是卡夫卡版本>0.10.x、 x,应该很好用