合流Kafka库(本例中为python版本)有一个product方法,该方法接受传递回调函数:
kafka_producer.produce(topic=topic, key=key, value=value, on_delivery=delivery_callback)
无论消息是否成功传递,都会调用此回调 还是不 :
def delivery_callback(err, msg):
相反,每100条左右,我就依靠 flush()
flush()
messages_outstanding = kafka_producer.flush() if messages_outstanding == 0: //continue to the next batch of 100 else: //produce the batch again
将 无法生成任何消息的帐户?(报告为错误) delivery_callback
delivery_callback
换句话说,, 我能肯定吗 刷新() 如果任何消息失败,则不会返回零 ?
刷新()
确认了以下结果:
使命感 .flush()
.flush()
从我们的角度来看,整件事令人惊讶地尴尬。如果您不能承受消息丢失的代价,则需要检测传递回调何时失败,并实现某种形式的重试逻辑来覆盖失败的消息。