我正在docker容器中运行kafka streams应用程序,其中包含docker compose。然而,streams应用程序的行为异常。所以,我有一个源话题(
topicSource
)以及多个目标主题(
topicDestination1
,
topicDestination2
…
topicDestination10
)我正基于某些谓词进行分支。
topicSoure
和
主题1
有一个直接的映射,即所有的记录只是进入目的地主题,没有任何过滤。
现在,当我在本地运行应用程序或在没有容器的服务器上运行应用程序时,所有这些都可以正常工作。
另一方面,当我在容器中运行streams应用程序(使用docker compose和kubernetes)时,它不会转发来自
主题课程
到
主题1
. 实际上,只有少数记录被转发。例如,源主题中大约有3000多条记录,而目标主题中只有6条记录。这一切真的很奇怪。
这是我的档案:
#FROM openjdk:8u151-jdk-alpine3.7
FROM openjdk:8-jdk
COPY /target/streams-examples-0.1.jar /streamsApp/
COPY /target/libs /streamsApp/libs
COPY log4j.properties /
CMD ["java", "-jar", "/streamsApp/streams-examples-0.1.jar"]
注:
在创建映像之前,我正在构建一个jar,以便始终有一个更新的代码。我已经确认了两个代码,一个没有容器运行的代码和一个有容器运行的代码是相同的。
main.java版本:
从源主题创建源流:
KStream<String, String> source_stream = builder.stream("topicSource");
基于谓词的分支:
KStream<String, String>[] branches_source_topic = source_stream.branch(
(key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")), // Sharing Set by Date
(key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")), // Added to secure link
(key, value) -> (value.contains("Operation\":\"AddedToGroup")), // Added to group
(key, value) -> (value.contains("Operation\":\"Add member to role.") || value.contains("Operation\":\"Remove member from role.")),//Role update by date
(key, value) -> (value.contains("Operation\":\"FileUploaded") || value.contains("Operation\":\"FileDeleted")
|| value.contains("Operation\":\"FileRenamed") || value.contains("Operation\":\"FileMoved")), // Upload file by date
(key, value) -> (value.contains("Operation\":\"UserLoggedIn")), // User logged in by date
(key, value) -> (value.contains("Operation\":\"Delete user.") || value.contains("Operation\":\"Add user.")
&& value.contains("ResultStatus\":\"success")), // Manage user by date
(key, value) -> (value.contains("Operation\":\"DLPRuleMatch") && value.contains("Workload\":\"OneDrive")) // MS DLP
);
将日志发送到目标主题:
这是直接映射主题,即所有记录只进入目标主题而不进行任何筛选。
AppUtil.pushToTopic(source_stream, Constant.USER_ACTIVITY_BY_DATE, "topicDestination1");
将日志从分支发送到目标主题:
AppUtil.pushToTopic(branches_source_topic[0], Constant.SHARING_SET_BY_DATE, "topicDestination2");
AppUtil.pushToTopic(branches_source_topic[1], Constant.ADDED_TO_SECURE_LINK_BY_DATE, "topicDestination3");
AppUtil.pushToTopic(branches_source_topic[2], Constant.ADDED_TO_GROUP_BY_DATE, "topicDestination4");
AppUtil.pushToTopic(branches_source_topic[3], Constant.ROLE_UPDATE_BY_DATE, "topicDestination5");
AppUtil.pushToTopic(branches_source_topic[4], Constant.UPLOAD_FILE_BY_DATE, "topicDestination6");
AppUtil.pushToTopic(branches_source_topic[5], Constant.USER_LOGGED_IN_BY_DATE, "topicDestination7");
AppUtil.pushToTopic(branches_source_topic[6], Constant.MANAGE_USER_BY_DATE, "topicDestination8");
apputli.java:
public static void pushToTopic(KStream<String, String> sourceTopic, HashMap<String, String> hmap, String destTopicName) {
sourceTopic.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> keywords = new ArrayList<String>();
try {
JSONObject send = new JSONObject();
JSONObject received = processJSON(new JSONObject(value), destTopicName);
boolean valid_json = true;
for(String key: hmap.keySet()) {
if (received.has(hmap.get(key))) {
send.put(key, received.get(hmap.get(key)));
}
else {
valid_json = false;
}
}
if (valid_json) {
keywords.add(send.toString());
}
} catch (Exception e) {
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to(destTopicName);
}
日志从何而来:
因此,日志来自在线连续流。python作业获取基本上是url的日志并将它们发送到
pre-source-topic
. 然后在streams应用程序中,我从该主题创建一个流,并点击这些url,然后返回我正在推送到的json日志
主题来源
.
我花了很多时间试图解决这个问题。我不知道出了什么问题,也不知道它为什么不处理所有日志。请帮我弄清楚。