代码之家  ›  专栏  ›  技术社区  ›  eric zhao

如何定义StreamsBuilderFactoryBean的两个实例

  •  0
  • eric zhao  · 技术社区  · 7 年前

    我是春天卡夫卡的新人。出于某种原因,我想创建两个StreamsBuilderFactoryBean,如您所见,我定义了两个StreamsBuilderFactoryBean一个名为“ commonDSLBuilder “另一个是” propertyDSLBuilder “与 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4) . 所以” commonDSLBuilde “只创建一个消费者,但是” 房地产开发商 “创造四个消费者。

     @Configuration
    @EnableKafka
    public class KafkaStreamsConfig {
        private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);
    
        @Value("${spring.kafka.stream.application-id}")
        private String applicationId;
    
        @Bean(name = "commonDSLBuilder")
        public StreamsBuilderFactoryBean commonDSLBuilder() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            StreamsConfig streamsConfig = new StreamsConfig(props);
            StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
            streamsBuilder.setSingleton(Boolean.FALSE);
            return streamsBuilder;
        }
    
        @Bean(name = "propertyDSLBuilder")
        public StreamsBuilderFactoryBean propertyDSLBuilder() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
            StreamsConfig streamsConfig = new StreamsConfig(props);
            CleanupConfig cleanupConfig = new CleanupConfig(Boolean.TRUE, Boolean.TRUE);
            StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
            streamsBuilder.setSingleton(Boolean.FALSE);
            return streamsBuilder;
        }
    }
    

    我用的是“ 通用生成器 “像这样

    @Configuration
    public class BindPostDSL {
    
        private static final Logger log = LoggerFactory.getLogger(BindPostDSL.class);
    
        @Autowired
        @Qualifier("commonDSLBuilder")
        private StreamsBuilder builder;
    
        @Bean(name = "bindPostKStream")
        public KStream<String, String> kStream() {
            log.info("bind 事件处理启动");
                KStream<String, String> stream = builder.stream("test");
                stream.foreach((key, value) -> {
                    log.info("receive kafka bind post,key:{},value:{}", key, value);
                });
                return stream;
        }
    }
    

    但是当我启动应用程序时,5个消费者(1个来自 通用生成器 +4从 房地产开发商

    2018-08-06 10:34:12 [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] INFO  StreamThread:336 - stream-thread [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] Starting
    2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] Starting
    2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] Starting
    2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] Starting
    2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] INFO  StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] Starting
    
    1 回复  |  直到 7 年前
        1
  •  2
  •   Vasyl Sarzhynskyi    7 年前

    你的方向是对的。 你需要两个豆子 StreamsBuilderFactoryBean 两颗豆子 KStream . 每个KStream都有特定的StreamsBuilderFactoryBean。你不需要调用 setSingleton(Boolean.FALSE); 在streamsBuilder上。

    @Bean
    public FactoryBean<StreamsBuilder> commonDSLBuilder() {
        ...
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
        return streamsBuilder;
    }
    
    @Bean
    public FactoryBean<StreamsBuilder> propertyDSLBuilder() {
        ...
        StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
        return streamsBuilder;
    }
    
    @Bean
    public KStream<String, String> bindKStream(StreamsBuilder commonDSLBuilder) {
        KStream<String, String> kStream = commonDSLBuilder.stream("commonTopicName");
        kStream.foreach((key, value) -> { ...  });
        return kStream;
    }
    
    @Bean
    public KStream<String, String> perperyKStream(StreamsBuilder propertyDSLBuilder) {
        KStream<String, String> kStream = propertyDSLBuilder.stream("propertyTopicName");
        kStream.foreach((key, value) -> { ... });
        return kStream;
    }
    
    推荐文章