代码之家  ›  专栏  ›  技术社区  ›  Tim

如何在将来取消时在Callable内终止CXF webservice调用

  •  2
  • Tim  · 技术社区  · 15 年前

    编辑

    这个问题到目前为止已经经过了几次迭代,所以请随意查看修订版,以查看有关历史和尝试过的事情的一些背景信息。


    我将CompletionService与ExecutorService和Callable一起使用,通过CXF生成的代码在几个不同的Web服务上并发调用多个函数。。这些服务都为我在项目中使用的一组信息提供了不同的信息。但是,服务可能会在长时间内无法响应,而不会引发异常,从而延长对组合信息集的等待时间。

    为了解决这个问题,我将同时运行所有服务调用,几分钟后,我想终止任何尚未完成的调用,最好从可调用的内部或通过抛出详细的异常来记录哪些调用尚未完成。

    下面是一些高度简化的代码来说明我已经在做的事情:

    private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port, 
    final String accessionCode) {
        return new Callable<List<Feature>>() {
            @Override
            public List<Feature> call() throws Exception {
                List<Feature> features = new ArrayList<Feature>();
                //getXXXFeatures are methods of the WS Proxy
                //that can take anywhere from second to never to return
                for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
                    Feature ft = convertFeature(raw);
                    features.add(ft);
                }
                if (Thread.currentThread().isInterrupted())
                    log.error("XXX was interrupted");
                return features;
            }
        };
    }
    

    以及并发启动WS调用的代码:

    WiwsPortType port = new Wiws().getWiws();
    List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
    //Counting wrapper around CompletionService, 
        //so I could implement ccs.hasRemaining()
    CountingCompletionService<List<Feature>> ccs = 
            new CountingCompletionService<List<Feature>>(threadpool);
    ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
    ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
    ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));
    
    List<Feature> allFeatures = new ArrayList<Feature>();
    while (ccs.hasRemaining()) {
                //Low for testing, eventually a little more lenient
        Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
        if (polled != null)
            allFeatures.addAll(polled.get());
        else {
            //Still jobs remaining, but unresponsive: Cancel them all
            int jobsCanceled = 0;
            for (Future<List<Feature>> job : ftList)
                if (job.cancel(true))
                    jobsCanceled++;
            log.error("Canceled {} feature jobs because they took too long",
                            jobsCanceled);
            break;
        }
    }
    

    我在这段代码中遇到的问题是,在等待port.getXXXFeatures(…)返回时,实际上并没有取消可调用项,而是以某种方式保持运行。正如你从照片上看到的 if (Thread.currentThread().isInterrupted()) log.error("XXX was interrupted"); 使用中断标志的语句 在port.getFeatures返回后设置,这仅在Webservice调用正常完成后可用,而不是在调用Cancel时中断。

    有人能告诉我我做错了什么,以及如何在给定的时间段后停止正在运行的CXF Webservice调用,并在我的应用程序中注册此信息吗?

    向你问好,蒂姆

    2 回复  |  直到 15 年前
        1
  •  2
  •   akarnokd    15 年前

    编辑3 新答案。

    • 将您的问题作为功能请求发布到Apache CXF上
    • 自己修复ACXF并公开一些功能。
    • 在Apache CXF中寻找异步WS-call支持选项
    • 如果服务支持RESTful API,您的WS是否使用RESTful API调用自己(例如,带参数的普通HTTP请求)
    • 仅限ber专家:使用真线程/线程组,并使用非正统方法杀死线程。
        2
  •  1
  •   Daniel Kulp    15 年前

    CXF文档中有一些关于设置HTTPURLConnection上的读取超时的说明: http://cwiki.apache.org/CXF20DOC/client-http-transport-including-ssl-support.html

    那可能会满足你的需要。如果服务器没有及时响应,将引发异常,可调用方将获得异常。(除了可能挂起的错误之外。我不记得是2.2.2修复了这个错误,还是现在只是在快照中。)