当我在bigquery上插入行时,
WriteTableRows,性能真的很差
与…相比
插入请求
. 显然,有些东西设置不正确。需要帮助。
用例1:
编写了一个使用Twitter 4J处理“样本”Twitter流的Java程序。
insertAllRequestBuilder.addRow(rowContent);
当我从我的
雨衣
,它每分钟直接向bigquery表中插入大约1000行。我认为在集群上运行数据流作业可以做得更好。
用例2:
当tweet出现时,我会把它写给
话题
谷歌的
蛹
. 我从我的Mac上运行这个程序,它每分钟发送大约1000条消息。
我写了一篇
数据流
读取此主题的作业&writes to bigquery using
bigqueryio.writeTableRows()。
. 我有一个
8机器数据处理
集群。我在这个集群的主节点上用
数据流管理器
. 它是
难以置信
慢点!大约每5分钟100行。以下是相关代码的片段:
statuses.apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
Status status = c.element();
row.set("Id", status.getId());
row.set("Text", status.getText());
row.set("RetweetCount", status.getRetweetCount());
row.set("FavoriteCount", status.getFavoriteCount());
row.set("Language", status.getLang());
row.set("ReceivedAt", null);
row.set("UserId", status.getUser().getId());
row.set("CountryCode", status.getPlace().getCountryCode());
row.set("Country", status.getPlace().getCountry());
c.output(row);
}
}))
.apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)//
.withSchema(schema)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(org.joda.time.Duration.standardMinutes(2))
.withNumFileShards(1000)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
我做错什么了?我应该用“sparkrunner”吗?如何确认它正在我的集群的所有节点上运行。请给出建议。