HBase
 sql >> база данни >  >> NoSQL >> HBase

Spark-on-HBase:HBase конектор, базиран на DataFrame

Тази публикация в блога е публикувана на Hortonworks.com преди сливането с Cloudera. Някои връзки, ресурси или препратки може вече да не са точни.

Горди сме да обявим техническата визуализация на Spark-HBase Connector, разработен от Hortonworks, работейки с Bloomberg.

Конекторът Spark-HBase използва API за източник на данни (SPARK-3247), въведен в Spark-1.2.0. Той преодолява пропастта между простото хранилище на HBase Key Value и сложните релационни SQL заявки и позволява на потребителите да извършват сложни анализи на данни върху HBase, използвайки Spark. HBase DataFrame е стандартен Spark DataFrame и е в състояние да взаимодейства с всякакви други източници на данни като Hive, ORC, Parquet, JSON и др.

Фон

Има няколко конектора Spark HBase с отворен код, налични или като пакети Spark, като независими проекти или в магистрала на HBase.

Spark се премести към приложните програмни интерфейси (API) Dataset/DataFrame, които осигуряват вградена оптимизация на плана за заявки. Сега крайните потребители предпочитат да използват интерфейс, базиран на DataFrames/Datasets.

HBase конекторът в магистрала HBase има богата поддръжка на ниво RDD, напр. BulkPut и т.н., но поддръжката на DataFrame не е толкова богата. HBase trunk конекторът разчита на стандартния HadoopRDD с вградения HBase TableInputFormat има някои ограничения на производителността. В допълнение, BulkGet, извършен в драйвера, може да бъде единична точка на отказ.

Има някои други алтернативни реализации. Вземете Spark-SQL-on-HBase като пример. Той прилага много усъвършенствани техники за персонализирана оптимизация, като вгражда свой собствен план за оптимизация на заявки в стандартния двигател Spark Catalyst, изпраща RDD към HBase и изпълнява сложни задачи, като частично агрегиране, вътре в HBase копроцесора. Този подход е в състояние да постигне висока производителност, но е труден за поддържане поради неговата сложност и бързата еволюция на Spark. Също така позволяването на произволен код да се изпълнява в копроцесор може да представлява рискове за сигурността.

Конекторът Spark-on-HBase (SHC) е разработен за преодоляване на тези потенциални тесни места и слабости. Той внедрява стандартния API на Spark Datasource и използва двигателя Spark Catalyst за оптимизиране на заявките. Успоредно с това, RDD се изгражда от нулата, вместо да се използва TableInputFormat с цел постигане на висока производителност. С този персонализиран RDD всички критични техники могат да бъдат приложени и напълно приложени, като изрязване на дялове, изрязване на колони, избутване на предикати и локализиране на данни. Дизайнът прави поддръжката много лесна, като същевременно се постига добър компромис между производителност и простота.

Архитектура

Предполагаме, че Spark и HBase са разположени в един и същ клъстер, а изпълнителите на Spark са разположени съвместно с регионалните сървъри, както е показано на фигурата по-долу.

Фигура 1. Архитектура на конектора Spark-on-HBase

На високо ниво конекторът третира както Scan, така и Get по подобен начин и и двете действия се изпълняват в изпълнителите. Драйверът обработва заявката, обобщава сканирания/получавания въз основа на метаданните на региона и генерира задачи за регион. Задачите се изпращат до предпочитаните изпълнители, разположени съвместно с регионалния сървър, и се изпълняват паралелно в изпълнителите, за да се постигне по-добро локализиране на данните и едновременност. Ако даден регион не съдържа необходимите данни, на този регионален сървър не е възложена никаква задача. Задачата може да се състои от множество сканирания и BulkGets, а заявките за данни от задача се извличат само от един регионален сървър и този регионален сървър също ще бъде предпочитанието за локализиране за задачата. Имайте предвид, че драйверът не участва в реалното изпълнение на заданието, освен в задачите за планиране. Това избягва водачът да бъде тесното място.

Каталог на таблици

