代码之家  ›  专栏  ›  技术社区  ›  smanurung

使用ValueProvider读取多个Pubsub订阅

  •  2
  • smanurung  · 技术社区  · 7 年前

    我有来自Cloud PubSub的多个订阅,要使用Apache Beam根据特定前缀模式读取。我扩展 PTransform 分类和实施 expand() 方法读取多个订阅并执行 Flatten 转换到 PCollectionList (多个 PCollection 从每个 subscription )。我无法将订阅前缀传递为 ValueProvider 进入 展开() 方法,自 展开() 在模板创建时调用,而不是在启动作业时调用。但是,如果我只使用1个订阅,我可以通过 价值提供者 进入 PubsubIO.readStrings().fromSubscription()

    下面是一些示例代码。

    public class MultiPubSubIO extends PTransform<PBegin, PCollection<PubsubMessage>> {
    
        private ValueProvider<String> prefixPubsub;
    
        public MultiPubSubIO(@Nullable String name, ValueProvider<String> prefixPubsub) {
            super(name);
            this.prefixPubsub = prefixPubsub;
        }
    
        @Override
        public PCollection<PubsubMessage> expand(PBegin input) {
            List<String> myList = null;
    
            try {
                // prefixPubsub.get() will return error
                myList = PubsubHelper.getAllSubscription("projectID", prefixPubsub.get());
            } catch (Exception e) {
                LogHelper.error(String.format("Error getting list of subscription : %s",e.toString()));
            }
    
            List<PCollection<PubsubMessage>> collectionList = new ArrayList<PCollection<PubsubMessage>>();
    
            if(myList != null && !myList.isEmpty()){
                for(String subs : myList){
                    PCollection<PubsubMessage> pCollection = input
                            .apply("ReadPubSub", PubsubIO.readMessagesWithAttributes().fromSubscription(this.prefixPubsub));    
                    collectionList.add(pCollection);
                }
    
                PCollection<PubsubMessage> pubsubMessagePCollection = PCollectionList.of(collectionList)
                        .apply("FlattenPcollections", Flatten.pCollections());
                return pubsubMessagePCollection;
            } else {
                LogHelper.error(String.format("No subscription with prefix %s found", prefixPubsub));
                return null;
            }
        }
    
        public static MultiPubSubIO read(ValueProvider<String> prefixPubsub){
            return new MultiPubSubIO(null, prefixPubsub);
        }
    }
    

    所以我在想怎么用同样的方法 PubsubIO.read().fromSubscription() 从中读取 价值提供者 。还是我遗漏了什么?

    搜索的链接:

    2 回复  |  直到 7 年前
        1
  •  1
  •   simplylizz    6 年前

    遗憾的是,目前无法做到这一点:

    • a的值是不可能的 ValueProvider 影响变换扩展-在扩展时,它是未知的;到知道时,管道形状已经固定。

    • 当前没有类似的转换 PubsubIO.read() 可以接受 PCollection 主题名称的。最终会有(通过 Splittable DoFn ),但这需要一段时间-目前还没有人在研究这个问题。

        2
  •  1
  •   Daljeet Singh    4 年前

    您可以使用 MultipleReadFromPubSub 来自apache beam io模块 https://beam.apache.org/releases/pydoc/2.27.0/_modules/apache_beam/io/gcp/pubsub.html

    topic_1 = PubSubSourceDescriptor('projects/myproject/topics/a_topic')
    topic_2 = PubSubSourceDescriptor(
                'projects/myproject2/topics/b_topic',
                'my_label',
                'my_timestamp_attribute')
    subscription_1 = PubSubSourceDescriptor(
                'projects/myproject/subscriptions/a_subscription')
    
    results = pipeline | MultipleReadFromPubSub(
                [topic_1, topic_2, subscription_1])