代码之家  ›  专栏  ›  技术社区  ›  Jeremie

使用并行处理丢失HTTPS请求

  •  1
  • Jeremie  · 技术社区  · 7 年前

    我使用以下两个类方法从Questrade API请求信息( http://www.questrade.com/api/documentation/rest-operations/market-calls/markets-quotes-id )。我有11000多个库存符号,其中我要求Questrade API批量提供100个符号。

    import  requests
    from joblib import Parallel, delayed
    
    def parallel_request(self, elem, result, url, key):
        response = requests.get(''.join((url, elem)), headers=self.headers)
        result.extend(response.json().get(key))
    
    Parallel(n_jobs=-1, backend="threading")(
             delayed(self.parallel_request)(elem, self.symbol_ids_list, self.uri, 'symbols')\
             for elem in self.batch_result
         )
    

    如果我使用并行类发出110个以上的HTTPS请求,那么我得到的不是11000个输出,而是10500或10600个输出。所以我丢失了并行处理的数据。请注意,我在这里使用了两个python模块,即joblib( https://github.com/joblib/joblib/issues/651 )和请求( https://github.com/requests/requests )。

    以下内容 for 循环工作得很好,所以我知道我的问题是并行类。

    for elem in self.batch_result:
           response = requests.get(''.join((self.uri, elem)), headers=self.headers)
           self.symbol_ids_list.extend(response.json().get('symbols'))
    

    如何提高上一个 对于 循环而不丢失数据?

    更新

    一个样本 self.batch_result (简化结果)可能是 ['AAME,ABAC,ABIL,ABIO,ACERW,ACHN,ACHV,ACRX,ACST,ACTG,ADMA,ADMP,ADOM,ADXS,ADXSW,AEHR,AEMD,AETI,AEY,AEZS,AFMD,AGFSW,AGRX,AGTC,AHPAW,AHPI,AIPT,AKER,AKTX,ALIM,ALJJ,ALQA,ALSK,ALT,AMCN,AMDA,AMMA,AMRH,AMRHW,AMRN,AMRWW,AMTX,ANDAR,ANDAW,ANTH,ANY,APDN,APDNW,APOPW,APPS,APRI,APTO,APVO,APWC,AQB,AQMS,ARCI,ARCW,ARDM,AREX,ARGS,ARLZ,ARQL,ARTW,ARTX,ASFI,ASNA,ASRV,ASTC,ATACR,ATEC,ATHX,ATLC,ATOS,ATRS,AUTO,AVEO,AVGR,AVID,AVXL,AWRE,AXAS,AXON,AXSM,AYTU,AZRX,BASI,BBOX,BBRG,BCACR,BCACW,BCLI,BDSI,BHACR,BHACW,BIOC,BIOL,BIOS,BKEP,BKYI', 'BLDP,BLIN,BLNK,BLNKW,BLPH,BLRX,BMRA,BNSO,BNTC,BNTCW,BOSC,BOXL,BPTH,BRACR,BRACW,BRPAR,BRPAW,BSPM,BSQR,BUR,BURG,BVSN,BVXVW,BWEN,BYFC,CAAS,CADC,CALI,CAPR,CARV,CASI,CASM,CATB,CATS,CBAK,CBLI,CCCL,CCCR,CCIH,CDMO,CDTI,CELGZ,CERCW,CETV,CETX,CETXW,CFBK,CFMS,CFRX,CGEN,CGIX,CGNT,CHCI,CHEK,CHEKW,CHFS,CHKE,CHMA,CHNR,CIDM,CJJD,CKPT,CLDC,CLDX,CLIR,CLIRW,CLNE,CLRB,CLRBW,CLRBZ,CLSN,CLWT,CMSSR,CMSSW,CNACR,CNACW,CNET,CNIT,CNTF,CODA,CODX,COGT,CPAH,CPLP,CPRX,CPSH,CPSS,CPST,CREG,CRIS,CRME,CRNT,CSBR,CTHR,CTIB,CTIC,CTRV,CTXR,CTXRW,CUI', 'CUR,CVONW,CXDC,CXRX,CYCC,CYHHZ,CYRN,CYTR,CYTX,CYTXW,DARE,DCAR,DCIX,DELT,DEST,DFBG,DFFN,DGLY,DHXM,DLPN,DLPNW,DMPI,DOGZ,DOTAR,DOTAW,DRAD,DRIO,DRIOW,DRRX,DRYS,DSKEW,DSWL,DTEA,DTRM,DXLG,DXYN,DYNT,DYSL,EACQW,EAGLW,EARS,EASTW,EBIO,EDAP,EFOI,EGLT,EKSO,ELECW,ELGX,ELON,ELSE,ELTK,EMITF,EMMS,ENG,ENPH,ENT,EPIX,ESEA,ESES,ESTRW,EVEP,EVGN,EVK,EVLV,EVOK,EXFO,EXXI,EYEG,EYEGW,EYES,EYESW,FCEL,FCRE,FCSC,FFHL,FLGT,FLL,FMCIR,FMCIW,FNJN,FNTEW,FORD,FORK,FPAY,FRAN,FRED,FRSX,FSACW,FSNN,FTD,FTEK,FTFT,FUV,FVE,FWP,GALT,GASS,GCVRZ,GEC']

    self.uri 只是 'https://api01.iq.questrade.com/v1/symbols?names=' 如上述Questrade API链接所示。

    更新2

    马拉松的回答是一次很好的尝试,但没有给我更好的结果。第一次测试给了我31356(如果我将结果除以3,则为10452),而不是10900。第二次测试只给了我0或进程完全阻塞。

    我发现 Maximum allowed requests per second 是20。链接: http://www.questrade.com/api/documentation/rate-limiting . 如何提高上一个 对于 在考虑新信息时循环而不丢失数据?

    2 回复  |  直到 7 年前
        1
  •  0
  •   avigil    7 年前

    如果您没有坚持使用 joblib 您可以尝试一些标准的库并行处理模块。在python2/3中 multiprocessing.Pool 可用,并提供用于跨并行线程映射任务的函数。简化版本如下所示:

    from multiprocessing import Pool
    import requests
    
    HEADERS = {} # define headers here
    
    def parallel_request(symbols):
        response = requests.get('https://api01.iq.questrade.com/v1/symbols?names={}'.format(symbols), headers=HEADERS)
        return response.json()
    
    if __name__ == '__main__':
        p = Pool()
        batch_result = ['AAME,ABAC,ABIL,...',
                        'BLDP,BLIN,BLNK,...',
                        'CUR,CVONW,CXDC,...', 
                         ...]
    
        p.map(parallel_request, batch_result) # will return a list of len(batch_result) responses
    

    有异步和可移植版本 map 您可能需要更大尺寸的作业,当然您可以将参数添加到 parallel_requests 避免像我这样硬编码的任务。使用的注意事项 Pool 传递给它的任何参数都必须是可拾取的。

    在python3中 concurrent.futures 模块实际上在文档中有一个很好的多线程url检索示例。只要稍加努力,您就可以替换 load_url 在这个例子中 parallel_request 作用有一个版本 concurrent.futures 作为 futures 模块,以及。

    这些可能需要更多的重构工作,所以如果有一个解决方案 joblib公司 请随意选择。你的问题很可能是 joblib公司 ,有很多方法可以通过标准库以多线程方式实现这一点(尽管有一些添加的样板)。

        2
  •  0
  •   Marat    7 年前

    很可能是因为一些HTTP调用由于网络负载而失败。要测试,请更改 parallel_request :

    def parallel_request(self, elem, result, url, key):
        for i in range(3):  # 3 retries
            try:
                response = requests.get(''.join((url, elem)), headers=self.headers)
            except IOError: 
                continue
            result.extend(response.json().get(key))
            return
    

    可能性要小得多: list.extend 不是线程安全的。如果上面的代码片段没有帮助,请尝试保护 extend 带锁:

    import threading
    ...
    
    lock = threading.Lock()
    
    def parallel_request(self, elem, result, url, key):
        response = requests.get(''.join((url, elem)), headers=self.headers)
        lock.acquire()
        result.extend(response.json().get(key))
        lock.release()