代码之家  ›  专栏  ›  技术社区  ›  visa

Kafka streams应用程序无法在一台机器上扩展更多的CPU内核和线程

  •  0
  • visa  · 技术社区  · 7 年前

    我正在编写一个kafka streams应用程序,它基本上从avro记录中提取两种类型的密钥,并在指定的窗口中对它们进行计数。它应该每秒处理约6k个事件。

    • c4.8xlarge 举例 num.stream.threads = 20 线程(输入主题的分区数)每秒仅消耗约2.5k个事件
    • 同一实例 num.stream.threads = 10
    • c4.2xlarge num.stream.threads = 5 每秒最多消耗10-25k个事件

    以下是我的streams配置:

    kafka.streaming {  
      compression.type = "lz4"
      acks = 1
      retries = 1
    
      // I care about throughput more than about latency
      max.poll.records = 6000
      fetch.min.bytes = 3300000 // 6000 * 550 (average record size)
      fetch.max.wait.ms = 1000 // we get 6000 records in 1 second
      batch.size = 165000 // (6000 / 20) * 550
      linger.ms = 1000
    }
    

    代理版本: 0.10.2.1

    卡夫卡流版本: 1.1.1.

    许多EC2实例可以解决可伸缩性问题,但我希望在单个实例上运行我的应用程序,因为聚合必须通过交互式查询公开,我不想开发RPC层。

    UPD:流定义

    signalStream
      .map[EventDetailsGroup, java.lang.Short]((_, v) => new KeyValue(extractEventDetailsGroup(v), Short.box(1)))
      .groupByKey(Serialized.`with`(eventDetailsSerde, Serdes.Short()))   
      .windowedBy(TimeWindows.of(30 * 60 * 1000).advanceBy(60 * 1000))   
      .count(Materialized.as("store-name").withCachingDisabled().withLoggingDisabled())
    
    0 回复  |  直到 7 年前