代码之家  ›  专栏  ›  技术社区  ›  el323

kafka流在docker容器中的应用奇怪行为

  •  2
  • el323  · 技术社区  · 7 年前

    我正在docker容器中运行kafka streams应用程序,其中包含docker compose。然而,streams应用程序的行为异常。所以,我有一个源话题( topicSource )以及多个目标主题( topicDestination1 , topicDestination2 topicDestination10 )我正基于某些谓词进行分支。

    topicSoure 主题1 有一个直接的映射,即所有的记录只是进入目的地主题,没有任何过滤。

    现在,当我在本地运行应用程序或在没有容器的服务器上运行应用程序时,所有这些都可以正常工作。

    另一方面,当我在容器中运行streams应用程序(使用docker compose和kubernetes)时,它不会转发来自 主题课程 主题1 . 实际上,只有少数记录被转发。例如,源主题中大约有3000多条记录,而目标主题中只有6条记录。这一切真的很奇怪。

    这是我的档案:

    #FROM openjdk:8u151-jdk-alpine3.7
    FROM openjdk:8-jdk
    
    COPY /target/streams-examples-0.1.jar /streamsApp/
    
    COPY /target/libs /streamsApp/libs
    
    COPY log4j.properties /
    
    CMD ["java", "-jar", "/streamsApp/streams-examples-0.1.jar"]
    

    注: 在创建映像之前,我正在构建一个jar,以便始终有一个更新的代码。我已经确认了两个代码,一个没有容器运行的代码和一个有容器运行的代码是相同的。

    main.java版本:

    从源主题创建源流:

    KStream<String, String> source_stream = builder.stream("topicSource");
    

    基于谓词的分支:

    KStream<String, String>[] branches_source_topic = source_stream.branch(
                    (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),        // Sharing Set by Date
                    (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")), // Added to secure link
                    (key, value) -> (value.contains("Operation\":\"AddedToGroup")),                                             // Added to group
                    (key, value) -> (value.contains("Operation\":\"Add member to role.") || value.contains("Operation\":\"Remove member from role.")),//Role update by date
                    (key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileDeleted")
                            || value.contains("Operation\":\"FileRenamed") || value.contains("Operation\":\"FileMoved")),       // Upload file by date
                    (key, value) -> (value.contains("Operation\":\"UserLoggedIn")),                                             // User logged in by date
                    (key, value) -> (value.contains("Operation\":\"Delete user.") || value.contains("Operation\":\"Add user.")
                            && value.contains("ResultStatus\":\"success")),                                                     // Manage user by date
                    (key, value) -> (value.contains("Operation\":\"DLPRuleMatch") && value.contains("Workload\":\"OneDrive"))   // MS DLP
                    );
    

    将日志发送到目标主题:

    这是直接映射主题,即所有记录只进入目标主题而不进行任何筛选。

    AppUtil.pushToTopic(source_stream, Constant.USER_ACTIVITY_BY_DATE, "topicDestination1");
    

    将日志从分支发送到目标主题:

    AppUtil.pushToTopic(branches_source_topic[0], Constant.SHARING_SET_BY_DATE, "topicDestination2");
    AppUtil.pushToTopic(branches_source_topic[1], Constant.ADDED_TO_SECURE_LINK_BY_DATE, "topicDestination3");
    AppUtil.pushToTopic(branches_source_topic[2], Constant.ADDED_TO_GROUP_BY_DATE, "topicDestination4");
    AppUtil.pushToTopic(branches_source_topic[3], Constant.ROLE_UPDATE_BY_DATE, "topicDestination5");
    AppUtil.pushToTopic(branches_source_topic[4], Constant.UPLOAD_FILE_BY_DATE, "topicDestination6");
    AppUtil.pushToTopic(branches_source_topic[5], Constant.USER_LOGGED_IN_BY_DATE, "topicDestination7");
    AppUtil.pushToTopic(branches_source_topic[6], Constant.MANAGE_USER_BY_DATE, "topicDestination8");
    

    apputli.java:

    public static void pushToTopic(KStream<String, String> sourceTopic, HashMap<String, String> hmap, String destTopicName) {
        sourceTopic.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                ArrayList<String> keywords = new ArrayList<String>();
                try {
                    JSONObject send = new JSONObject();
                    JSONObject received = processJSON(new JSONObject(value), destTopicName);
    
                    boolean valid_json = true;
                    for(String key: hmap.keySet()) {
                        if (received.has(hmap.get(key))) {
                            send.put(key, received.get(hmap.get(key)));
                        }
                        else {
                            valid_json = false;
                        }
                    }   
                    if (valid_json) {
                        keywords.add(send.toString());  
                    }
                } catch (Exception e) {
                    System.err.println("Unable to convert to json");
                    e.printStackTrace();
                }
    
                return keywords;
            }
        }).to(destTopicName);
    }
    

    日志从何而来:

    因此,日志来自在线连续流。python作业获取基本上是url的日志并将它们发送到 pre-source-topic . 然后在streams应用程序中,我从该主题创建一个流,并点击这些url,然后返回我正在推送到的json日志 主题来源 .

    我花了很多时间试图解决这个问题。我不知道出了什么问题,也不知道它为什么不处理所有日志。请帮我弄清楚。

    1 回复  |  直到 6 年前
        1
  •  1
  •   el323    7 年前

    所以经过大量的调试,我才知道我的探索方向是错误的,这是一个简单的例子,消费者比生产者慢。制作者继续写关于主题的新记录,由于消息是在流处理之后被消费的,所以消费者显然很慢。只需增加主题分区并使用相同的应用程序ID启动多个应用程序实例就可以了。

    推荐文章