代码之家  ›  专栏  ›  技术社区  ›  visingh

Clickhouse Kafka引擎引发异常

  •  1
  • visingh  · 技术社区  · 8 年前

    我正在尝试使用Clickhouse Kafka引擎来摄取数据。数据为CSV格式。在数据摄取期间,有时会出现异常

    2018.01.08 08:41:47.016826 [ 3499 ] <Debug> StorageKafka (consumer_queue): Started streaming to 1 attached views
    2018.01.08 08:41:47.016906 [ 3499 ] <Trace> StorageKafka (consumer_queue): Creating formatted reader
    2018.01.08 08:41:49.680816 [ 3499 ] <Error> void DB::StorageKafka::streamThread(): Code: 117, e.displayText() = DB::Exception: Expected end of line, e.what() = DB::Exception, Stack trace:
    
    0. clickhouse-server(StackTrace::StackTrace()+0x16) [0x3221296]
    1. clickhouse-server(DB::Exception::Exception(std::string const&, int)+0x1f) [0x144a02f]
    2. clickhouse-server() [0x36e6ce1]
    3. clickhouse-server(DB::CSVRowInputStream::read(DB::Block&)+0x1a0) [0x36e6f60]
    4. clickhouse-server(DB::BlockInputStreamFromRowInputStream::readImpl()+0x64) [0x36e3454]
    5. clickhouse-server(DB::IProfilingBlockInputStream::read()+0x16e) [0x2bcae0e]
    6. clickhouse-server(DB::KafkaBlockInputStream::readImpl()+0x6c) [0x32f6e7c]
    7. clickhouse-server(DB::IProfilingBlockInputStream::read()+0x16e) [0x2bcae0e]
    8. clickhouse-server(DB::copyData(DB::IBlockInputStream&, DB::IBlockOutputStream&, std::atomic<bool>*)+0x55) [0x35b3e25]
    9. clickhouse-server(DB::StorageKafka::streamToViews()+0x366) [0x32f54f6]
    10. clickhouse-server(DB::StorageKafka::streamThread()+0x143) [0x32f58c3]
    11. clickhouse-server() [0x40983df]
    12. /lib/x86_64-linux-gnu/libpthread.so.0(+0x76ba) [0x7f4d115d06ba]
    13. /lib/x86_64-linux-gnu/libc.so.6(clone+0x6d) [0x7f4d10bf13dd]
    

    下表为

    CREATE TABLE test.consumer_queue (ID Int32,  DAY Date) ENGINE = Kafka('broker-ip:port', 'clickhouse-kyt-test','clickhouse-kyt-test-group', '**CSV**')
    
    CREATE TABLE test.consumer_request ( ID Int32,  DAY Date) ENGINE = MergeTree PARTITION BY DAY ORDER BY (DAY, ID) SETTINGS index_granularity = 8192
    
    CREATE MATERIALIZED VIEW test.consumer_view TO test.consumer_request (ID Int32, DAY Date) AS SELECT ID, DAY FROM test.consumer_queue
    

    CSV数据

    10034,"2018-01-05"
    10035,"2018-01-05"
    10036,"2018-01-05"
    10037,"2018-01-05"
    10038,"2018-01-05"
    10039,"2018-01-05"
    

    单击房屋服务器版本1.1.54318。

    2 回复  |  直到 8 年前
        1
  •  1
  •   Mikhail    8 年前

    ClickHouse似乎读取了卡夫卡的一批消息,然后试图将所有这些消息解码为一个CSV。 这个CSV中的消息应该用新行字符分隔。 因此,所有消息的结尾都应该有新行字符。

    我不确定这是ClickHouse的一个功能还是一个bug。

    您可以尝试只向卡夫卡发送一条消息,并检查它是否正确显示在ClickHouse中。

    如果使用脚本Kafka console producer向Kafka发送消息。然后这个脚本(ConsoleProducer.scala类)从文件中读取行,并将每行发送到卡夫卡主题,而不使用新行字符,因此无法正确处理此类消息。

    如果您使用自己的脚本/应用程序发送消息,则可以尝试对其进行修改,并在每条消息的末尾添加新行字符。这应该可以解决问题。 或者,您可以为Kafka引擎使用另一种格式,例如JSONEachRow。

        2
  •  -1
  •   MAYANK BHARTI    6 年前

    同意@mikhail的回答,我想,试试设置kafka引擎中的kafka\u row\u分隔符=“\n”