代码之家  ›  专栏  ›  技术社区  ›  Piotr Czapla

AMQP的python客户端如何使用listen on basic.return

  •  2
  • Piotr Czapla  · 技术社区  · 15 年前

    我想确保我的消息被发送到队列中。

    为此,我要将强制参数添加到基本的\u发布中。 我还应该做什么来接收 basic.return 消息如果我的消息未能成功传递?

    我不能用 channel.wait() 倾听 基本返回 因为当我的消息成功传递时 wait() 函数永远挂起。(没有超时) 另一方面。当我不打电话的时候 频道。等待() 这个 channel.returned_messages 将保持为空,即使未传递消息。

    我用 py-amqplib 0.6版。

    欢迎任何解决方案。

    3 回复  |  直到 10 年前
        1
  •  1
  •   Piotr Czapla    15 年前

    目前不可能 basic.return 在代理中丢弃消息时异步发送。成功发送消息时,服务器没有报告任何数据。 所以Pyamqp听不到这样的消息。

    关于这个问题,我读了几篇文章。可能的解决方案是:

    • 使用Txamqp,处理basic.return的AMqp的扭曲版本
    • 使用带超时等待的pyamqp。(我不确定目前是否可行)
    • 经常使用同步命令ping服务器,以便pyamqp能够选择 基本返回 邮件到达时。

    因为对Pyamqp和Rabbitmq的支持水平一般都很低,所以我们决定根本不使用AMqp代理。

        2
  •  1
  •   Michael Dillon    13 年前

    您是否尝试过唯一完整的python amqp库?因为包装不整齐,所以没有被广泛使用。

    步骤1。编译C库-您可能需要 sudo apt-get install autotools-dev autoconf automake libtool

    mkdir rabbitc
    cd rabbitc
    hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
    hg clone http://hg.rabbitmq.com/rabbitmq-c/
    cd rabbitmq-c
    autoreconf -i
    make clean
    ./configure --prefix=/usr
    make
    sudo make install
    

    步骤2。安装python库

    pip install pylibrabbitmq
    
        3
  •  1
  •   Jonathan Wylie    13 年前

    您不能同步执行此操作,因为它是一个异步系统。但是您可以使用线程来解决这个问题。

    基本思想是,您启动一个线程,该线程在通道上执行等待操作,每当它退出等待时,它就会为返回的消息队列中的任何返回消息调用回调函数。 然后,您可以在回调函数中处理该消息

    def registerCallback(channel, call_back):
        """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
        """
        def wait():
            try:
                channel.wait()
            except Exception, e:
                print("Problem waiting on publish channel: %s" % str(e))
    
            while not channel.returned_messages.empty():
                returnedMessage = channel.returned_messages.get()
                processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
                processReturnedMessageThread.start()
    
            wait()
    
        waiting = Thread(target=wait) 
        waiting.start()