代码之家  ›  专栏  ›  技术社区  ›  DilTeam

在BigQuery上插入行:InsertAllRequest与BigQueryIO.WriteTableRows()。

  •  1
  • DilTeam  · 技术社区  · 7 年前

    当我在bigquery上插入行时, WriteTableRows,性能真的很差 与…相比 插入请求 . 显然,有些东西设置不正确。需要帮助。

    用例1: 编写了一个使用Twitter 4J处理“样本”Twitter流的Java程序。

    insertAllRequestBuilder.addRow(rowContent);
    

    当我从我的 雨衣 ,它每分钟直接向bigquery表中插入大约1000行。我认为在集群上运行数据流作业可以做得更好。

    用例2: 当tweet出现时,我会把它写给 话题 谷歌的 . 我从我的Mac上运行这个程序,它每分钟发送大约1000条消息。

    我写了一篇 数据流 读取此主题的作业&writes to bigquery using bigqueryio.writeTableRows()。 . 我有一个 8机器数据处理 集群。我在这个集群的主节点上用 数据流管理器 . 它是 难以置信 慢点!大约每5分钟100行。以下是相关代码的片段:

    statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            TableRow row = new TableRow();
            Status status = c.element();
            row.set("Id", status.getId());
            row.set("Text", status.getText());
            row.set("RetweetCount", status.getRetweetCount());
            row.set("FavoriteCount", status.getFavoriteCount());
            row.set("Language", status.getLang());
            row.set("ReceivedAt", null);
            row.set("UserId", status.getUser().getId());
            row.set("CountryCode", status.getPlace().getCountryCode());
            row.set("Country", status.getPlace().getCountry());
            c.output(row);
        }
    })) 
        .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
                .withSchema(schema)
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
                .withNumFileShards(1000)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
    

    我做错什么了?我应该用“sparkrunner”吗?如何确认它正在我的集群的所有节点上运行。请给出建议。

    1 回复  |  直到 7 年前
        1
  •  2
  •   Felipe Hoffa    7 年前

    使用BigQuery,您可以:

    • 数据流输入。低延迟(每秒高达10万行)具有成本。
    • 批处理数据。更高的延迟,难以置信的吞吐量,完全免费。

    这就是你所经历的不同。如果您只想摄取1000行,那么批处理速度会明显减慢。同样的,100亿行通过批处理速度会更快,而且不会产生任何成本。

    数据流/BEM's BigQueryIO.writeTableRows 无法在中传输或批处理数据。

    BigQueryIO.Write.Method.FILE_LOADS 粘贴的代码选择批处理。

    推荐文章
    El.Hum  ·  SSIS不识别索引?
    8 年前