Mysql
 sql >> база данни >  >> RDS >> Mysql

Google Dataflow (Apache beam) JdbcIO групово вмъкване в mysql база данни

РЕДАКТИРАНЕ 2018-01-27:

Оказва се, че този проблем е свързан с DirectRunner. Ако стартирате същия конвейер с помощта на DataflowRunner, трябва да получите партиди, които всъщност са до 1000 записа. DirectRunner винаги създава пакети с размер 1 след операция на групиране.

Оригинален отговор:

Сблъсках се със същия проблем, когато пиша в облачни бази данни, използвайки JdbcIO на Apache Beam. Проблемът е, че докато JdbcIO поддържа писане на до 1000 записа в една партида, никога не съм го виждал да пише повече от 1 ред наведнъж (трябва да призная:това винаги използваше DirectRunner в среда за разработка).

Ето защо добавих функция към JdbcIO, където можете сами да контролирате размера на партидите, като групирате данните си заедно и напишете всяка група като една партида. По-долу е даден пример как да използвате тази функция въз основа на оригиналния пример WordCount на Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

Разликата с нормалния метод за запис на JdbcIO е новият метод writeIterable() който приема PCollection<Iterable<RowT>> като вход вместо PCollection<RowT> . Всеки Iterable се записва като една партида в базата данни.

Версията на JdbcIO с това допълнение може да се намери тук:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

Целият примерен проект, съдържащ примера по-горе, може да бъде намерен тук:https://github.com/ олавлоит/пример за гаечен лъч

(В Apache Beam също има изчакваща заявка за изтегляне за включване на това в проекта)




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. mysql:заобикаляне на неявни транзакции?

  2. Добър инструмент за визуализиране на схема на база данни?

  3. Неуспешно изграждане на mysql конектор/c (libmysql) от източник в cygwin

  4. Динамичен псевдоним на колона въз основа на стойността на колоната

  5. Преобразуване/конвертиране на BigInt в Varchar в MySQL