РЕДАКТИРАНЕ 2018-01-27:
Оказва се, че този проблем е свързан с DirectRunner. Ако стартирате същия конвейер с помощта на DataflowRunner, трябва да получите партиди, които всъщност са до 1000 записа. DirectRunner винаги създава пакети с размер 1 след операция на групиране.
Оригинален отговор:
Сблъсках се със същия проблем, когато пиша в облачни бази данни, използвайки JdbcIO на Apache Beam. Проблемът е, че докато JdbcIO поддържа писане на до 1000 записа в една партида, никога не съм го виждал да пише повече от 1 ред наведнъж (трябва да призная:това винаги използваше DirectRunner в среда за разработка).
Ето защо добавих функция към JdbcIO, където можете сами да контролирате размера на партидите, като групирате данните си заедно и напишете всяка група като една партида. По-долу е даден пример как да използвате тази функция въз основа на оригиналния пример WordCount на Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Разликата с нормалния метод за запис на JdbcIO е новият метод writeIterable()
който приема PCollection<Iterable<RowT>>
като вход вместо PCollection<RowT>
. Всеки Iterable се записва като една партида в базата данни.
Версията на JdbcIO с това допълнение може да се намери тук:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Целият примерен проект, съдържащ примера по-горе, може да бъде намерен тук:https://github.com/ олавлоит/пример за гаечен лъч
(В Apache Beam също има изчакваща заявка за изтегляне за включване на това в проекта)