За да приведем таблицата HBase като релационна таблица в Spark, ние дефинираме съпоставяне между таблици HBase и Spark, наречено Таблица Catalog. Има две критични части от този каталог. Едната е дефиницията на rowkey, а другата е съпоставянето между колоната на таблицата в Spark и семейството на колоните и квалификатора на колони в HBase. Моля, вижте раздела Използване за подробности.

Нативна поддръжка на Avro

Конекторът поддържа формата Avro изначално, тъй като е много често срещана практика да се съхраняват структурирани данни в HBase като масив от байтове. Потребителят може да запази записа Avro директно в HBase. Вътрешно схемата Avro се преобразува автоматично в роден тип данни на Spark Catalyst. Имайте предвид, че и двете части ключ-стойност в таблица на HBase могат да бъдат дефинирани във формат Avro. Моля, вижте примерите/тестовите случаи в репото за точна употреба.

Избутване на предикат

Конекторът извлича само необходимите колони от сървъра на региона, за да намали мрежовите разходи и да избегне излишната обработка в двигателя Spark Catalyst. Съществуващите стандартни HBase филтри се използват за извършване на предикат натискане надолу, без да се използва възможността на копроцесора. Тъй като HBase не знае за типа данни с изключение на байтовия масив и несъответствието на реда между примитивните типове на Java и масива от байтове,  трябва да обработим предварително условието на филтъра, преди да зададем филтъра в операцията Scan, за да избегнем загуба на данни. Вътре в сървъра на региона се филтрират записи, които не отговарят на условието на заявката.

Подрязване на дял

Чрез извличане на ключа на реда от предикатите, ние разделяме Scan/BulkGet на множество неприпокриващи се диапазони, само сървърите на региона, които имат исканите данни, ще изпълняват Scan/BulkGet. Понастоящем изрязването на дяла се извършва върху първото измерение на ключовете на редовете. Например, ако ключът на реда е „ключ1:ключ2:ключ3“, подрязването на дяла ще се основава само на „ключ1“. Имайте предвид, че условията WHERE трябва да бъдат дефинирани внимателно. В противен случай подрязването на дяла може да не влезе в сила. Например, WHERE rowkey1> “abc” OR column =“xyz” (където rowkey1 е първото измерение на rowkey, а колоната е обикновена колона hbase) ще доведе до пълно сканиране, тъй като трябва да покрием всички диапазони, тъй като на ИЛИ логика.

Местоположение на данните

Когато изпълнител на Spark е локализиран съвместно със сървърите на региона на HBase, локалността на данните се постига чрез идентифициране на местоположението на сървъра на региона и полага максимални усилия да локализира задачата заедно със сървъра на региона. Всеки изпълнител изпълнява Scan/BulkGet върху частта от данните, разположени съвместно на същия хост.

Сканиране и BulkGet

Тези два оператора са изложени на потребителите чрез посочване на КЛАУЗА WHERE, напр., WHERE колона> x и колона за сканиране и колона WHERE =x за получаване. Операциите се извършват в изпълнителите, а водачът само конструира тези операции. Вътрешно те се преобразуват за сканиране и/или получаване, а Iterator[Row] се връща към катализаторната машина за обработка на горния слой.

Употреба

По-долу е илюстрирана основната процедура за използване на конектора. За повече подробности и разширен случай на употреба, като поддръжка на Avro и композитен ключ, моля, вижте примерите в хранилището.

1) Дефинирайте каталога за картографиране на схемата:

