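I have multiple Cloud PubSub subscriptions to read with Apache Beam, selected by a certain prefix pattern. I extend the PTransform class and implement the expand() method to read from the multiple subscriptions and apply a Flatten transform to the PCollectionList (one PCollection per subscription). My problem is that I cannot pass the subscription prefix as a ValueProvider into the expand() method, because expand() is called at template creation time, not when the job is launched. However, if I use only one subscription, I can pass a ValueProvider into PubsubIO.readStrings().fromSubscription().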
Here is some sample code.
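    import java.util.ArrayList;
    import java.util.List;
    import javax.annotation.Nullable; // assumed source of @Nullable
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {
        private ValueProvider<String> prefixPubsub;

        public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
            super(name);
            this.prefixPubsub = prefixPubsub;
        }

        @Override
        public PCollection<PubsubMessage> expand(PBegin input) {
            List<String> myList = null;
            try {
                // prefixPubsub.get() fails here: expand() runs at template creation time,
                // before the runtime value of the ValueProvider is available.
                myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
            } catch (Exception e) {
                LogHelper.error(String.format("Error getting list of subscription : %s", e.toString()));
            }
            List<PCollection<PubsubMessage>> collectionList = new ArrayList<PCollection<PubsubMessage>>();
            if (myList != null && !myList.isEmpty()) {
                for (String subs : myList) {
                    // Read each matched subscription (not the prefix itself), with a distinct step name.
                    PCollection<PubsubMessage> pCollection = input
                        .apply("ReadPubSub-" + subs, PubsubIO.readMessagesWithAttributes().fromSubscription(subs));
                    collectionList.add(pCollection);
                }
                // Merge the per-subscription PCollections into a single PCollection.
                PCollection<PubsubMessage> pubsubMessagePCollection = PCollectionList.of(collectionList)
                    .apply("FlattenPcollections", Flatten.pCollections());
                return pubsubMessagePCollection;
            } else {
                LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
                return null;
            }
        }

        public static MultiPubSubIO read(ValueProvider<String> prefixPubsub) {
            return new MultiPubSubIO(null, prefixPubsub);
        }
    }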
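To make the timing issue concrete, here is a minimal stdlib-only sketch of what I think is going on. DeferredValue and DeferredValueDemo are hypothetical stand-ins I made up for illustration; they are not Beam classes, and the real ValueProvider may behave differently in detail. The assumption is that the graph is built once, before the runtime value exists, so any get() call made while building the graph fails, and the number of read steps cannot depend on that value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a runtime-only parameter (NOT Beam's ValueProvider).
final class DeferredValue<T> {
    private T value;
    private boolean available;

    // Called only when the "job" is launched, mimicking runtime parameter binding.
    void bind(T v) {
        value = v;
        available = true;
    }

    boolean isAccessible() {
        return available;
    }

    T get() {
        if (!available) {
            throw new IllegalStateException("value is only available at run time");
        }
        return value;
    }
}

final class DeferredValueDemo {
    // Mimics graph construction: it runs once, possibly before the value is bound.
    static List<String> buildGraph(DeferredValue<String> prefix) {
        List<String> steps = new ArrayList<>();
        if (prefix.isAccessible()) {
            // Value already known: the step can be derived from it.
            steps.add("Read:" + prefix.get());
        } else {
            // Value not yet known: only a placeholder step can be added.
            steps.add("Read:<deferred>");
        }
        return steps;
    }

    public static void main(String[] args) {
        DeferredValue<String> prefix = new DeferredValue<>();
        System.out.println(buildGraph(prefix)); // built before binding
        try {
            prefix.get();
        } catch (IllegalStateException e) {
            System.out.println("construction-time get() failed: " + e.getMessage());
        }
        prefix.bind("orders-"); // "job launch"
        System.out.println(buildGraph(prefix)); // built after binding
    }
}
```

In this sketch a single read step can carry the deferred value along as a placeholder, which seems to match the one-subscription case that works for me, whereas looping over a list derived from the value needs get() to succeed while the graph is still being built.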
So I am wondering how I can read from a ValueProvider the same way PubsubIO.read().fromSubscription() does. Or am I missing something?
Links I have searched: