代码之家  ›  专栏  ›  技术社区  ›  Giannis

AWS IoT Python SDK和asyncio

  •  4
  • Giannis  · 技术社区  · 8 年前

    我需要使用AWS IoT MQTT服务。我正在做一些实验 https://github.com/aws/aws-iot-device-sdk-python 目前。

    我的应用程序将使用WebSocket与另一个服务通信,然后发布/订阅MQTT主题以转发/接收消息。

    这个库是否可能会阻止代码执行?我仍然试着了解asyncio,不确定应该注意什么。我怎么知道它是否会引起问题?

    我相信我只需要使用 AWSIoTMQTTClient 从图书馆上方。

    这是我的工作代码摘录:

    class AWSIoTClient:
    
        def __init__():
            ...
            self.client = AWSIoTMQTTClient(...)
    
        def subscribe(self, callback):
            self.client.subscribe(f'{self.TOPIC}/subscribe/', 0, callback)
    
        def publish(self, message):
            self.client.publish(self.TOPIC, message, 0)
    
    
    class MyWSProtocol(WebSocketClientProtocol):
    
        def set_aws_client(self, client: AWSIoTClient):
            client.subscribe(self.customCallback)
            self.client = client
    
        def customCallback(self, client, userdata, message):
            # This will be called when we send message from AWS
            if message.payload:
                message = json.loads(message.payload.decode('utf-8').replace("'", '"'))
                message['id'] = self.next_id()
                self.sendMessage(json.dumps(message).encode('utf-8'))
    
        def onMessage(self, payload, isBinary):
            message = json.loads(payload)
    
            # This will forward message to AWS
            self.client.publish(str(payload))
    
    1 回复  |  直到 8 年前
        1
  •  5
  •   Community Mohan Dere    5 年前

    这个库是否可能会阻止代码执行?

    我怎么知道它是否会引起问题?

    您不应该允许在任何协程中包含长时间运行的阻塞(同步)代码。这将导致阻塞您的全局事件循环,并进一步阻塞您在任何地方的所有协同路由。

    async def main():
        await asyncio.sleep(3)  # async sleeping, it's ok
    
        time.sleep(3)           # synchronous sleeping, this freezes event loop 
                                # and all coroutines for 3 seconds, 
                                # you should avoid it!
        
        await asyncio.sleep(3)  # async sleeping, it's ok
    

    如果您需要在协程内运行阻塞代码,那么应该在executor中运行( read here 关于它)。

    在编写协同路由时,应该记住这一点,但如果启用,asyncio通常会警告您此错误 debug mode :

    import asyncio
    import time
    
    
    async def main():
        await asyncio.sleep(3)
        time.sleep(3)
        await asyncio.sleep(3)
    
    
    loop = asyncio.get_event_loop()
    loop.set_debug(True)  # debug mode
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    

    您将看到警告:

    Executing <Handle <TaskWakeupMethWrapper object at 0x000002063C2521F8>(<Future finis...events.py:275>) created at C:\Users\gmn\AppData\Local\Programs\Python\Python36\Lib\asyncio\futures.py:348> took 3.000 seconds