Това все още не е публикувано, но в главния клон на Alpakka, MongoSource.apply
приема параметър тип:
object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Следователно с предстоящото издание 0.18 на Alpakka ще можете да правите следното:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Обърнете внимание, че source
тук предполага, че todoCollection.find()
връща Observable[TodoMongo]
; коригирайте типовете според нуждите.
Междувременно можете просто да добавите горния код ръчно. Например:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Обърнете внимание, че MyMongoSource
е дефинирано да се намира в akka.stream.alpakka.mongodb.scaladsl
пакет (като MongoSource
), защото ObservableToPublisher
е пакетен частен клас. Ще използвате MyMongoSource
по същия начин, по който бихте използвали MongoSource
:
val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())