代码之家  ›  专栏  ›  技术社区  ›  yoav.str

Vertx-如何从可执行程序异步读取流

  •  10
  • yoav.str  · 技术社区  · 6 年前

    我有了生成器G,一旦我从控制台运行它,它就开始向控制台输出(stdout)sleep写入x秒,然后继续一个数据流。

    我希望我的程序运行G并绑定到他的输出到Java VLTX应用程序作为流输入。

    我希望所有的阅读都是异步的 我怎样才能做到?

    这就是我要做的:

    public class InputHandler extends AbstractVerticle {
    
        final String command = "path";
    
        @Override
        public void start() throws Exception {
            Runtime r = Runtime.getRuntime();
            Process p;     // Process tracks one external native process
            BufferedReader is;  // reader for output of process
            String line;
            p = r.exec(command);
            System.out.println("In Main after exec");
             is = new BufferedReader(new InputStreamReader(p.getInputStream()));
    
            while ((line = is.readLine()) != null)
               try {
                   System.out.println(line);
               }catch (Exception ex){
                   System.out.println(ex);
               }
        }
    }
    

    这就是抛出的异常:

    io.vertx.core.VertxException: Thread blocked
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:255)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at io.vertx.example.InputHandler.start(InputHandler.java:27)
        at io.vertx.core.AbstractVerticle.start(AbstractVerticle.java:106)
        at io.vertx.core.impl.DeploymentManager.lambda$doDeploy$8(DeploymentManager.java:483)
        at io.vertx.core.impl.DeploymentManager$$Lambda$8/884603232.handle(Unknown Source)
        at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
        at io.vertx.core.impl.ContextImpl$$Lambda$9/154173878.run(Unknown Source)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
    
    2 回复  |  直到 6 年前
        1
  •  4
  •   Gregoire Ducharme    6 年前

    Worker verticle https://vertx.io/preview/docs/vertx-core/java/#blocking_code

    vertx.deployVerticle(YourVerticle.class.getName(), new DeploymentOptions().setWorker(true));
    
        2
  •  1
  •   Alistair Carscadden    6 年前

     public void start() throws Exception {
            Runtime r = Runtime.getRuntime();
            Process p = r.exec(command);
            AsyncInputStream asyncInputStream = new AsyncInputStream(vertx, vertx.nettyEventLoopGroup(), p.getInputStream());
            asyncInputStream.handler(buffer ->
                    lineHandler(buffer.getString(0, buffer.length() - 1))
            );
        }
    

    org.wisdom.framework.vertx.AsyncInputStream

    <wisdomVertxEngin.version>0.10.0</wisdomVertxEngin.version>
    
    <dependency>
      <groupId>org.wisdom-framework</groupId>
      <artifactId>wisdom-vertx-engine</artifactId>
      <version>${wisdomVertxEngin.version}</version>
    </dependency>