HBase
 sql >> база данни >  >> NoSQL >> HBase

Как да:Сканирайте Salted Apache HBase таблици със специфични за регион ключови диапазони в MapReduce

Благодарим на Pengyu Wang, софтуерен разработчик във FINRA, за разрешението да публикувам повторно тази публикация.

Salted Apache HBase таблици с предварително разделяне е доказано ефективно HBase решение за осигуряване на равномерно разпределение на работното натоварване между регионалните сървъри и предотвратяване на горещи точки по време на групово записване. В този дизайн се прави ключ за ред с логически ключ плюс сол в началото. Един от начините за генериране на сол е чрез изчисляване на n (брой региони) по модул на хеш кода на логическия ключ на ред (дата и т.н.).

Ключи на реда за осоляване

Например, таблица, приемаща ежедневно натоварване на данни, може да използва логически ключове на редове, започващи с дата, и ние искаме предварително да разделим тази таблица на 1000 региона. В този случай очакваме да генерираме 1000 различни соли. Солта може да бъде генерирана, например, като:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey =2015-04-26|abcrowKey =893|2015-04-26|abc

Резултатът от hashCode() с модуло осигурява случайност за стойността на солта от “000” до “999”. С тази ключова трансформация таблицата е предварително разделена на границите на солта, когато е създадена. Това ще направи обемите на редовете равномерно разпределени, докато зареждате HFiles с MapReduce bulkload. Това гарантира, че клавишите на редове със същата сол попадат в същия регион.

В много случаи на използване, като архивиране на данни, трябва да сканирате или копирате данните в определен диапазон на логически ключ (диапазон от време), като използвате заданието MapReduce. Стандартната таблица MapReduce задания се настройват чрез предоставяне на Scan екземпляр с атрибути на ключови диапазони.

Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Настройте задачата за картографиране на таблици */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class KeyValue.class,job, true, TableInputFormat.class);…

Въпреки това, настройката на такава работа става предизвикателство за осолените предварително разделени маси. Клавишите за стартиране и спиране на реда ще бъдат различни за всеки регион, тъй като всеки има уникална сол. И не можем да посочим множество диапазони за едно Scan пример.

За да разрешим този проблем, трябва да разгледаме как работи таблицата MapReduce. Като цяло рамката MapReduce създава една задача за карта за четене и обработка на всяко входно разделяне. Всяко разделяне се генерира в InputFormat база клас, чрез метод getSplits() .

В заданието MapReduce таблица на HBase, TableInputFormat се използва като InputFormat . Вътре в реализацията, getSplits() методът е отменен, за да извлече началния и стоп ключовете на реда от Scan екземпляр. Тъй като клавишите за старт и стоп се простират в множество региони, диапазонът е разделен на границите на региона и връща списъка с TableSplit обекти, които покриват диапазона на ключовете за сканиране. Вместо да се базира на HDFS блок, TableSplit s са базирани на региона. Чрез презаписване на getSplits() метод, ние сме в състояние да контролираме TableSplit .

Изграждане на персонализиран TableInputFormat

За да промените поведението на getSplits() метод, персонализиран клас, разширяващ TableInputFormat изисква се. Целта на getSplits() тук трябва да покриете диапазона на логически ключове във всеки регион, да изградите техния диапазон от ключове на редове с тяхната уникална сол. Класът HTable предоставя метод getStartEndKeys() който връща начален и краен ред ключове за всеки регион. От всеки стартов ключ анализирайте съответната сол за региона.

Сдвоете ключове =table.getStartEndKeys();for (int i =0; i  

Конфигурацията на заданието преминава логически ключов диапазон

TableInputFormat извлича ключа за стартиране и спиране от Scan екземпляр. Тъй като не можем да използваме Scan в нашата задача MapReduce бихме могли да използваме Configuration вместо това да се предадат тези две променливи и само логическият ключ за стартиране и спиране е достатъчно добър (променлива може да бъде дата или друга бизнес информация). getSplits() методът има JobContext аргумент, Конфигурационният екземпляр може да се чете като context.getConfiguration() .

В драйвера на MapReduce:

Конфигурация conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop" ", "2015-04-27");

В Custom TableInputFormat :

@Override public List getSplits(JobContext context) хвърля IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan). .стоп");…}

Реконструирайте обхвата на солените ключове по регион

Сега, когато имаме сол и логически ключ за стартиране/стоп за всеки регион, можем да възстановим действителния диапазон от ключове за редове.

byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);

Създаване на разделяне на таблица за всеки регион

С диапазона на ключовете за редове вече можем да инициализираме TableSplit екземпляр за региона.

Списък разделя =нов ArrayList(keys.getFirst().length);for (int i =0; i  

Още едно нещо, което трябва да разгледате, е локалността на данните. Рамката използва информация за местоположението във всеки входен раздел, за да присвои задача за карта в своя локален хост. За нашия TableInputFormat , ние използваме метода getTableRegionLocation() за да извлечете местоположението на региона, обслужващ клавиша за ред.

След това това местоположение се предава на TableSplit конструктор. Това ще гарантира, че картографът, който обработва разделянето на таблицата, е на същия сървър на региона. Един метод, наречен DNS.reverseDns() , изисква адреса за сървъра за имена на HBase. Този атрибут се съхранява в конфигурация „hbase.nameserver.address “.

this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);...public String getTableRegionLocation(HTable table, byte[] rowKey) хвърля IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey) ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostturnname region(Location); }protected String reverseDNS(InetAddress ipAddress) хвърля NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns. this);thipServer .reverseDNSCacheMap.put(ipAddress, hostName);}return hostName;}

Пълен код на getSplits ще изглежда така:

@Override public List getSplits(JobContext context) хвърля IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Няма предоставена таблица.");}// Вземете адреса на сървъра за имена и стойността по подразбиране е null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Сдвояване ключове =table.getStartEndKeys();if (ключове ==null || keys.getFirst() ==null || keys.getFirst(). дължина ==0) {throw new RuntimeException("Очаква се поне един регион");}List splits =new ArrayList(keys.getFirst().length);for (int i =0; i  

Използвайте Custom TableInoutFormat в драйвера MapReduce

Сега трябва да заменим TableInputFormat клас с персонализираната компилация, която използвахме за настройка на заданието на таблицата MapReduce.

Configuration conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Настройте задачата за картографиране на таблици */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, true, MultiRangeT); 

Подходът на персонализирания TableInputFormat осигурява ефективно и мащабируемо сканиране за HBase таблици, които са проектирани да използват сол за балансирано натоварване на данни. Тъй като сканирането може да заобиколи всички несвързани ключове на редове, независимо колко голяма е таблицата, сложността на сканирането е ограничена само до размера на целевите данни. В повечето случаи на използване това може да гарантира относително последователно време за обработка с нарастването на таблицата.


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Преобразуване на HBase ACL в политики на Ranger

  2. Екосистема Hadoop – Въведение в компонентите на Hadoop

  3. 6 най-добри техники за оптимизация на работа в MapReduce

  4. Какво е автоматичен отказ при отказ в NameNode в Hadoop HDFS?

  5. Настройка на производителността в MapReduce за подобряване на производителността