这还没有出版,但在阿尔帕卡的主分支中,
MongoSource.apply
采用类型参数:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
因此,在即将发布的Alpakka 0.18版本中,您将能够执行以下操作:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
请注意
source
这里假设
todoCollection.find()
返回
Observable[TodoMongo]
;根据需要调整类型。
同时,您可以简单地手动添加上述代码。例如:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
请注意
MyMongoSource
定义为驻留在
akka.stream.alpakka.mongodb.scaladsl
包装(如
MongoSource
),因为
ObservableToPublisher
是包私有类。您将使用
MyMongoSource
以同样的方式使用
MongoSource公司
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())