
Kafka Connect runs out of Java heap space after enabling SSL

  •  1
  • Giorgos Myrianthous  · 7 years ago

    I recently enabled SSL and tried to start Kafka Connect in distributed mode. When running

    connect-distributed connect-distributed.properties
    

    I get the following errors:

    [2018-10-09 16:50:57,190] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:106)
    [2018-10-09 16:50:55,471] ERROR WorkerSinkTask{id=sink-mariadb-test} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
    java.lang.OutOfMemoryError: Java heap space
            at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
            at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
            at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
            at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
            at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
            at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
            at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
            at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
            at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
            at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
            at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
            at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
            at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    

    java.lang.OutOfMemoryError: Direct buffer memory
            at java.nio.Bits.reserveMemory(Bits.java:694)
            at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
            at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
            at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
            at sun.nio.ch.IOUtil.read(IOUtil.java:195)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
            at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
            at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:344)
            at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:305)
            at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
            at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
            at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
            at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
            at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
            at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
            at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    

    I also tried increasing the maximum and initial heap size by setting the KAFKA_HEAP_OPTS environment variable, running

    KAFKA_HEAP_OPTS="-Xms4g -Xmx6g" connect-distributed connect-distributed.properties
    

    but it still does not work.

    My question is:

    1. Can SSL authentication affect memory usage?

    Edit:

    1 answer
        1
  •  9
  •   Giorgos Myrianthous    7 years ago

    I ran into this problem when enabling SASL_SSL:

    [2018-10-12 12:33:36,426] ERROR WorkerSinkTask{id=test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) 
    java.lang.OutOfMemoryError: Java heap space
    

    Inspecting the ConsumerConfig values showed that my configuration had not been applied:

    [2018-10-12 12:33:35,573] INFO ConsumerConfig values: 
    ...
    security.protocol = PLAINTEXT
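
    As for why a PLAINTEXT consumer would exhaust the heap at all: one commonly cited explanation, offered here as a sketch rather than a confirmed diagnosis of this case, is that the Kafka wire protocol prefixes every response with a 4-byte big-endian size, and the client allocates a buffer of that size (the ByteBuffer.allocate call under NetworkReceive.readFrom in the stack trace). If the listener answers with TLS bytes instead, the first four bytes of a TLS record get read as that size. The byte values below are an assumption (a TLS alert record header); the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;

public class TlsAsKafkaSize {
    // Reads the first four bytes the way a size-prefixed protocol would:
    // as one big-endian 32-bit integer (ByteBuffer's default byte order).
    public static int misreadSize(byte[] firstFourBytes) {
        return ByteBuffer.wrap(firstFourBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumed for illustration: TLS alert record header
        // (content type 0x15, version 0x0303, high length byte 0x00).
        byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00};

        int size = misreadSize(tlsAlertHeader);
        // 0x15030300 = 352518912 bytes, roughly 336 MiB for a single buffer.
        System.out.println("misread size = " + size + " bytes");
    }
}
```

    If this is what happens, raising the heap only hides the symptom; the client and the listener need to agree on security.protocol.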
    

    I found out that the configs must also be set with the producer. and consumer. prefixes:

    consumer.security.protocol=SASL_SSL
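
    To catch this kind of omission before starting the worker, a small check along the following lines may help. It is a minimal sketch: the class and method names are hypothetical, and it assumes that a setting needs to appear unprefixed (for the worker itself) as well as under the consumer. and producer. prefixes (for the clients used by sink and source tasks). The same check can presumably be applied to the related ssl.* and sasl.* keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConnectSecurityPrefixCheck {
    // "" is the worker's own setting; the other two are the overrides
    // for the consumers and producers created for connector tasks.
    static final String[] PREFIXES = {"", "consumer.", "producer."};

    // Parses properties-file text (for example the contents of
    // connect-distributed.properties) into a Properties object.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns every prefixed variant of `key` that is absent from the file.
    public static List<String> missingKeys(Properties workerProps, String key) {
        List<String> missing = new ArrayList<>();
        for (String prefix : PREFIXES) {
            if (workerProps.getProperty(prefix + key) == null) {
                missing.add(prefix + key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // A worker file that only sets the unprefixed key.
        Properties props = parse("security.protocol=SASL_SSL\n");
        System.out.println("missing: " + missingKeys(props, "security.protocol"));
    }
}
```

    Run against a file that only sets the unprefixed key, this reports the consumer. and producer. variants as missing, which matches the PLAINTEXT value seen in the ConsumerConfig log.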
    