В Apache Kafka Java приложенията, наречени производители, пишат структурирани съобщения в клъстер на Kafka (съставен от брокери). По същия начин Java приложенията, наречени потребители, четат тези съобщения от същия клъстер. В някои организации има различни групи, които отговарят за писането и управлението на производителите и потребителите. В такива случаи една основна болезнена точка може да бъде в координирането на договорения формат на съобщение между производители и потребители.
Този пример демонстрира как да използвате Apache Avro за сериализиране на записи, които се създават на Apache Kafka, като същевременно позволяват еволюция на схеми и несинхронно актуализиране на приложенията производители и потребители.
Сериализация и десериализация
Записът на Kafka (наричан преди съобщение) се състои от ключ, стойност и заглавки. Кафка не е наясно със структурата на данните в ключа и стойността на записите. Той ги обработва като байтови масиви. Но системите, които четат записи от Kafka, се интересуват от данните в тези записи. Така че трябва да създадете данни в четим формат. Форматът на данните, който използвате, трябва да
- Бъдете компактни
- Бъдете бързи за кодиране и декодиране
- Разрешаване на еволюция
- Разрешаване на системите нагоре по веригата (тези, които записват в клъстер Kafka) и системите надолу по веригата (тези, които четат от същия клъстер Kafka) да надграждат до по-нови схеми по различно време
JSON, например, е разбираем, но не е компактен формат на данни и се анализира бавно. Avro е рамка за бърза сериализация, която създава сравнително компактен изход. Но за да прочетете Avro записи, ви е необходима схемата, с която данните са били сериализирани.
Една от възможностите е да съхранявате и прехвърляте схемата със самия запис. Това е добре във файл, където съхранявате схемата веднъж и я използвате за голям брой записи. Съхраняването на схемата във всеки запис на Kafka обаче добавя значителни разходи по отношение на пространството за съхранение и използването на мрежата. Друг вариант е да имате договорен набор от съпоставяния на идентификатор-схема и да се позовавате на схемите по техните идентификатори в записа.
От обект до запис на Kafka и обратно
Приложенията на производителите не трябва да конвертират данни директно в байтови масиви. KafkaProducer е генеричен клас, който се нуждае от неговия потребител, за да посочи типовете ключ и стойност. След това продуцентите приемат екземпляри на ProducerRecord
които имат параметри от същия тип. Преобразуването от обект в масив от байтове се извършва от сериализатор. Kafka предоставя някои примитивни сериализатори:например IntegerSerializer
, ByteArraySerializer
, StringSerializer
. От страна на потребителя, подобни десериализатори преобразуват байтови масиви в обект, с който приложението може да работи.
Така че има смисъл да се включите на ниво сериализатор и десериализатор и да позволите на разработчиците на приложения за производители и потребители да използват удобния интерфейс, предоставен от Kafka. Въпреки че най-новите версии на Kafka позволяват ExtendedSerializers
и ExtendedDeserializers
за достъп до заглавки, решихме да включим идентификатора на схемата в ключа и стойността на записите на Kafka, вместо да добавяме заглавки на записите.
Avro Essentials
Avro е рамка за сериализиране на данни (и отдалечено извикване на процедури). Той използва JSON документ, наречен схема, за да опише структурите от данни. Повечето използване на Avro става или чрез GenericRecord, или чрез подкласове на SpecificRecord. Java класовете, генерирани от Avro схеми, са подкласове на вторите, докато първите могат да се използват без предварително познаване на структурата от данни, с която се работи.
Когато две схеми удовлетворяват набор от правила за съвместимост, данните, записани с една схема (наречена схема за запис), могат да се четат, сякаш са написани с другата (наречена схема за четене). Схемите имат канонична форма, която съдържа всички детайли, които са ирелевантни за сериализацията, като коментари, премахнати, за да подпомогнат проверката за еквивалентност.
VersionedSchema и SchemaProvider
Както споменахме по-горе, имаме нужда от едно към едно съпоставяне между схемите и техните идентификатори. Понякога е по-лесно да се отнасяте към схемите по имена. Когато се създаде съвместима схема, тя може да се счита за следваща версия на схемата. По този начин можем да се позоваваме на схеми с двойка име, версия. Нека наречем заедно схемата, нейния идентификатор, име и версия VersionedSchema
. Този обект може да съдържа допълнителни метаданни, които приложението изисква.
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
обектите могат да търсят екземплярите на VersionedSchema
.
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
Как се внедрява този интерфейс е разгледано в „Внедряване на хранилище на схеми“ в бъдеща публикация в блога.
Сериализиране на общи данни
Когато сериализираме запис, първо трябва да разберем коя схема да използваме. Всеки запис има getSchema
метод. Но откриването на идентификатора от схемата може да отнеме време. По принцип е по-ефективно да зададете схемата по време на инициализация. Това може да стане директно чрез идентификатор или по име и версия. Освен това, когато произвеждаме към множество теми, може да искаме да зададем различни схеми за различни теми и да разберем схемата от името на темата, предоставено като параметър на метода serialize(T, String)
. Тази логика е пропусната в нашите примери за краткост и простота.
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
Със схемата в ръка трябва да я съхраняваме в нашето съобщение. Сериализирането на идентификатора като част от съобщението ни дава компактно решение, тъй като цялата магия се случва в сериализатора/десериализатора. Освен това позволява много лесна интеграция с други рамки и библиотеки, които вече поддържат Kafka, и позволява на потребителя да използва свой собствен сериализатор (като Spark).
Използвайки този подход, първо записваме идентификатора на схемата върху първите четири байта.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
След това можем да създадем DatumWriter
и сериализирайте обекта.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
Обединявайки всичко това, ние внедрихме общ сериализатор на данни.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
Десериализиране на общи данни
Десериализацията може да работи с една схема (данните за схемата са записани с), но можете да посочите различна схема на четец. Схемата на четеца трябва да е съвместима със схемата, с която данните са били сериализирани, но не е необходимо да е еквивалентна. Поради тази причина въведохме имена на схеми. Сега можем да посочим, че искаме да четем данни с конкретна версия на схема. По време на инициализация четем желаните версии на схема за име на схема и съхраняваме метаданни в readerSchemasByName
за бърз достъп. Сега можем да четем всеки запис, написан със съвместима версия на схемата, сякаш е написан с посочената версия.
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
Когато даден запис трябва да бъде десериализиран, първо четем идентификатора на схемата за записване. Това позволява да се търси схемата на четеца по име. С двете налични схеми можем да създадем GeneralDatumReader
и прочетете записа.
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
Работа със специфични записи
По-често има един клас, който искаме да използваме за нашите записи. След това този клас обикновено се генерира от схема на Avro. Apache Avro предоставя инструменти за генериране на Java код от схеми. Един такъв инструмент е плъгинът Avro Maven. Генерираните класове имат схемата, от която са генерирани, налична по време на изпълнение. Това прави сериализацията и десериализацията по-лесни и по-ефективни. За сериализация можем да използваме класа, за да разберем за идентификатора на схемата, който да използваме.
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
По този начин нямаме нужда от логика, за да определим схемата от тема и данни. Използваме схемата, налична в класа на запис, за да пишем записи.
По същия начин, за десериализация, схемата на четеца може да бъде открита от самия клас. Логиката на десериализация става по-проста, тъй като схемата на четеца е фиксирана по време на конфигурация и не е необходимо да се търси по име на схемата.
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }
Допълнително четене
За повече информация относно съвместимостта на схемите вижте спецификацията на Avro за разделителна способност на схемата.
За повече информация относно каноничните форми направете справка със спецификацията на Avro за синтактичен анализ на канонична форма за схеми.
Следващия път...
Част 2 ще покаже реализация на система за съхраняване на дефинициите на схемата Avro.