寻找最好的方法。
1.使用Mongo反应流
Flux.from(collection.find()).doOnError(e -> { e.printStackTrace(); }).doOnComplete(() -> { System.out.println("Finished "); }).subscribe(doc -> { // Code to insert into Kafka });
另外,我是否需要在subscribe方法中执行多线程?
Kafka Connect MongoDB source connector
配置示例如下:
name=mongodb-source-connector connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector tasks.max=1 uri=mongodb://127.0.0.1:27017 batch.size=100 schema.name=yourSchemaName topic.prefix=aPrefix # optional databases=mydb.test,mydb.test2