代码之家  ›  专栏  ›  技术社区  ›  xDs

Alpakka MongoDB-在MongoSource中指定类型

  •  0
  • xDs  · 技术社区  · 7 年前

    我目前正在与Akka Streams和 Alpakka MongoDB connector

    是否可以指定 MongoSource ?

    val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
      private val todoCollection: MongoCollection[TodoMongo] = mongoDb
        .withCodecRegistry(codecRegistry)
        .getCollection("todo")
    

    我想这样做:

    val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
    MongoSource(t) // Stuck here
    

    但我得到以下错误:

    Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].
    

    我找不到有关此部件的正确文档。

    1 回复  |  直到 7 年前
        1
  •  1
  •   Jeffrey Chung    7 年前

    这还没有出版,但在阿尔帕卡的主分支中, MongoSource.apply 采用类型参数:

    object MongoSource {
      def apply[T](query: Observable[T]): Source[T, NotUsed] =
        Source.fromPublisher(ObservableToPublisher(query))
    }
    

    因此,在即将发布的Alpakka 0.18版本中,您将能够执行以下操作:

    val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
    

    请注意 source 这里假设 todoCollection.find() 返回 Observable[TodoMongo] ;根据需要调整类型。

    同时,您可以简单地手动添加上述代码。例如:

    package akka.stream.alpakka.mongodb.scaladsl
    
    import akka.NotUsed
    import akka.stream.alpakka.mongodb.ObservableToPublisher
    import akka.stream.scaladsl.Source
    import org.mongodb.scala.Observable
    
    object MyMongoSource {
      def apply[T](query: Observable[T]): Source[T, NotUsed] =
        Source.fromPublisher(ObservableToPublisher(query))
    }
    

    请注意 MyMongoSource 定义为驻留在 akka.stream.alpakka.mongodb.scaladsl 包装(如 MongoSource ),因为 ObservableToPublisher 是包私有类。您将使用 MyMongoSource 以同样的方式使用 MongoSource公司 :

    val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())