代码之家  ›  专栏  ›  技术社区  ›  Makky

Spring集成动态流线程问题

  •  2
  • Makky  · 技术社区  · 6 年前

    我正在基于数据库中的数据创建动态集成流。

    我们需要用文件名模式轮询的目录在数据库中

    例如

    Instance, directory , filename 
    ABC     , c:/input1  , test.txt
    DEF     , d:/input2 ,  fresh.xlsx
    

    我有大约200-300个条目,所以我要做的是为每个记录创建集成流,因为它将有不同的处理器等

    对于每条记录

     IntegrationFlowBuilder flowBuilder =
                            IntegrationFlows
                                    .from(new CustomFileReadingSource(input), consumer);
    
         flowBuilder.transform(new ObjectToJsonTransformer());
    
          flowBuilder.handle(o -> {
    
        //                System.out.println(o.getPayload());
                    });
    context.registration(flowBuilder.get()).register();
    

    从日志中

    2018-11-13 16:00:41.399 [task-scheduler-3] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:41.587 [task-scheduler-10] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
    2018-11-13 16:00:41.807 [task-scheduler-4] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:42.071 [task-scheduler-5] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:42.323 [task-scheduler-7] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:42.569 [task-scheduler-6] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:42.878 [task-scheduler-8] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:43.197 [task-scheduler-9] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:43.588 [task-scheduler-1] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:43.951 [task-scheduler-2] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:44.305 [task-scheduler-3] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:44.598 [task-scheduler-10] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32)
    2018-11-13 16:00:45.031 [task-scheduler-4] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:45.414 [task-scheduler-5] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    2018-11-13 16:00:45.974 [task-scheduler-7] INFO  c.b.m.p.f.b.LoggerSourceAdvisor.afterReceive(32) 
    

    正如您所看到的,只有少数线程轮询线程

    有人能帮我解释一下为什么它不创建线程或者没有更好的方法来实现并行轮询器吗?

    2 回复  |  直到 6 年前
        1
  •  2
  •   Artem Bilan    6 年前

    没错。因为所有 Polling Ednpoints ThreadPoolTaskScheduler 10 默认情况下作为池大小: https://docs.spring.io/spring-integration/docs/5.1.0.RELEASE/reference/html/configuration.html#namespace-taskscheduler

    另一方面,在CPU最多16核的情况下,尝试100个线程是毫无意义的。生成更多线程甚至可能会导致应用程序速度减慢。

        2
  •  1
  •   Gary Russell    6 年前