
Python Kafka: multiprocess vs. thread

  • Deven  · Tech Community  · 8 years ago

    I can consume messages with KafkaConsumer in separate threads.

    However, when I use multiprocessing.Process instead of threading.Thread, I get an error:

    OSError: [Errno 9] Bad file descriptor

    See also: a related question and the documentation.

    Edit

    Here is a simplified example. It works fine if I use threading.Thread instead of multiprocessing.Process.

    from multiprocessing import Process
    
    from kafka import KafkaConsumer
    
    
    class KafkaWrapper:
        def __init__(self):
            # The consumer (and its TCP connections) is created here, in the
            # parent process, before the child processes are started.
            self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')
    
        def consume(self, topic):
            self.consumer.subscribe([topic])  # subscribe() takes a list of topics
            for message in self.consumer:
                print(message.value)
    
    
    class ServiceInterface:
        def __init__(self):
            self.kafka_wrapper = KafkaWrapper()
    
        def start(self, topic):
            self.kafka_wrapper.consume(topic)
    
    
    class ServiceA(ServiceInterface):
        pass
    
    
    class ServiceB(ServiceInterface):
        pass
    
    
    def main():
        serviceA = ServiceA()
        serviceB = ServiceB()
    
        jobs = []
        # The code works fine if I use threading.Thread here instead of Process
        jobs.append(Process(target=serviceA.start, args=("my-topic",)))
        jobs.append(Process(target=serviceB.start, args=("my-topic",)))
    
        for job in jobs:
            job.start()
    
        for job in jobs:
            job.join()
    
    
    if __name__ == "__main__":
        main()
    

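    Here is the error I see (again, my actual code differs from the example above; it works if I use threading.Thread but fails if I use multiprocessing.Process):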

      File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
        self.run()
      File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "service_interface.py", line 58, in start
        self._kafka_wrapper.start_consuming(self.service_object_id)
      File "kafka_wrapper.py", line 141, in start_consuming
        for message in self._consumer:
      File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
        return next(self._iterator)
      File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
        self._client.poll(timeout_ms=poll_ms, sleep=True)
      File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
        responses.extend(self._poll(timeout, sleep=sleep))
      File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
        ready = self._selector.select(timeout)
      File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
        kev_list = self._kqueue.control(None, max_ev, timeout)
    OSError: [Errno 9] Bad file descriptor
    
  •   Jacky Wang  · 8 years ago

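    A Kafka consumer can run as multiple processes or as multiple threads (make sure the client library you use correctly supports Kafka consumer groups, which was required in early versions of Kafka); the choice is up to you.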
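    A minimal sketch of the multi-threaded variant, assuming kafka-python: each thread builds and owns its own consumer, and all of them share one group_id so that the group coordinator splits the topic's partitions among them. The group name "my-group", the kafka_factory helper, and the FakeConsumer stand-in (used so the sketch runs without a broker) are hypothetical; "my.server.com" comes from the question.

```python
import threading
from types import SimpleNamespace


def kafka_factory(topic, group_id):
    """Hypothetical real factory; needs kafka-python and a reachable broker."""
    from kafka import KafkaConsumer  # imported lazily so the fake path needs no Kafka
    return KafkaConsumer(topic, bootstrap_servers="my.server.com", group_id=group_id)


class FakeConsumer:
    """Hypothetical stand-in for KafkaConsumer that yields two messages and stops."""

    def __init__(self, topic, group_id):
        self.group_id = group_id
        self._messages = [SimpleNamespace(value=f"{topic}:{i}".encode()) for i in range(2)]

    def __iter__(self):
        return iter(self._messages)


def worker(factory, topic, group_id, results, lock):
    # The consumer is created inside this thread and never shared with another thread.
    consumer = factory(topic, group_id)
    for message in consumer:
        with lock:
            results.append((threading.current_thread().name, message.value))


def run_group(factory, topic, group_id="my-group", n_consumers=2):
    results, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(factory, topic, group_id, results, lock),
                         name=f"consumer-{i}")
        for i in range(n_consumers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # with a real broker this blocks for as long as the consumers run
    return results


if __name__ == "__main__":
    for name, value in run_group(FakeConsumer, "my-topic"):
        print(name, value)
```

    With the real factory, run_group(kafka_factory, "my-topic") blocks while the consumers run; if the topic has fewer partitions than consumers in the group, the extra consumers stay idle.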

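    The TCP connections a consumer uses (to the Kafka servers) should not be shared by multiple processes, and that is why you get the connection error. In your example the KafkaConsumer is created in the parent process, in ServiceInterface.__init__, before Process.start() forks the children, so both children inherit the same sockets and selector. On macOS the selector is a kqueue, and a kqueue is not inherited by a child created with fork(), which is why the traceback ends in self._kqueue.control(...) with Bad file descriptor.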

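    So create the KafkaConsumer inside each child process, once it has started, rather than in the parent before the fork.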
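    A minimal sketch of keeping the connection out of the parent, adapting the question's KafkaWrapper: the wrapper stores only a factory in __init__ and builds the consumer inside consume(), which runs in the child process, so no socket or selector exists before the fork. The factory functions, the handle parameter, and the FakeConsumer stand-in (so the sketch runs without a broker) are hypothetical; "my.server.com" and "my-topic" come from the question.

```python
import multiprocessing as mp
from types import SimpleNamespace


def kafka_factory(topic):
    """Hypothetical real factory; needs kafka-python and a reachable broker."""
    from kafka import KafkaConsumer  # lazy import: only the child process loads it
    return KafkaConsumer(topic, bootstrap_servers="my.server.com")


class FakeConsumer:
    """Hypothetical stand-in for KafkaConsumer that yields three messages and stops."""

    def __init__(self, topic):
        self._messages = [SimpleNamespace(value=f"{topic}:{i}".encode()) for i in range(3)]

    def __iter__(self):
        return iter(self._messages)


class KafkaWrapper:
    def __init__(self, consumer_factory):
        # Only picklable configuration lives here; nothing is connected in the parent.
        self._consumer_factory = consumer_factory

    def consume(self, topic, handle=print):
        # Runs in the child: the consumer's TCP connections and selector
        # (a kqueue on macOS) are created after the fork and owned by this process only.
        consumer = self._consumer_factory(topic)
        for message in consumer:
            handle(message.value)


def main(factory=FakeConsumer):
    services = [KafkaWrapper(factory), KafkaWrapper(factory)]
    jobs = [mp.Process(target=s.consume, args=("my-topic",)) for s in services]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()


if __name__ == "__main__":
    main()  # pass kafka_factory instead of FakeConsumer to use a real broker
```

    Because the wrapper holds only a factory, it also survives pickling, which the spawn start method requires when it sends the Process target to the child.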

    Another approach is to fetch messages in a single thread or process and use a separate process pool to do the actual work.
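    A minimal sketch of that design, assuming kafka-python's KafkaConsumer.poll(), which returns a dict mapping each TopicPartition to a list of records: one process owns the consumer and hands only the plain message values to a concurrent.futures process pool, so the workers never touch a Kafka connection. process_value, max_polls, and the FakeConsumer stand-in are hypothetical.

```python
from concurrent.futures import ProcessPoolExecutor
from types import SimpleNamespace


def process_value(value):
    # Hypothetical CPU-heavy work; runs in a pool worker that never sees the consumer.
    return len(value)


def consume_with_pool(consumer, executor, handle=print, max_polls=None):
    """Fetch in this process only; fan plain message values out to the pool."""
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        # kafka-python's poll() returns {TopicPartition: [ConsumerRecord, ...]}.
        batch = consumer.poll(timeout_ms=1000)
        values = [record.value for records in batch.values() for record in records]
        # map() yields results in input order.
        for result in executor.map(process_value, values):
            handle(result)


class FakeConsumer:
    """Hypothetical stand-in: the first poll() returns one batch, later polls return {}."""

    def __init__(self, values):
        self._batches = [{"my-topic-0": [SimpleNamespace(value=v) for v in values]}]

    def poll(self, timeout_ms=0):
        return self._batches.pop() if self._batches else {}


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        consume_with_pool(FakeConsumer([b"a", b"bb", b"ccc"]), pool, max_polls=2)
        # prints 1, 2 and 3 on separate lines
```

    The loop polls bounded batches instead of mapping over the consumer iterator because Executor.map submits every input item before it yields, which would never finish on an endless consumer. The executor argument also accepts a ThreadPoolExecutor, which is convenient for testing.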