
Kafka Streams - cannot look up entries in state store

  •  0
  • bohunn  · Tech Community  · 2 years ago

    I have a Kafka Streams application that uses two state stores. I run into a problem when running it on OpenShift against a Strimzi cluster (kafka:0.29.0-kafka-3.1.0).

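    Specifically, when a bp-addr record arrives, the application successfully retrieves the entry from the bp state store, but the lookup in the person state store always returns null.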

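    When a person-addr record enters the stream and the application tries to retrieve entries from the bp state store, that lookup returns null as well.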

    I tried both the .get() method with an exact key and the .prefixScan() method. In both cases I see this strange behavior.

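    I double-checked the keys, the entries, and the state of the state stores, and they look fine. What's more, my unit tests work correctly, and so does a local Docker Kafka cluster (cp-kafka).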

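    I also checked the ACL permissions on the Strimzi cluster, and they look correct too (I added my user as a super user to verify). Two different OpenShift namespaces were built from scratch.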

    Can anyone tell me what else I could check?

        private StreamsBuilder buildTopology(KafkaStreamsProperty kafkaStreamsProperty,
                                             SpecificAvroSerde<JoinedPersonAddrV2> joinedPASerde,
                                             SpecificAvroSerde<JoinedBpAddrV2> joinedBASerde,
                                             SpecificAvroSerde<ObjectUpdateEvent> updateSerde
        ) {
            StreamsBuilder builder = new StreamsBuilder();
    
            final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaStreamsProperty.getSchemaRegistryUrl());
    
            joinedBASerde.configure(serdeConfig, false);
            joinedPASerde.configure(serdeConfig, false);
            updateSerde.configure(serdeConfig, false);
    
            KStream<String, JoinedPersonAddrV2> personAddrKStream = builder.stream(kafkaStreamsProperty.getPersonInputTopic(), Consumed.with(Serdes.String(), joinedPASerde));
            KStream<String, JoinedBpAddrV2> bpAddrKStream = builder.stream(kafkaStreamsProperty.getBpInputTopic(), Consumed.with(Serdes.String(), joinedBASerde));
    
            StoreBuilder<KeyValueStore<String, JoinedBpAddrV2>> bpStoreBuilder = Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(BP_STORE_NAME),
                    Serdes.String(),
                    joinedBASerde
            );
    
            StoreBuilder<KeyValueStore<String, JoinedPersonAddrV2>> personStoreBuilder = Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(PERSON_STORE_NAME),
                    Serdes.String(),
                    joinedPASerde
            );
    
            StoreBuilder<KeyValueStore<String, Long>> debounceStoreBuilder = Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(DEBOUNCE_STORE),
                    Serdes.String(),
                    Serdes.Long()
            );
    
            builder.addStateStore(bpStoreBuilder);
            builder.addStateStore(personStoreBuilder);
            builder.addStateStore(debounceStoreBuilder);
    
            KStream<String, BpAddrOrPersonAddrV2> mergedStream = bpAddrKStream
                    .mapValues(value -> new BpAddrOrPersonAddrV2(null, value))
                    .merge(personAddrKStream.mapValues(value -> new BpAddrOrPersonAddrV2(value, null)));
    
            mergedStream.process(() -> new ContextualProcessor<String, BpAddrOrPersonAddrV2, String, ObjectUpdateEvent>() {
                @Override
                public void process(Record<String, BpAddrOrPersonAddrV2> record) {
                    // assign value to a variable
                    BpAddrOrPersonAddrV2 eitherValue = record.value();
                    boolean isHashUpdated = false;
    
                    // get state stores
                    KeyValueStore<String, JoinedBpAddrV2> bpAddrStateStore = context().getStateStore(BP_STORE_NAME);
                    KeyValueStore<String, JoinedPersonAddrV2> personAddrStateStore = context().getStateStore(PERSON_STORE_NAME);
    
                    // assign single Values
                    JoinedBpAddrV2 bpAddr = eitherValue.getBpAddr();
                    JoinedPersonAddrV2 personAddr = eitherValue.getPersonAddr();
    
                    // process if bp-addr record
                    if (bpAddr != null) {
                        String[] splittedKey = record.key().split(":");
                        String personIdAddrIdKey = String.format("%s:%s", splittedKey[1], splittedKey[2]);
    
                        boolean isEmittent = isEmittent(bpAddr);
    
                        if (isRegisteredOwnerId(bpAddr, splittedKey[1]) || isEmittent) {
                            // personId, addrId, bpId
                            String revertedKey = String.format("%s:%s:%s", splittedKey[1], splittedKey[2], splittedKey[0]);
    
                            // get stored value from bp-addr state store
                            JoinedBpAddrV2 storedValue = bpAddrStateStore.get(revertedKey);
                            log.info("bpaddr entry storedValue: {}", storedValue);
                            if (storedValue == null) {
                                log.debug("{}, New BpAddr Table Entry for: {}, hash: {}", BP_STORE_NAME, record.key(), bpAddr.getHash());
                                bpAddrStateStore.put(revertedKey, bpAddr);
                                isHashUpdated = true;
                            } else if (!Objects.equals(storedValue.getHash(), bpAddr.getHash())) {
                                log.debug("{}, Update BpAddr Entry for key: {}, old hash: {}, new hash: {}", BP_STORE_NAME, record.key(), storedValue.getHash(), bpAddr.getHash());
                                bpAddrStateStore.put(revertedKey, bpAddr);
                                isHashUpdated = true;
                            } else {
                                log.debug("{}, No hash change do nothing for BpAddr key:{} hash value:{}", BP_STORE_NAME, record.key(), storedValue.getHash());
                            }
                        }
    
                        if (isHashUpdated) {
                            log.info("SEARCHING IN PERSON - ADDR STATE STORE WITH A KEY: {}", personIdAddrIdKey);
    
                            JoinedPersonAddrV2 matching = personAddrStateStore.get(personIdAddrIdKey);
                            log.info("MATCHING: {}", matching);
    
                            KeyValueIterator<String, JoinedPersonAddrV2> matchedPersonAddrIterator = personAddrStateStore.prefixScan(personIdAddrIdKey, new StringSerializer());
    
                            if (matchedPersonAddrIterator.hasNext()) {
                                while (matchedPersonAddrIterator.hasNext()) {
                                    JoinedPersonAddrV2 matchedPersonAddr = matchedPersonAddrIterator.next().value;
                                    // if the bp-addr is an emittent and no matching personAddr was found, look it up with just the personId using a prefix scan
                                    // because addrId is not propagated on bp, so bp and person might have a different domiAddr
                                    if (matchedPersonAddr == null && isEmittent) {
                                        String personId = splittedKey[1];
                                        // do a prefix scan for a personId
                                        KeyValueIterator<String, JoinedPersonAddrV2> joinedPersonIterator = personAddrStateStore.prefixScan(personId, new StringSerializer());
                                        while (joinedPersonIterator.hasNext()) {
                                            JoinedPersonAddrV2 item = joinedPersonIterator.next().value;
                                            context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(item, bpAddr)));
                                        }
                                        // "standard" match forward the record
                                    } else if (matchedPersonAddr != null) {
                                        context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(matchedPersonAddr, bpAddr)));
                                    }
                                }
                            }
                        }
    
                    } else if (personAddr != null) { // process if person-addr record is coming
    
                        if (record.value() != null) {
                            JoinedPersonAddrV2 storedValue = personAddrStateStore.get(record.key());
                            log.info("personaddr entry storedValue: {}", storedValue);
    
                            if (storedValue == null) {
                                log.debug("{}, New PersonAddr Table entry for: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                                personAddrStateStore.put(record.key(), personAddr);
                                isHashUpdated = true;
                            } else if (!storedValue.getHash().equals(personAddr.getHash())) {
                                log.debug("{}, Update PersonAddr Table entry for: {}, old hash: {}, new hash: {}", PERSON_STORE_NAME, record.key(), storedValue.getHash(), personAddr.getHash());
                                personAddrStateStore.put(record.key(), personAddr);
                                isHashUpdated = true;
                            } else {
                                log.debug("{}, No hash change do nothing for PersonAddr key: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                            }
                        } else {
                            log.debug("Got NULL JoinedPersonAddr Object - skip! --> key: {}", record.key());
                        }
    
                        if (isHashUpdated) {
                            try {
    
                                // look for matches in bpAddr store
                                log.info("SEARCHING IN BP - ADDR STATE STORE WITH A KEY: {}", record.key());
                                KeyValueIterator<String, JoinedBpAddrV2> bpAddrIterator = bpAddrStateStore.prefixScan(record.key(), new StringSerializer());
    
                                if (bpAddrIterator.hasNext()) {
                                    log.trace("Found records for key {} record in {}", record.key(), BP_STORE_NAME);
                                    // got matches for given personId:addrId update all found records
                                    while (bpAddrIterator.hasNext()) {
                                        JoinedBpAddrV2 bpAddrObject = bpAddrIterator.next().value;
                                        context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrObject)));
                                    }
                                } else {
                                    log.trace("Found NO records for key {} record in {}", record.key(), BP_STORE_NAME);
                                    String[] splittedKey = record.key().split(":");
                                    final String ZERO = "0";
    
                                    // only when addr != 0 and is emittent
                                    if (personAddr.getIsEmittent() && !splittedKey[1].equals(ZERO)) {
                                        String personIdNullAddrKey = String.format("%s:%s", splittedKey[0], ZERO);
                                        // look for records in bp-addr state store with key personId:0
                                        KeyValueIterator<String, JoinedBpAddrV2> bpNullAddrIterator = bpAddrStateStore.prefixScan(personIdNullAddrKey, new StringSerializer());
                                        if (bpNullAddrIterator.hasNext()) {
                                            log.trace("Found records for key {} record in {}", personIdNullAddrKey, BP_STORE_NAME);
                                            while (bpNullAddrIterator.hasNext()) {
                                                JoinedBpAddrV2 bpAddrV2 = bpNullAddrIterator.next().value;
                                                context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrV2)));
                                            }
                                        }
                                    } else {
                                        log.trace("Found NO matched sending PERSON_ONLY records");
                                        // no match in bpAddr state store send PERSON_ONLY event
                                        context().forward(record.withValue(createObjectUpdateEvent(personAddr, null)));
                                    }
                                }
                            } catch (Exception e) {
                                log.error("EXCEPTION : {}", e.getMessage());
                            }
                        }
                    }
                }
            }, BP_STORE_NAME, PERSON_STORE_NAME)
                    .process(() -> new DebounceTransformer<>(DEBOUNCE_STORE, 5000), DEBOUNCE_STORE)
                    .peek((key, value) -> log.debug("Produced Update Event key: {}, hash: {}", key, value.getHash()))
                    .to(kafkaStreamsProperty.getOutputTopic(), Produced.with(Serdes.String(), updateSerde));
    
            return builder;
        }
    

    The topology looks like this:

    Topology: Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [person.addr.join.topic])
          --> KSTREAM-MAPVALUES-0000000003
        Source: KSTREAM-SOURCE-0000000001 (topics: [bp.addr.join.topic])
          --> KSTREAM-MAPVALUES-0000000002
        Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
          --> KSTREAM-MERGE-0000000004
          <-- KSTREAM-SOURCE-0000000001
        Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
          --> KSTREAM-MERGE-0000000004
          <-- KSTREAM-SOURCE-0000000000
        Processor: KSTREAM-MERGE-0000000004 (stores: [])
          --> KSTREAM-PROCESSOR-0000000005
          <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
        Processor: KSTREAM-PROCESSOR-0000000005 (stores: [bp-state-store, person-state-store])
          --> KSTREAM-PROCESSOR-0000000006
          <-- KSTREAM-MERGE-0000000004
        Processor: KSTREAM-PROCESSOR-0000000006 (stores: [debounce-store])
          --> KSTREAM-PEEK-0000000007
          <-- KSTREAM-PROCESSOR-0000000005
        Processor: KSTREAM-PEEK-0000000007 (stores: [])
          --> KSTREAM-SINK-0000000008
          <-- KSTREAM-PROCESSOR-0000000006
        Sink: KSTREAM-SINK-0000000008 (topic: update.event.topic)
          <-- KSTREAM-PEEK-0000000007
    
    0 replies  |  as of 2 years ago
        1
  •  0
  •   bohunn    2 years ago

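    The "problem" was that the Strimzi cluster I was using has partitioned topics. Situation: if you have topics A and B with 10+ partitions each, and a producer sends messages to these topics with the default partitioner, then the BpAddr records and the PersonAddr records may end up in different partitions. In this application the two topics also use different key layouts (bpId:personId:addrId for bp-addr, personId:addrId for person-addr), so related records are hashed from different keys.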

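    This means the entries in the state stores (which are partitioned as well) also live in different partitions. When an update record arrives and the application wants to check the entries in a state store, it only sees the store of the partition it is currently processing. We therefore have to make sure that PersonAddr and BpAddr records always land in the same partition.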
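
    To make the explanation above concrete, here is a minimal, dependency-free sketch of how two keys for the same person and address can land in different partitions. It is not the producer's real code: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses String.hashCode() as a stand-in. The class name, the method name, and the ids are made up for illustration.

```java
final class PartitionMismatchDemo {

    // Stand-in for the producer's key hashing. Kafka's default partitioner
    // hashes the serialized key bytes with murmur2; String.hashCode() is used
    // here only to keep the sketch dependency-free (assumption).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String bpAddrKey = "7:42:1";   // bpId:personId:addrId (made-up ids)
        String personAddrKey = "42:1"; // personId:addrId (same person and address)

        // Compare a single-partition topic with a 10-partition topic.
        for (int partitions : new int[] {1, 10}) {
            System.out.printf("%d partition(s): bp-addr -> %d, person-addr -> %d%n",
                    partitions,
                    partitionFor(bpAddrKey, partitions),
                    partitionFor(personAddrKey, partitions));
        }
    }
}
```

    With a single partition every key maps to partition 0, so both stores are always visible to the same task. With 10 partitions the two example keys map to different partitions under this stand-in hash, which mirrors the null lookups described in the question.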

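    The fix is to write a custom partitioner for the producers that write the PersonAddr and BpAddr records, so that related records end up in the same partition of their respective topics.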
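
    A rough sketch of the partitioning logic such a custom partitioner could use is shown below. It is only an illustration under assumptions, not the code that was actually deployed: the class and method names are hypothetical, the key layouts (bpId:personId:addrId and personId:addrId) are taken from the processor code in the question, and String.hashCode() stands in for whatever hash the real partitioner would use. In a real producer this logic would live in an implementation of org.apache.kafka.clients.producer.Partitioner registered through the partitioner.class setting, and both topics would need the same number of partitions.

```java
final class CoPartitionSketch {

    // Extracts the personId from either key layout (hypothetical helper):
    //   bp-addr key:     bpId:personId:addrId
    //   person-addr key: personId:addrId
    static String personId(String recordKey) {
        String[] parts = recordKey.split(":");
        if (parts.length == 3) {
            return parts[1];
        }
        if (parts.length == 2) {
            return parts[0];
        }
        throw new IllegalArgumentException("Unexpected key layout: " + recordKey);
    }

    // Both producers must use the same function and the same partition count,
    // otherwise related records can still end up in different partitions.
    static int partitionFor(String recordKey, int numPartitions) {
        return Math.floorMod(personId(recordKey).hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 10;
        System.out.println("bp-addr 7:42:1  -> " + partitionFor("7:42:1", partitions));
        System.out.println("person-addr 42:1 -> " + partitionFor("42:1", partitions));
        System.out.println("person-addr 42:0 -> " + partitionFor("42:0", partitions));
    }
}
```

    Hashing only the personId, rather than personId:addrId, is a choice made for this sketch: the processor in the question also falls back to prefix scans on the personId alone and on personId:0, and those lookups only find their entries if all records of one person share a partition.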