My consumer is not getting any records from the topic.
The producer and the consumer use the same topic, and I can see my topic in the broker UI.
For the consumer, I also set the group.id based on the topic name.
I can see the messages sent by the producer in the broker UI, but the consumer never receives any records.
My ConsumerRebalanceListener does not seem to work either; I never see it being called.
Here is my producer:
public class BotKafkaProducer implements Runnable {

    private BotLogger log;
    private Set<NewsLetter> newsLetters;
    private String topicName;
    private Bot bot;
    private News news;
    @Setter
    private NewsDao newsDao;
    @Setter
    private AtomicInteger sentNews;
    private KafkaProducer<String, String> kafkaProducer;

    public BotKafkaProducer(BotLogger log, Set<NewsLetter> newsLetters, String topicName,
                            String pathPropKafka, Bot bot, News news) {
        this.log = log;
        this.newsLetters = newsLetters;
        this.topicName = topicName;
        this.bot = bot;
        this.news = news;
        try {
            Properties props = PropertiesHelper.getProperties(pathPropKafka);
            log.info("Successfully created Properties for the Kafka producer.");
            this.kafkaProducer = new KafkaProducer<>(props);
        } catch (Exception e) {
            log.error("Error creating Properties for the Kafka producer.");
            log.error(e.getMessage());
        }
    }

    @Override
    public void run() {
        log.info("Producer started, topic name: " + topicName);
        try {
            for (NewsLetter newsLetter : newsLetters) {
                String value = new Gson().toJson(newsLetter);
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topicName, value);
                kafkaProducer.send(producerRecord, (recordMetadata, e) -> {
                    if (e == null) {
                        log.info("Received new metadata.\n" +
                                "Topic: " + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        log.error("Error while producing: " + e.getMessage());
                    }
                });
                if (sentNews.get() % 2000 == 0) {
                    this.kafkaProducer.flush();
                }
                sentNews.getAndIncrement();
            }
            this.kafkaProducer.flush();
            log.info(sentNews.get() + " newsletters committed.");
            log.info("Updating news status...");
            news.setBotStatus(bot.toString(), false);
            log.info(bot.toString() + " set bot status " + news.isBotStatus(bot.toString()));
            newsDao.updateStatus(bot.toString(), news);
            log.info("News status updated for " + bot);
        } catch (Exception e) {
            log.error("Error in producer run(): " + e.getMessage());
        } finally {
            if (kafkaProducer != null) {
                this.kafkaProducer.close(Duration.ofMillis(500));
            }
        }
    }
}
Here is my consumer:
public final class BotKafkaConsumer implements Runnable, ExceptionListener {

    private final BotLogger log;
    private final String topicName;
    private FacebookSender facebookSender;
    private TelegramSender telegramSender;
    private ViberSender viberSender;
    private volatile LocalTime startSendTime;
    private volatile LocalTime endSendTime;
    private AtomicInteger send;
    private AtomicInteger notSend;
    private AtomicInteger producerSent;
    private LocalDateTime startProcess;
    private KafkaConsumer<String, String> kafkaConsumer;

    private BotKafkaConsumer(Bot bot, BotLogger log, String topicName, String pathPropKafka) {
        this.log = log;
        this.topicName = topicName;
        Properties props;
        try {
            props = PropertiesHelper.getProperties(pathPropKafka);
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-id-" + topicName);
            log.info("Successfully created Properties for the Kafka consumer.");
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.startSendTime = bot.getStartSendTime();
            this.endSendTime = bot.getEndSendTime();
        } catch (Exception e) {
            log.error("Error creating Properties for the Kafka consumer.");
            log.error(e.getMessage());
        }
    }

    public BotKafkaConsumer(Bot bot, BotLogger log, String topicName, String pathPropKafka, Object sender,
                            AtomicInteger send, AtomicInteger notSend, AtomicInteger producerSent,
                            LocalDateTime startProcess) {
        this(bot, log, topicName, pathPropKafka);
        this.send = send;
        this.notSend = notSend;
        this.producerSent = producerSent;
        this.startProcess = startProcess;
        if (sender instanceof FacebookSender)
            this.facebookSender = (FacebookSender) sender;
        if (sender instanceof TelegramSender)
            this.telegramSender = (TelegramSender) sender;
        if (sender instanceof ViberSender)
            this.viberSender = (ViberSender) sender;
    }

    public LocalTime getStartSendTime() {
        return startSendTime;
    }

    public LocalTime getEndSendTime() {
        return endSendTime;
    }

    private boolean isNotTimeToSend() {
        return (startSendTime != null && startSendTime.isAfter(LocalTime.now())) ||
               (endSendTime != null && endSendTime.isBefore(LocalTime.now()));
    }

    @Override
    public void run() {
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Starting exit...");
            this.kafkaConsumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        try {
            subscribeConsumer();
            if (facebookSender != null) {
                send(this.kafkaConsumer, facebookSender, log);
            } else if (telegramSender != null) {
                send(this.kafkaConsumer, telegramSender, log);
            } else if (viberSender != null) {
                send(this.kafkaConsumer, viberSender, log);
            }
        } catch (Exception e) {
            log.error("BotKafkaConsumer error: " + e.getMessage());
        } finally {
            log.warn("BotKafkaConsumer closed.");
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.close(Duration.ofMillis(500));
            }
        }
    }

    @Override
    public void onException(JMSException e) {
        log.error("consumer: " + this + " error: " + e.getMessage());
    }

    private void send(KafkaConsumer<String, String> consumer, TelegramSender telegramSender, BotLogger log) throws JMSException {
        ConsumerRecords<String, String> records;
        Gson gson = new Gson();
        TelegramDto telegramBody = new TelegramDto();
        InlineKeyboard inlineKeyboard = null;
        synchronized (this) {
            while (true) {
                records = consumer.poll(Duration.ofMillis(500));
                records.partitions().forEach(tp ->
                        log.info("Topic: " + tp.topic() + " TP: " + tp.partition()));
                for (ConsumerRecord<String, String> record : records) {
                    NewsLetter newsLetter = getNewsLetter(record, gson);
                    String id;
                    News news;
                    try {
                        id = newsLetter.getUserId();
                        news = newsLetter.getNews();
                        String text = replaceForTelegram(news.getText());
                        String buttonText = news.getButtonText();
                        if (StringUtils.isNotBlank(buttonText)) {
                            String buttonUrl = news.getButtonUrl();
                            if (StringUtils.isNotBlank(buttonUrl)) {
                                inlineKeyboard = new InlineKeyboard().addRow()
                                        .addButton(0, new InlineKeyboardButton(buttonText).setUrl(buttonUrl));
                                telegramBody.setReplyMarkup(inlineKeyboard);
                            }
                        }
                        if (news.getImageUrl() != null) {
                            String imageUrl = "(" + news.getImageUrl() + ")";
                            text = "[\u200B]" + imageUrl + text;
                        }
                        telegramBody.setChatId(id)
                                .setText(text)
                                .setMessageType(TelegramSendType.SEND_MESSAGE);
                        boolean result = telegramSender.sendMessage(telegramBody);
                        processResult(result);
                        log.info("send + notSend: " + (send.get() + notSend.get()) +
                                ", producerSent: " + producerSent.get());
                        if ((send.get() + notSend.get()) >= producerSent.get()) {
                            break;
                        }
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
                if (isNotTimeToSend()) {
                    log.info("isNotTimeToSend(): " + isNotTimeToSend());
                    producerSent.set(0);
                    break;
                }
                if ((send.get() + notSend.get()) >= producerSent.get()) {
                    LocalDateTime endProcess = LocalDateTime.now();
                    log.info("TELEGRAM PRODUCER TOTAL NEWS sent: " + producerSent.get() +
                            " by producer: " + topicName);
                    log.info("TELEGRAM CONSUMER TOTAL NEWS sent: " + send + ", TOTAL NOT sent: " + notSend +
                            " by consumer: " + topicName);
                    log.info("START consumeNews process: " + startProcess);
                    log.info("END consumeNews process: " + endProcess);
                    log.info("TOTAL TIME FOR SENDING: " + ChronoUnit.MINUTES.between(startProcess, endProcess) +
                            " min " + ChronoUnit.SECONDS.between(startProcess, endProcess) + " sec " +
                            ChronoUnit.MILLIS.between(startProcess, endProcess) + " ms");
                    producerSent.set(0);
                    break;
                }
            }
        }
    }

    private void send(KafkaConsumer<String, String> consumer, ViberSender viberSender, BotLogger log) throws JMSException {
    }

    private void send(KafkaConsumer<String, String> consumer, FacebookSender facebookSender, BotLogger log) throws JMSException {
    }

    private void subscribeConsumer() {
        this.kafkaConsumer.subscribe(Collections.singletonList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                log.info("Revoked partitions: " + formatPartitions(collection));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                collection.forEach(tp -> {
                    long offset = kafkaConsumer.position(tp);
                    log.info("Topic: " + tp.topic() + " Partition: " + tp.partition() +
                            " Offset: " + offset);
                });
                kafkaConsumer.resume(collection);
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> collection) {
                collection.forEach(tp -> {
                    long offset = kafkaConsumer.position(tp);
                    log.info("Topic: " + tp.topic() + " Partition: " + tp.partition() +
                            " Offset: " + offset);
                });
                kafkaConsumer.resume(collection);
            }
        });
    }

    private void processResult(boolean result) {
        if (result) {
            send.getAndIncrement();
        } else {
            notSend.getAndIncrement();
        }
    }

    private NewsLetter getNewsLetter(ConsumerRecord<String, String> record, Gson gson) throws JMSException {
        String text = record.value();
        return gson.fromJson(text, NewsLetter.class);
    }

    private String replaceForTelegram(String text) {
        return text.replace("_", "\\_")
                .replace("`", "\\`")
                .replace("*", "\\*");
    }

    private static List<String> formatPartitions(Collection<TopicPartition> partitions) {
        return partitions.stream()
                .map(tp -> String.format("topic: %s, partition: %s", tp.topic(), tp.partition()))
                .collect(Collectors.toList());
    }
}
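For comparison, this is the kind of stripped-down consumer I would expect to print every record from the topic (a minimal sketch; the topic name "news-topic" and the debug group.id are placeholders, not my real values):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MinimalConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "50-kafka-a:9092");
        // A fresh group.id so that auto.offset.reset=earliest applies and the topic is re-read.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "debug-group-" + System.currentTimeMillis());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // "news-topic" is a placeholder for the real topic name.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("news-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

Because it uses a brand-new group.id together with auto.offset.reset=earliest, this sketch should re-read the topic from the beginning, which helps to separate a broker/topic problem from a group/offset problem.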
Here are the properties I use to configure the broker, the producer, and the consumer:
# The location of the Kafka server
bootstrap.servers=50-kafka-a:9092
# Number of replicas across brokers
replication.factor=3
# Maximum message size on the broker side
#max.message.bytes=10485880
#replica.fetch.max.bytes=10485880
# Maximum message size on the consumer side
#max.partition.fetch.bytes=10485880
broker.id=1
zookeeper.connect=50-kafka-a:2181
##### PRODUCER SETTINGS ######
# How many acknowledgements the producer requires - in our case, all in-sync replicas
acks=all
# Means that two replicas (including the leader) must confirm they have the data
#min.insync.replicas=2
# Compression type for messages on the producer side
compression.type=snappy
# Number of retries when sending a message fails
retries=3
# Timeout after which an unsent message is reported as failed
delivery.timeout.ms=120000
# Maximum message size on the producer side
#max.request.size=10485880
# Classes for serializing ...
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
##### CONSUMER SETTINGS ######
# Maximum number of records returned by a single poll() call
max.poll.records=1000
# Let Kafka keep track of the records read by the consumer
enable.auto.commit=true
# Interval in milliseconds at which Kafka commits the offset of the last message read
auto.commit.interval.ms=500
# Used when there is no initial offset, or the current offset no longer exists
auto.offset.reset=earliest
#auto.offset.reset=latest
# Maximum message size on the consumer side
#fetch.max.bytes=10485880
# ... and for deserializing messages
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
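For context, PropertiesHelper.getProperties is just a thin loader for this file; a rough sketch of the assumed behavior (the real helper may differ) is:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public final class PropertiesHelper {

    private PropertiesHelper() {
    }

    // Assumed behavior: load the .properties file at the given path. Both the
    // producer and the consumer receive the same mixed file, so each client
    // also sees the keys meant for the other side and for the broker; Kafka
    // only logs those as unknown-config warnings.
    public static Properties getProperties(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }
}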
And the dependency is:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>