Благодарим на Pengyu Wang, софтуерен разработчик във FINRA, за разрешението да публикувам повторно тази публикация.
Salted Apache HBase таблици с предварително разделяне е доказано ефективно HBase решение за осигуряване на равномерно разпределение на работното натоварване между регионалните сървъри и предотвратяване на горещи точки по време на групово записване. В този дизайн се прави ключ за ред с логически ключ плюс сол в началото. Един от начините за генериране на сол е чрез изчисляване на n (брой региони) по модул на хеш кода на логическия ключ на ред (дата и т.н.).
Ключи на реда за осоляване
Например, таблица, приемаща ежедневно натоварване на данни, може да използва логически ключове на редове, започващи с дата, и ние искаме предварително да разделим тази таблица на 1000 региона. В този случай очакваме да генерираме 1000 различни соли. Солта може да бъде генерирана, например, като:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey =2015-04-26|abcrowKey =893|2015-04-26|abc
Резултатът от hashCode()
с модуло осигурява случайност за стойността на солта от “000” до “999”. С тази ключова трансформация таблицата е предварително разделена на границите на солта, когато е създадена. Това ще направи обемите на редовете равномерно разпределени, докато зареждате HFiles с MapReduce bulkload. Това гарантира, че клавишите на редове със същата сол попадат в същия регион.
В много случаи на използване, като архивиране на данни, трябва да сканирате или копирате данните в определен диапазон на логически ключ (диапазон от време), като използвате заданието MapReduce. Стандартната таблица MapReduce задания се настройват чрез предоставяне на Scan
екземпляр с атрибути на ключови диапазони.
Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Настройте задачата за картографиране на таблици */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class KeyValue.class,job, true, TableInputFormat.class);…
Въпреки това, настройката на такава работа става предизвикателство за осолените предварително разделени маси. Клавишите за стартиране и спиране на реда ще бъдат различни за всеки регион, тъй като всеки има уникална сол. И не можем да посочим множество диапазони за едно Scan
пример.
За да разрешим този проблем, трябва да разгледаме как работи таблицата MapReduce. Като цяло рамката MapReduce създава една задача за карта за четене и обработка на всяко входно разделяне. Всяко разделяне се генерира в InputFormat
база клас, чрез метод getSplits()
.
В заданието MapReduce таблица на HBase, TableInputFormat
се използва като InputFormat
. Вътре в реализацията, getSplits()
методът е отменен, за да извлече началния и стоп ключовете на реда от Scan
екземпляр. Тъй като клавишите за старт и стоп се простират в множество региони, диапазонът е разделен на границите на региона и връща списъка с TableSplit
обекти, които покриват диапазона на ключовете за сканиране. Вместо да се базира на HDFS блок, TableSplit
s са базирани на региона. Чрез презаписване на getSplits()
метод, ние сме в състояние да контролираме TableSplit
.
Изграждане на персонализиран TableInputFormat
За да промените поведението на getSplits()
метод, персонализиран клас, разширяващ TableInputFormat
изисква се. Целта на getSplits()
тук трябва да покриете диапазона на логически ключове във всеки регион, да изградите техния диапазон от ключове на редове с тяхната уникална сол. Класът HTable предоставя метод getStartEndKeys()
който връща начален и краен ред ключове за всеки регион. От всеки стартов ключ анализирайте съответната сол за региона.
Сдвоете ключове =table.getStartEndKeys();for (int i =0; iКонфигурацията на заданието преминава логически ключов диапазон
TableInputFormat
извлича ключа за стартиране и спиране отScan
екземпляр. Тъй като не можем да използвамеScan
в нашата задача MapReduce бихме могли да използвамеConfiguration
вместо това да се предадат тези две променливи и само логическият ключ за стартиране и спиране е достатъчно добър (променлива може да бъде дата или друга бизнес информация).getSplits()
методът имаJobContext
аргумент, Конфигурационният екземпляр може да се чете катоcontext.getConfiguration()
.В драйвера на MapReduce:
Конфигурация conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop" ", "2015-04-27");В
Custom TableInputFormat
:@Override public List getSplits(JobContext context) хвърля IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan). .стоп");…}Реконструирайте обхвата на солените ключове по регион
Сега, когато имаме сол и логически ключ за стартиране/стоп за всеки регион, можем да възстановим действителния диапазон от ключове за редове.
byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);Създаване на разделяне на таблица за всеки регион
С диапазона на ключовете за редове вече можем да инициализираме
TableSplit
екземпляр за региона.Списък разделя =нов ArrayList(keys.getFirst().length);for (int i =0; iОще едно нещо, което трябва да разгледате, е локалността на данните. Рамката използва информация за местоположението във всеки входен раздел, за да присвои задача за карта в своя локален хост. За нашия
TableInputFormat
, ние използваме методаgetTableRegionLocation()
за да извлечете местоположението на региона, обслужващ клавиша за ред.След това това местоположение се предава на
TableSplit
конструктор. Това ще гарантира, че картографът, който обработва разделянето на таблицата, е на същия сървър на региона. Един метод, нареченDNS.reverseDns()
, изисква адреса за сървъра за имена на HBase. Този атрибут се съхранява в конфигурация „hbase.nameserver.address
“.this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);...public String getTableRegionLocation(HTable table, byte[] rowKey) хвърля IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey) ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostturnname region(Location); }protected String reverseDNS(InetAddress ipAddress) хвърля NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns. this);thipServer .reverseDNSCacheMap.put(ipAddress, hostName);}return hostName;}Пълен код на
getSplits
ще изглежда така:@Override public List getSplits(JobContext context) хвърля IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Няма предоставена таблица.");}// Вземете адреса на сървъра за имена и стойността по подразбиране е null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Сдвояване ключове =table.getStartEndKeys();if (ключове ==null || keys.getFirst() ==null || keys.getFirst(). дължина ==0) {throw new RuntimeException("Очаква се поне един регион");}List splits =new ArrayList(keys.getFirst().length);for (int i =0; iИзползвайте Custom TableInoutFormat в драйвера MapReduce
Сега трябва да заменим
TableInputFormat
клас с персонализираната компилация, която използвахме за настройка на заданието на таблицата MapReduce.Configuration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Настройте задачата за картографиране на таблици */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, true, MultiRangeT);Подходът на персонализирания
TableInputFormat
осигурява ефективно и мащабируемо сканиране за HBase таблици, които са проектирани да използват сол за балансирано натоварване на данни. Тъй като сканирането може да заобиколи всички несвързани ключове на редове, независимо колко голяма е таблицата, сложността на сканирането е ограничена само до размера на целевите данни. В повечето случаи на използване това може да гарантира относително последователно време за обработка с нарастването на таблицата.