代码之家  ›  专栏  ›  技术社区  ›  Shan-Desai askovpen

使用popen、serial和mqtt的复杂应用程序中的线程

  •  1
  • Shan-Desai askovpen  · 技术社区  · 6 年前

    我正在创建一个 python3 考虑以下因素的模块:

    1. MQTT库来自 paho.mqtt.client
    2. GPS数据的读取与解析 pynmea2 and serial
    3. 从中读取特定信息 Popen and PIPE 子流程
    4. 读/写应用程序到 InfluxDB

    结构

    .
    |-- myApp
    |   |-- mqtt
    |   |-- db
    |   |-- gps
    |   |-- n2k
    |-- main.py
    

    mqtt 我已经为 mqtt.Client 在上面提到的 main.py 只需使用:

      client = myMqtt()
      client.run() # <--- run() has `loop_forever()`
    

    loop_forever() code from paho.mqtt

    对于 gps db 我正在考虑为我的应用程序创建类似的包装。

    为了 n2k 我有一个简单的包装器,它调用预先安装的二进制文件( actisense-serial analyzer )打开 /dev/ttyUSB0 端口如下:

    class N2KParser:
        def __init__(self, parser=None):
            self.cfg = get_configuration('n2k') # this has all the configuration (/dev/ttyUSB0)
    
            self.actisense_process = Popen(['actisense-serial', '-r', self.cfg['port']], stdout=PIPE)
            self.analyzer_proc = Popen(['analyzer', '-json'], stdin=self.actisense_process.stdout,
                                        stdout=PIPE, stderr=PIPE)
        def read(self):
            logger.info('Reading NMEA2000 via Actisense-NGT1 Gateway')
            while self.analyzer_proc.poll() is None:
                n2k_data = self.analyzer_proc.stdout.readline().decode('utf-8')
                try:
                    n2k_dict = json.loads(n2k_data)
                except Exception as e:
                    raise e
                    self.actisense_process.close()
                    self.analyzer_proc.close()
    
        def close():
            logger.info('Closing the Parser for N2K')
            self.actisense_process.close()
            self.analyzer_proc.close()
    

    主.py 类似于 MQTT公司 我添加了一个 N2K公司 client.run()

    from myApp.cloud.Client import CloudMqtt
    from myApp.db.dbClient import influxInstance
    from myApp.nmea2k.n2kClient import N2KParser
    import time
    
    if __name__ == '__main__':
    
        try:
            db = influxInstance()
        except Exception as e:
            raise e
        try:
            client = CloudMqtt()
            rc = client.run()
        except Exception as e:
            raise e
            db.close()
            client.loop_stop()
            client.disconnect()
        try:
            parser = N2KParser() # <--- this is not executed since blocked by `client.run()`
        except Exception as e:
            raise e
            parser.close()
    

    如评论中所述:

    • 自从 run() 方法使用 永远循环() 它使用线程并被永远阻塞,因此 N2KParser 不创建和调用。

    在我的 myApp 这样它们就可以并行运行而不会相互阻塞?

    附笔。

    因为我用 logging 对于所有子模块:下面是 stdout

    INFO:myApp.db.dbClient:Client instance to DB created INFO:myApp.cloud.Client:MQTT client instance created DEBUG:myApp.cloud.Client:Sending CONNECT (u1, p1, wr0, wq0, wf0, c0, k60) client_id=b'f628a6d8-188a-403e-90ef-da72ec1474b6'DEBUG:umg.cloud.Client:Received CONNACK (1, 0) DEBUG:myApp.cloud.Client:RC: 0 INFO:myApp.cloud.Client:connected to cloud DEBUG:myApp.cloud.Client:Sending PINGREQ DEBUG:myApp.cloud.Client:Received PINGRESP

    0 回复  |  直到 6 年前