代码之家  ›  专栏  ›  技术社区  ›  plpm

如何在Python中的进程之间发送和接收消息(通过通道)?

  •  0
  • plpm  · 技术社区  · 3 年前

    我正在尝试实现一个简单的程序,其中有几个进程通过发送和接收消息来同时相互通信。在该计划中,共有4名参与者(每个参与者对应一个流程),他们之间的沟通如下:

    P1向P2发送一条_消息,然后P2向P3发送另一条_消息,然后P3向P4发送一条_消息。根据每个参与者收到的信息,他们执行特定的操作。

    显然,例如,当P1向P2发送消息时,P2正在从P1接收该消息,因此它们是成对的。

    我发现了不同的方法,没有一种是合适的,因为它们似乎很复杂,因为我正在寻找。例如

    • Python MPI 限制为“系统中没有足够的可用插槽”。有几种方法可以解决这个问题,但解决方案有点复杂。
    • Socket programming 这主要适用于服务器和客户端场景。但我的程序没有服务器。我也查过了 this answer ,这也是基于套接字编程的。

    我的问题是,没有比上述方法更简单的方法来实现我解释的内容吗?可以用Python创建通信通道吗 相当地 和戈朗的类似吗?

    0 回复  |  直到 3 年前
        1
  •  0
  •   InhirCode    3 年前

    我不久前写的这段代码 os.pipe -它是独立的,但不是“最低限度的复制”,因为我没有时间重做它。它使用tkinter UI来模拟流程,并在流程之间发送和接收数据。请注意,该代码仅为我的私人目的而编写。

    """Test run of the use of pipes between processes.
           .. processes are control, startup , send and receive.
           .. pipes from control to startup and send
           .. pipe from startup to send
           .. pipe from send to receive
           .. startup, user input of run mode 
                ... prompt, timer (seconds) or number of runs
           .. send, user input of data 
           .. receive, display of data received
    
        . each process operates independently of, and in isolation from, the other processes until data is transferred through pipes 
        
    """ 
    #     fr read file descriptor
    #     fw write file descriptor
    #    wbs write bytes
    #    snb string length of output filled with 0 to write as header
    #   bsnb for number of bytes written, needed for read
    # maxbuf number of bytes of header, 4 digits, max 9999 characters in a string/byte literal
    #    onb output number of bytes
    #    dbs data read in bytes
    
    import tkinter as tk
    from os import pipe as ospipe
    from os import read as osread
    from os import write as oswrite
    from os import close as osclose
    from datetime import datetime as dt
    from time import monotonic as clock
    from functools import partial
    
    BG = '#fa4'
    TBG = '#fe8'
    SndBG = '#f91'
    BLK = '#000'
    STOP = '#d30'
    START =  '#0b0'
    start = clock()
    
    def timer(halt):
        tm = int(clock())
        if int(tm - start) > halt:
            return True
        else: return False
    
    def piperead(r):
        maxbuf = 4
        onb = osread(r,maxbuf)
        oi = int(onb.decode())
        dbs = osread(r,oi).decode()    # bytes to string
        osclose(r)
        return dbs
    
    def pipewrite(w,s):
        wbs = bytes(s, encoding='utf-8')
        snb = str(len(s)).zfill(4)
        bsnb = bytes(snb, encoding='utf-8')
        wbs = bsnb + wbs
        oswrite(w,wbs)
        osclose(w)
    
    def setpipe(process, sub=None, vars=None):
        fdr, fdw = ospipe()
        if sub: process(fdw,proc=(sub,vars))
        else: process(fdw)
        return piperead(fdr)
    
    class Sloop():
        def __init__(sl, pipewrite=None):
            sl.fw = pipewrite
            sl.w = tk.Tk()
            sl.w.geometry('400x200-100+80')
            sl.w.overrideredirect(1)
            sl.w['bg'] = BG
            uifnt = sl.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',11)
            sl.lvb = tk.Button(sl.w, bg=BLK, activebackground=BG, relief='flat', command=sl.stop)
            sl.lvb.pack()
            sl.lvb.place(width=15,height=15, x=380,y=10)
            sl.sndb = tk.Button(sl.w,bg=SndBG,activebackground=BG,fg=TBG,  text=chr(11166), command=sl.send)
            sl.sndb.pack()
            sl.sndb.place(width=25,height=25, x=20,y=160)
            sl.tlbl = tk.Label(sl.w,bg=BG, text='write data to send...')
            sl.tlbl.pack()
            sl.tlbl.place(x=20,y=20)
            sl.t = tk.Text(sl.w,bg=TBG)
            sl.t.pack()
            sl.t.place(width=300,height=100, x=20,y=45)
            sl.t.focus_set()
            sl.w.mainloop() 
    
        def send(sl):
            sl.output = sl.t.get('1.0','end')
            if sl.output != '\n':
                pipewrite(sl.fw,sl.output)
                sl.close()
            else:
                sl.error()
    
        def error(sl):
            def _clearlbl(ev):
                sl.erlbl.destroy()
    
            sl.erlbl = tk.Label(sl.w,bg=TBG,text='there is nothing to send')              
            sl.erlbl.pack()
            sl.erlbl.place(x=20,y=160)
            sl.t.focus_set()
            sl.t.bind('<KeyPress>',_clearlbl)
    
        def stop(sl):
            pipewrite(sl.fw,'stop')
            sl.close()
    
        def close(sl):
            sl.w.destroy()
    
    class Rloop():
        def __init__(rl, pipefread=None):
            rl.fr = pipefread
            rl.w = tk.Tk()
            rl.w.geometry('400x200-100+320')
            rl.w.overrideredirect(1)
            rl.w['bg'] = BG
            uifnt = rl.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',10)
            rl.lvb = tk.Button(rl.w, bg=BLK, activebackground=BG, relief='flat', command=rl.close)
            rl.lvb.pack()
            rl.lvb.place(width=15,height=15, x=380,y=10)
            rl.tlbl = tk.Label(rl.w,bg=BG, text='received...')
            rl.tlbl.pack()
            rl.tlbl.place(x=20,y=20)
            rl.t = tk.Text(rl.w,bg=TBG)
            rl.t['font'] = uifnt
            rl.t.pack()
            rl.t.place(width=300,height=100, x=20,y=45)
            rl.t.focus_set()
            rl.receive()
            rl.w.mainloop() 
    
        def receive(rl):
            rec = piperead(rl.fr)
            if rec != 'stop':
                rl.t.insert('end', '\n'.join([str(dt.now()), rec]))
            else: rl.close()    
    
        def close(rl):
            rl.w.destroy() 
    
    class Startup():
        def __init__(su, pipefwrite=None):
            su.fw = pipefwrite
            su.mode = ''
            su.w = tk.Tk()
            su.w.geometry('400x200-100+500')
            su.w.overrideredirect(1)
            su.w['bg'] = BG
            uifnt = su.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',11)
            su.lvb = tk.Button(su.w, bg=BLK, activebackground=BG, relief='flat', command=su.stop)
            su.lvb.pack()
            su.lvb.place(width=15,height=15, x=380,y=10)
            su.sndb = tk.Button(su.w,bg=SndBG,activebackground=BG,fg=TBG,  text=chr(11166), command=su.send)
            su.sndb.pack()
            su.sndb.place(width=25,height=25, x=20,y=160)
            su.title = tk.Label(su.w,bg=BG, text='Modes to continue data input')
            su.title.pack()
            su.titley = 10
            su.title.place(x=20,y=su.titley)
    
            su.ysp = 20
            su.margin = 200
            ptxt = 'prompt'
            su.pb = tk.Button(su.w,bg=BG, activebackground=BG, text=ptxt,  relief='flat', cursor='hand2', command=partial(su._get,e=None, nm='su.pb', ent=None))
            tmtxt = ' timer '
            su.tmb = tk.Button(su.w,bg=BG, activebackground=BG, text=tmtxt,  relief='flat', cursor='hand2', command=partial(su._enter,ent='su.tmb'))
            rntxt = '  runs  '
            su.rnb = tk.Button(su.w,bg=BG, activebackground=BG, text=rntxt, relief='flat', cursor='hand2', command=partial(su._enter,ent='su.rnb'))
            su.pb.pack()
            su.pby = su.titley + 1.5*su.ysp
            su.pb.place(x=25,y=su.pby)
            su.tmb.pack()
            su.tmby = su.pby + 2*su.ysp
            su.tmb.place(x=25,y=su.tmby)
            su.rnb.pack()
            su.rnby = su.pby + 4*su.ysp
            su.rnb.place(x=25,y=su.rnby)
            su.formd = {'su.pb':su.pb, 'su.tmb':su.tmb, 'su.rnb':su.rnb}
            su.w.mainloop()
    
        def _able(su,nm):
            for key in su.formd: 
                if nm[0:4] not in key: 
                    su.formd[key]['state'] = 'disabled'
                else:
                    su.formd[key]['state'] = 'normal'
    
        def _enter(su,ent):
            if ent == 'su.tmb':
                tmtxt = 'seconds'
                su.tmlbl = tk.Label(su.w,bg=BG, text=tmtxt)
                su.tment = tk.Entry(su.w,bg=TBG)
                su.tmlbl.pack()
                su.tment.pack()
                tmlbly = su.tmby
                su.tmlbl.place(x=su._margin(tmtxt), y=tmlbly)
                su.tment.place(x=su.margin, y=tmlbly)
                su.tment.focus_set()
                su.tment.bind('<Return>', partial(su._get,nm='su.tment', ent=su.tment))
                su.formd = su.formd | {'su.tmlbl':su.tmlbl, 'su.tment':su.tment}
            elif ent == 'su.rnb':
                rntxt = 'number'
                su.rnlbl = tk.Label(su.w,bg=BG, text=rntxt)
                su.rnent = tk.Entry(su.w,bg=TBG)
                su.rnlbl.pack()
                su.rnent.pack()
                rnlbly = su.rnby
                su.rnlbl.place(x=su._margin(rntxt), y=rnlbly)
                su.rnent.place(x=su.margin, y=rnlbly)
                su.rnent.focus_set()
                su.rnent.bind('<Return>', partial(su._get,nm='su.rnent', ent=su.rnent))
                su.formd = su.formd | {'su.rnlbl':su.rnlbl, 'su.rnent':su.rnent}
      
        def _get(su,e,nm,ent):
            if nm == 'su.pb':
                su._able('su.pb')
                su.mode = 'prompt,'+'1'
            else:
                su._able(nm)
                for key in su.formd:
                    if key == nm:
                        if 'tm' in key: modestr = 'timer'
                        elif 'rn' in key: modestr = 'runs'
                        su.formd[key]['bg']=BG
                        su.mode = ','.join([modestr,str(ent.get())])
                        break
    
        def _margin(su,txt):
            return su.margin-(len(txt)*8)
    
        def send(su):
            pipewrite(su.fw,su.mode)
            su.close()
    
        def stop(su):
            pipewrite(su.fw,'stop')
            su.close()
     
        def close(su):
            su.w.destroy()
    
    class Control():
        def __init__(c, pipefwrite=None, proc=None):
            c.fw = pipefwrite
            c.proc = proc
            if c.proc:
                c.proc = proc[0]
                if proc[1]:
                    c.procv = proc[1]
                else:
                    c.procvl = None
            c.procd = {'start':c._strtui, 'prompt':c._prui, 'timer':c._tmui, 'runs':c._rnui}
            c.w = tk.Tk()
            c.w.geometry('100x200-60+80')
            c.w.overrideredirect(1)
            c.w['bg'] = BG
            uifnt = c.w.tk.call('font', 'create', 'uifnt', '-family','Consolas', '-size',11)
            c.lvb = tk.Button(c.w, bg=BLK, activebackground=BG, relief='flat', command=c.stop)
            c.lvb.pack()
            c.lvb.place(width=15,height=15, x=80,y=10)
            c.title = tk.Label(c.w,bg=BG, text='pipe test\nControl')
            c.title.pack()
            c.title.place(x=5,y=5)
            c.stpclr = tk.Label(c.w,bg=STOP)
            c.stpclr.pack()
            stpy = 160
            c.stpclr.place(width=7,height=7,x=2,y=stpy+10)
            c.stopb = tk.Button(c.w, bg=BG, text='stop', cursor='hand2', relief='flat', activebackground=BG, command=c.stop)
            c.stopb.pack()
            c.stopb.place(x=10,y=160)
            c.procd[c.proc]()
            c.w.mainloop()
    
        def _strtui(c):
            c.strtclr = tk.Label(c.w,bg=START)
            c.strtclr.pack()
            strty = 60
            c.strtclr.place(width=7,height=7,x=2,y=strty+10)
            c.startb = tk.Button(c.w, bg=BG, text='start', cursor='hand2', relief='flat', activebackground=BG, command=c.strtup)
            c.startb.pack()
            c.startb.place(x=10,y=strty)
    
        def __write(c,s):
                pipewrite(c.fw,s)
                c.close()
    
        def _prui(c): 
            prb = tk.Button(c.w,bg=TBG, text='--- next ---', activebackground=BG, relief='flat',cursor='hand2', command=partial(c.__write,'prompt'))
            prb.pack()
            prb.place(x=10,y=80)
    
        def __confirm(c):
            cb = tk.Button(c.w, bg=TBG, text='confirm', activebackground=BG, relief= 'flat', cursor='hand2', command=partial(c.__write,'confirm'))
            cb.pack()
            cb.place(x=20,y=120)
    
        def _tmui(c):
            tmt = ''.join(['run for\n',str(c.procv),' seconds'])
            tmlbl = tk.Label(c.w,bg=BG, text=tmt)
            tmlbl.pack()
            tmlbl.place(x=10,y=80)
            c.__confirm()
    
        def _rnui(c): 
            rnt = ''.join(['run\n ',str(c.procv),' times'])
            rnlbl = tk.Label(c.w,bg=BG, text=rnt)
            rnlbl.pack()
            rnlbl.place(x=10,y=80)
            c.__confirm()
    
        def strtup(c): 
            pipewrite(c.fw,'startup')
            c.close()
    
        def stop(c):
            pipewrite(c.fw,'stop')
            c.close()
            
        def close(c):
            c.w.destroy()
    
    def once():
        fr, fw = ospipe()
        Sloop(fw)
        Rloop(fr)
    
    def many(mkey,mint=1):
        """modes are ('prompt',1), ('timer',secs), ('runs',runs)
        """
        if mkey == 'timer':
            rec = setpipe(Control,sub='timer',vars=mint)
            if rec == 'confirm':
                while not timer(mint): 
                    once()
                return True
            elif rec == 'stop':
                return False
        elif mkey == 'runs': 
            rec = setpipe(Control,sub='runs',vars=mint)
            if rec ==  'confirm':
                for r in range(mint): 
                    once()
                return True
            elif rec == 'stop':
                return False
        elif mkey == 'prompt':
            quit = False
            while not quit:
                once()
                rec = setpipe(Control,sub='prompt')
                if rec != 'prompt': 
                    quit = True
            return True
    
    def testui():
        incontrol = True
        while incontrol:    
            rec = setpipe(Control,sub='start')
            if rec == 'startup':
                rec = setpipe(Startup)
                if rec != 'stop':
                    modes, p, ns = rec.partition(',')
                    incontrol = many(modes,int(ns))
            else:
                incontrol = False
    
    if __name__ == '__main__':
        testui()