目前,我在spring kafka中使用JSON来处理消息,这非常简单,几乎不需要编码:
@KafkaListener(topics = "foo.t",)
public void receive(Foo payload) {
LOG.info("received payload='{}'", payload);
doMagic(payload);
}
@KafkaListener(topics = "bar.t",)
public void receive(Bar payload) {
LOG.info("received payload='{}'", payload);
doMagic(payload);
}
# Kafka Config
spring.kafka.bootstrap-servers=broker.kafka
spring.kafka.consumer.group-id=some-app
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.someapp.dto
这很有用,因为内容/类型信息是用JSON编码的,因此可以单独从字节恢复。(这也有一些问题,但它是有效的)
但是在protobuf中我没有这些元信息,或者至少我不知道在哪里可以找到它们。
有没有一种方法可以声明一个适用于多种类型的通用kafka MessageConverter,而不会把spring中所有漂亮的抽象/自动配置抛出窗外?
JSON/I的安全性问题与消息类型I的安全性有关(I)
我想避免这样的解决方案:
https://stackoverflow.com/a/46670801/4573065
.
选择
编写消息转换器/
Deserializer
@Override
public T deserialize(String topic, byte[] data) {
try {
return (T) Foo.parse(data);
} catch (Exception e) {
try {
return (T) Bar.parse(data);
} catch (Exception e1) {
throw e
}
}
}
然而,这可能会抵消我希望通过使用二进制格式获得的所有性能提升。
编辑:
我的制作人是这样的:
public void send(String message) {
kafkaTemplate.send("string.t", message);
}