代码之家  ›  专栏  ›  技术社区  ›  Juvenik

将kafka与应用程序分离

  •  0
  • Juvenik  · 技术社区  · 6 年前

    我有一个接收大量get请求的应用程序(5分钟内大约250000个请求)。应用程序解析查询参数并发布到kafka。要发布的代码如下:

    public class KafkaProcessor {
    
      private static final String BATCH_SIZE = "batch.size";
      private static final String REQUEST_REQUIRED_ACKS = "request.required.acks";
      private static final String PRODUCER_TYPE = "producer.type";
      private static final String VALUE_SERIALIZER = "value.serializer";
      private static final String KEY_SERIALIZER = "key.serializer";
      private static final String METADATA_BROKER_LIST = "bootstrap.servers";
      private static final String MAX_BLOCK_MS = "max.block.ms";
      private static final String KAFKA_ENABLED = "enabled";
    
      private static Properties props = new Properties();
      private static KafkaProducer<String, String> producer;
      private static ProducerRecord<String, String> producerRecord;
      private static String topic;
    
    
      static {
        boolean isEnabled = Boolean.parseBoolean(ResourceProps.INSTANCE.getKafkaProps(KAFKA_ENABLED));
        if (isEnabled) {
          //Setting up a producer configuration.
          props.put(METADATA_BROKER_LIST, "x.x.x.x:9092,y.y.y.y:9092");
          props.put(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
          props.put(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");
          props.put(PRODUCER_TYPE, "async");
          props.put(REQUEST_REQUIRED_ACKS, "1");
          props.put(BATCH_SIZE, "1000");
          props.put(MAX_BLOCK_MS, "10000");
          producer = new KafkaProducer<>(props);
          topic = "pixel-server";
        }
      }
    
    
      private static void publishToKafka(JSONObject data) {
          producerRecord = new ProducerRecord<String, String>(topic, data.toString());
          producer.send(producerRecord, new Callback() {
            @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
              if (exception != null) {
                exception.printStackTrace();
              }
            }
          });
      }
    }
    

    我的应用程序托管在aws实例中。kafka服务器也在另一台aws机器中。

    但是,如果kafka关闭或kafka由于任何原因响应缓慢,则我的应用程序将冻结,无法进一步处理任何请求。我想知道如何使我的应用程序独立于卡夫卡,也就是说,如果卡夫卡关闭(或响应缓慢),那么它不应该影响我的应用程序。

    我尝试了两种方法,比如如果kafka给出了超时,那么计算超时异常的数量并停止发布到kafka,但是由于请求量非常大,所以到了这个时候,我得到了任何超时异常,我的应用程序就会冻结。

    任何帮助或指示将不胜感激。

    我用的是卡夫卡0.8.2。我的服务器在Vertx。ubuntu中使用的操作系统。ulimit设置为max。

    1 回复  |  直到 6 年前
        1
  •  0
  •   Everv0id    6 年前

    假设您的kafka集群中有三个或更多节点(这对于任何高负载应用程序都至关重要),您可以尝试一些技巧:

    1. 试着设置 acks 生产者配置到 0 是的。这将影响应用程序的一致性(有些消息可以在生产者端丢弃,并将永远丢失)。文件上说:

      如果设置为零,则生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,不能保证服务器已经收到记录

    2. max.block.ms 生产者配置到 0个 是的。这将导致应用程序在每次发送到Kafka群集时立即抛出TimeoutException,而不会出现任何阻塞, 但仅在内存缓冲区溢出时 是的。 注意,它只影响客户端阻塞,而不影响网络调用!

    3. 减少 request.timeout.ms 小值(如 10 100 )中。这将导致Kafka客户端对任何需要时间超过 request.timeout.ms请求超时 是的。

    还有一些建议:

    1. 将kafka实例更新到最新版本以获得更稳定的集群;

    2. 为了获得高可用性,您的kafka集群必须至少包含三个节点(并且总是要避免奇数个节点 split-brain condition )

    3. 你应该试着玩 max.batch.size linger.ms producer配置以达到应用程序的最佳延迟吞吐量比率