代码之家  ›  专栏  ›  技术社区  ›  Yang Liu

如何在二郎进行并行调用并等待所有结果?

  •  2
  • Yang Liu  · 技术社区  · 7 年前

    我在二郎的一个手机游戏后台工作。对于每个HTTP请求,可能需要查询不同的数据源,如PostgreSQL、MongoDB和Redis。我想并行地对这些数据源进行独立的调用,但找不到一种清晰的Erlang方法。

    例如,

    handle_request(?POST, <<"login">>, UserId, Token) ->
        % Verify token from PostgreSQL
        AuthResult = auth_service:login(UserId, Token), 
    
        % Get user data such as outfits and level from MongoDB
        UserDataResult = userdata_service:get(UserId),
    
        % Get messages sent to the user from Redis
        MessageResult = message_service:get(UserId),
    
        % How to run the above 3 calls in parallel?
        % Then wait for all their results here? 
    
        % Combine the result and send back to client
        build_response(AuthResult, UserDataResult, MessageResult).
    

    每个服务最终都会调用相应的数据驱动程序(epgsql、eredis、mongo-erlang),这些驱动程序最终会得到一些poooy:transaction和gen-server:call。如何设计这些服务模块也还没有决定。

    我想确保上面的3个数据调用可以并行运行,然后handle_请求函数等待所有3个调用完成,然后调用build_响应。我该怎么做呢?

    作为参考,在Nodejs,我可能会这么做。

    var authPromise = AuthService.login(user_id, token);
    var userDataPromise = UserdataService.get(user_id);
    var messagePromise = MessageService.get(user_id);
    Promise.all(authPromise, userDataPromise, messagePromise).then( function(values) { 
        buildResponse(values); 
    }
    

    在斯卡拉我可以这样做

    val authFuture = AuthService.login(userId, token)
    val userDataFuture = UserdataService.get(userId)
    val messageFuture = MessageService.get(userId)
    for {
        auth <- authFuture
        userData <- userDataFuture
        message <- messageFuture
    } yield ( buildResponse(auth, userData, message )
    

    显然,我认为这个问题是一个承诺/未来/收益问题。但有人告诉我,如果我在二郎寻找一个承诺,我可能会走错方向。在二郎实现这一目标的最佳实践是什么?

    1 回复  |  直到 7 年前
        1
  •  4
  •   7stud    7 年前

    如何在二郎进行并行调用并等待所有结果?

    你可以用堆积的 receive 条款。Erlang将永远在receive子句中等待,直到消息从进程到达为止(或者您可以用 after )--它类似于 等待承诺 在NoDEJS中:

    -module(my).
    -compile(export_all).
    
    all_results() -> 
        Pid1 = spawn(?MODULE, getdata1, [self(), {10, 20}]),
        Pid2 = spawn(?MODULE, getdata2, [self(), 30]),
        Pid3 = spawn(?MODULE, getdata3, [self()]),
    
        [receive {Pid1, Result1} -> Result1 end, 
         receive {Pid2, Result2} -> Result2 end,
         receive {Pid3, Result3} -> Result3 end].
    
    getdata1(From, {X, Y}) -> 
        %% mimic the time it takes to retrieve the data:
        SleepTime = rand:uniform(100),
        io:format("Sleeping for ~w milliseconds~n", [SleepTime]), 
        timer:sleep(SleepTime),
    
        From ! {self(), X+Y}.  %% send the data back to the main process
    
    getdata2(From, Z) ->
        SleepTime = rand:uniform(100),
        io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
        timer:sleep(SleepTime),
    
        From ! {self(), Z+1}.
    
    getdata3(From) ->
        SleepTime = rand:uniform(100),
        io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
        timer:sleep(SleepTime),
    
        From ! {self(), 16}. 
    

    请注意,此代码:

    [receive {Pid1, Result1} -> Result1 end, 
     receive {Pid2, Result2} -> Result2 end,
     receive {Pid3, Result3} -> Result3 end].
    

    相当于:

    R1 = receive {Pid1, Result1} -> 
             Result1 
         end,
    R2 = receive {Pid2, Result2} -> 
             Result2 
         end,
    R3 = receive {Pid3, Result3} -> 
             Result3 
         end,
    
    [R1, R2, R3].
    

    壳牌:

    ~/erlang_programs$ erl
    Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
    Eshell V9.3  (abort with ^G)
    
    1> c(my).                        
    my.erl:2: Warning: export_all flag enabled - all functions will be exported
    {ok,my}
    
    2> timer:tc(my, all_results, []).
    Sleeping for 66 milliseconds
    Sleeping for 16 milliseconds
    Sleeping for 93 milliseconds
    {96356,[30,31,16]}
    
    3> timer:tc(my, all_results, []).
    Sleeping for 57 milliseconds
    Sleeping for 30 milliseconds
    Sleeping for 99 milliseconds
    {100153,[30,31,16]}
    
    4> timer:tc(my, all_results, []).
    Sleeping for 66 milliseconds
    Sleeping for 31 milliseconds
    Sleeping for 24 milliseconds
    {66426,[30,31,16]}
    

    timer:tc() 返回函数执行所需的时间(以微秒为单位)(1000微秒=1毫秒)以及函数的返回值。例如,第一次 all_results() 调用它需要96.4毫秒才能完成,而如果按顺序执行,单个进程将需要66+16+93=175+毫秒才能完成。