[code language="scala"]def catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},        |"rowkey":"key" ,        |"columns":{          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},          |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},          |"col4":{"cf":"cf4", "col":" col4", "type":"int"},          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},          |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}        |}      |}""".stripMargin[/code] 

2) Подгответе данните и попълнете таблицата HBase:
case class HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

обект HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 до 255).map { i =>  HBaseRecord(i, “extra”)}

sc.parallelize(data).toDF.write.options(
 Карта(HBaseTableCatalog.tableCatalog -> каталог, HBaseTableCatalog.newTable -> „5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Заредете DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(каталог)

4) Езикова интегрирана заявка:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
 $”col0″ ===“row005” ||
 $”col0″ ===“row020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“row005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .покажи

5) SQL заявка:
df.registerTempTable(“table”)
sqlContext.sql(“изберете брой (col1) от таблица”).show

Конфигуриране на Spark-Package

Потребителите могат да използват конектора Spark-on-HBase като стандартен пакет Spark. За да включите пакета във вашето приложение Spark, използвайте:

spark-shell, pyspark или spark-submit

> $SPARK_HOME/bin/spark-shell –пакети zhzhan:shc:0.0.11-1.6.1-s_2.10

Потребителите могат да включат пакета като зависимост и във вашия SBT файл. Форматът е spark-package-name:version

spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Изпълнява се в защитен клъстер

За да работи в клъстер с активиран Kerberos, потребителят трябва да включи свързани с HBase буркани в пътя на класа, тъй като извличането и подновяването на токен HBase се извършва от Spark и е независимо от конектора. С други думи, потребителят трябва да инициира средата по нормалния начин, или чрез kinit, или чрез предоставяне на principal/keytab. Следващите примери показват как да работите в защитен клъстер с режим на прежда-клиент и режим на клъстер на прежда. Имайте предвид, че SPARK_CLASSPATH трябва да бъде зададен и за двата режима, а примерният буркан е само заместител за Spark.

експортиране SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Да предположим, че hrt_qa е акаунт без глава, потребителят може да използва следната команда за kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 – брой-изпълнители 4 –драйвер-памет 512m –изпълнител-памет 512m –изпълнител-ядра 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –пакети zhzhan:shc:0.0.11-1.6.1-s_2.10 –брой-изпълнители 4 –драйвер-памет 512m –изпълнител-памет 512m –изпълнител-ядра 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Обединяване на всичко

Току-що дадохме бърз преглед на това как HBase поддържа Spark на ниво DataFrame. С DataFrame API приложенията Spark могат да работят с данни, съхранявани в таблицата на HBase, толкова лесно, колкото всички данни, съхранявани в други източници на данни. С тази нова функция данните в HBase таблици могат лесно да се консумират от Spark приложения и други интерактивни инструменти, напр. потребителите могат да изпълняват сложна SQL заявка върху HBase таблица вътре в Spark, да извършват присъединяване на таблица към Dataframe или да се интегрират със Spark Streaming, за да внедрят по-сложна система.

Какво следва?

Понастоящем конекторът се хоства в репозиторията на Hortonworks и се публикува като пакет Spark. Той е в процес на мигриране към Apache HBase trunk. По време на миграцията идентифицирахме някои критични грешки в багажника на HBase и те ще бъдат коригирани заедно със сливането. Работата в общността се проследява от HBase JIRA HBASE-14789, включително HBASE-14795 и HBASE-14796  за оптимизиране на основната изчислителна архитектура за Scan и BulkGet,  HBASE-14801 за предоставяне на JSON потребителски интерфейс за лесна употреба, HBASE- за пътя за запис на DataFrame, HBASE-15334 за поддръжка на Avro, HBASE-15333  за поддържане на примитивни типове на Java, като short, int, long, float и double и т.н., HBASE-15335 за поддръжка на композитен ключ за ред и HBASE-15572 за добавяне на незадължителна семантика на времеви печат. Очакваме с нетърпение да изготвим бъдеща версия на конектора, която прави работата с конектора още по-лесна.

Потвърждение

Искаме да благодарим на Хамел Котари, Сударшан Кадамби и екипа на Bloomberg, че ни насочиха в тази работа и също така ни помогнаха да потвърдим тази работа. Също така искаме да благодарим на общността на HBase, че ни предостави обратна връзка и направи това по-добро. И накрая, тази работа използва уроците от по-ранните интеграции на Spark HBase и искаме да благодарим на техните разработчици за проправянето на пътя.

Справка:

SHC:https://github.com/hortonworks/shc-release

Spark-package:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Spark на HBase с Spark shell

  2. Как наистина работи мащабирането в Apache HBase

  3. Spark-on-HBase:HBase конектор, базиран на DataFrame

  4. HBase производителност CDH5 (HBase1) срещу CDH6 (HBase2)

  5. Apache HBase I/O – HFile