Серийни вложки
Най-лесният начин би бил да направите вмъквания
в рамките на Sink.foreach
.
Ако приемем, че сте използвали генериране на код за схема и освен това да приемем, че вашата таблица се казва "NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
Можем да напишем функция, която извършва вмъкването
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
И тази функция може да бъде поставена в мивката
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
Пакетни вложки
Можете допълнително да разширите методологията на Sink, като групирате N вмъквания наведнъж:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
Тази партидна мивка може да се захранва от Flow
който извършва групирането на партиди:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)