Тази публикация в блога е публикувана на Hortonworks.com преди сливането с Cloudera. Някои връзки, ресурси или препратки може вече да не са точни.
Горди сме да обявим техническата визуализация на Spark-HBase Connector, разработен от Hortonworks, работейки с Bloomberg.
Конекторът Spark-HBase използва API за източник на данни (SPARK-3247), въведен в Spark-1.2.0. Той преодолява пропастта между простото хранилище на HBase Key Value и сложните релационни SQL заявки и позволява на потребителите да извършват сложни анализи на данни върху HBase, използвайки Spark. HBase DataFrame е стандартен Spark DataFrame и е в състояние да взаимодейства с всякакви други източници на данни като Hive, ORC, Parquet, JSON и др.
Фон
Има няколко конектора Spark HBase с отворен код, налични или като пакети Spark, като независими проекти или в магистрала на HBase.
Spark се премести към приложните програмни интерфейси (API) Dataset/DataFrame, които осигуряват вградена оптимизация на плана за заявки. Сега крайните потребители предпочитат да използват интерфейс, базиран на DataFrames/Datasets.
HBase конекторът в магистрала HBase има богата поддръжка на ниво RDD, напр. BulkPut и т.н., но поддръжката на DataFrame не е толкова богата. HBase trunk конекторът разчита на стандартния HadoopRDD с вградения HBase TableInputFormat има някои ограничения на производителността. В допълнение, BulkGet, извършен в драйвера, може да бъде единична точка на отказ.
Има някои други алтернативни реализации. Вземете Spark-SQL-on-HBase като пример. Той прилага много усъвършенствани техники за персонализирана оптимизация, като вгражда свой собствен план за оптимизация на заявки в стандартния двигател Spark Catalyst, изпраща RDD към HBase и изпълнява сложни задачи, като частично агрегиране, вътре в HBase копроцесора. Този подход е в състояние да постигне висока производителност, но е труден за поддържане поради неговата сложност и бързата еволюция на Spark. Също така позволяването на произволен код да се изпълнява в копроцесор може да представлява рискове за сигурността.
Конекторът Spark-on-HBase (SHC) е разработен за преодоляване на тези потенциални тесни места и слабости. Той внедрява стандартния API на Spark Datasource и използва двигателя Spark Catalyst за оптимизиране на заявките. Успоредно с това, RDD се изгражда от нулата, вместо да се използва TableInputFormat с цел постигане на висока производителност. С този персонализиран RDD всички критични техники могат да бъдат приложени и напълно приложени, като изрязване на дялове, изрязване на колони, избутване на предикати и локализиране на данни. Дизайнът прави поддръжката много лесна, като същевременно се постига добър компромис между производителност и простота.
Архитектура
Предполагаме, че Spark и HBase са разположени в един и същ клъстер, а изпълнителите на Spark са разположени съвместно с регионалните сървъри, както е показано на фигурата по-долу.
Фигура 1. Архитектура на конектора Spark-on-HBase
На високо ниво конекторът третира както Scan, така и Get по подобен начин и и двете действия се изпълняват в изпълнителите. Драйверът обработва заявката, обобщава сканирания/получавания въз основа на метаданните на региона и генерира задачи за регион. Задачите се изпращат до предпочитаните изпълнители, разположени съвместно с регионалния сървър, и се изпълняват паралелно в изпълнителите, за да се постигне по-добро локализиране на данните и едновременност. Ако даден регион не съдържа необходимите данни, на този регионален сървър не е възложена никаква задача. Задачата може да се състои от множество сканирания и BulkGets, а заявките за данни от задача се извличат само от един регионален сървър и този регионален сървър също ще бъде предпочитанието за локализиране за задачата. Имайте предвид, че драйверът не участва в реалното изпълнение на заданието, освен в задачите за планиране. Това избягва водачът да бъде тесното място.
Каталог на таблици
За да приведем таблицата HBase като релационна таблица в Spark, ние дефинираме съпоставяне между таблици HBase и Spark, наречено Таблица Catalog. Има две критични части от този каталог. Едната е дефиницията на rowkey, а другата е съпоставянето между колоната на таблицата в Spark и семейството на колоните и квалификатора на колони в HBase. Моля, вижте раздела Използване за подробности.
Нативна поддръжка на Avro
Конекторът поддържа формата Avro изначално, тъй като е много често срещана практика да се съхраняват структурирани данни в HBase като масив от байтове. Потребителят може да запази записа Avro директно в HBase. Вътрешно схемата Avro се преобразува автоматично в роден тип данни на Spark Catalyst. Имайте предвид, че и двете части ключ-стойност в таблица на HBase могат да бъдат дефинирани във формат Avro. Моля, вижте примерите/тестовите случаи в репото за точна употреба.
Избутване на предикат
Конекторът извлича само необходимите колони от сървъра на региона, за да намали мрежовите разходи и да избегне излишната обработка в двигателя Spark Catalyst. Съществуващите стандартни HBase филтри се използват за извършване на предикат натискане надолу, без да се използва възможността на копроцесора. Тъй като HBase не знае за типа данни с изключение на байтовия масив и несъответствието на реда между примитивните типове на Java и масива от байтове, трябва да обработим предварително условието на филтъра, преди да зададем филтъра в операцията Scan, за да избегнем загуба на данни. Вътре в сървъра на региона се филтрират записи, които не отговарят на условието на заявката.
Подрязване на дял
Чрез извличане на ключа на реда от предикатите, ние разделяме Scan/BulkGet на множество неприпокриващи се диапазони, само сървърите на региона, които имат исканите данни, ще изпълняват Scan/BulkGet. Понастоящем изрязването на дяла се извършва върху първото измерение на ключовете на редовете. Например, ако ключът на реда е „ключ1:ключ2:ключ3“, подрязването на дяла ще се основава само на „ключ1“. Имайте предвид, че условията WHERE трябва да бъдат дефинирани внимателно. В противен случай подрязването на дяла може да не влезе в сила. Например, WHERE rowkey1> “abc” OR column =“xyz” (където rowkey1 е първото измерение на rowkey, а колоната е обикновена колона hbase) ще доведе до пълно сканиране, тъй като трябва да покрием всички диапазони, тъй като на ИЛИ логика.
Местоположение на данните
Когато изпълнител на Spark е локализиран съвместно със сървърите на региона на HBase, локалността на данните се постига чрез идентифициране на местоположението на сървъра на региона и полага максимални усилия да локализира задачата заедно със сървъра на региона. Всеки изпълнител изпълнява Scan/BulkGet върху частта от данните, разположени съвместно на същия хост.
Сканиране и BulkGet
Тези два оператора са изложени на потребителите чрез посочване на КЛАУЗА WHERE, напр., WHERE колона> x и колона
Употреба
По-долу е илюстрирана основната процедура за използване на конектора. За повече подробности и разширен случай на употреба, като поддръжка на Avro и композитен ключ, моля, вижте примерите в хранилището.
1) Дефинирайте каталога за картографиране на схемата:
[code language="scala"]def catalog =s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key" , |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"}, |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |"col4":{"cf":"cf4", "col":" col4", "type":"int"}, |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"}, |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |} |}""".stripMargin[/code]
2) Подгответе данните и попълнете таблицата HBase:
case class HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int, col5:Long, col6:Short, col7:String, col8:Byte)обект HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}””” HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s”String$i:$t”, i.toByte) }}
val data =(0 до 255).map { i => HBaseRecord(i, “extra”)}
sc.parallelize(data).toDF.write.options(
Карта(HBaseTableCatalog.tableCatalog -> каталог, HBaseTableCatalog.newTable -> „5”))
.format(“org.apache.spark. sql.execution.datasources.hbase”)
.save()
3) Заредете DataFrame:
def withCatalog(cat:String):DataFrame ={
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format( “org.apache.spark.sql.execution.datasources.hbase”)
.load()
}val df =withCatalog(каталог)
4) Езикова интегрирана заявка:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
$”col0″ ===“row005” ||
$”col0″ ===“row020” ||
$”col0″ === “r20” ||
$”col0″ <=“row005”) &&
($”col4″ ===1 ||
$”col4″ ===42))
.select(“col0”, “col1”, “col4”)
s .покажи5) SQL заявка:
df.registerTempTable(“table”)
sqlContext.sql(“изберете брой (col1) от таблица”).showКонфигуриране на Spark-Package
Потребителите могат да използват конектора Spark-on-HBase като стандартен пакет Spark. За да включите пакета във вашето приложение Spark, използвайте:
spark-shell, pyspark или spark-submit
> $SPARK_HOME/bin/spark-shell –пакети zhzhan:shc:0.0.11-1.6.1-s_2.10
Потребителите могат да включат пакета като зависимост и във вашия SBT файл. Форматът е spark-package-name:version
spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”
Изпълнява се в защитен клъстер
За да работи в клъстер с активиран Kerberos, потребителят трябва да включи свързани с HBase буркани в пътя на класа, тъй като извличането и подновяването на токен HBase се извършва от Spark и е независимо от конектора. С други думи, потребителят трябва да инициира средата по нормалния начин, или чрез kinit, или чрез предоставяне на principal/keytab. Следващите примери показват как да работите в защитен клъстер с режим на прежда-клиент и режим на клъстер на прежда. Имайте предвид, че SPARK_CLASSPATH трябва да бъде зададен и за двата режима, а примерният буркан е само заместител за Spark.
експортиране SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar
Да предположим, че hrt_qa е акаунт без глава, потребителят може да използва следната команда за kinit:
kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 – брой-изпълнители 4 –драйвер-памет 512m –изпълнител-памет 512m –изпълнител-ядра 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar
/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –пакети zhzhan:shc:0.0.11-1.6.1-s_2.10 –брой-изпълнители 4 –драйвер-памет 512m –изпълнител-памет 512m –изпълнител-ядра 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar
Обединяване на всичко
Току-що дадохме бърз преглед на това как HBase поддържа Spark на ниво DataFrame. С DataFrame API приложенията Spark могат да работят с данни, съхранявани в таблицата на HBase, толкова лесно, колкото всички данни, съхранявани в други източници на данни. С тази нова функция данните в HBase таблици могат лесно да се консумират от Spark приложения и други интерактивни инструменти, напр. потребителите могат да изпълняват сложна SQL заявка върху HBase таблица вътре в Spark, да извършват присъединяване на таблица към Dataframe или да се интегрират със Spark Streaming, за да внедрят по-сложна система.
Какво следва?
Понастоящем конекторът се хоства в репозиторията на Hortonworks и се публикува като пакет Spark. Той е в процес на мигриране към Apache HBase trunk. По време на миграцията идентифицирахме някои критични грешки в багажника на HBase и те ще бъдат коригирани заедно със сливането. Работата в общността се проследява от HBase JIRA HBASE-14789, включително HBASE-14795 и HBASE-14796 за оптимизиране на основната изчислителна архитектура за Scan и BulkGet, HBASE-14801 за предоставяне на JSON потребителски интерфейс за лесна употреба, HBASE- за пътя за запис на DataFrame, HBASE-15334 за поддръжка на Avro, HBASE-15333 за поддържане на примитивни типове на Java, като short, int, long, float и double и т.н., HBASE-15335 за поддръжка на композитен ключ за ред и HBASE-15572 за добавяне на незадължителна семантика на времеви печат. Очакваме с нетърпение да изготвим бъдеща версия на конектора, която прави работата с конектора още по-лесна.
Потвърждение
Искаме да благодарим на Хамел Котари, Сударшан Кадамби и екипа на Bloomberg, че ни насочиха в тази работа и също така ни помогнаха да потвърдим тази работа. Също така искаме да благодарим на общността на HBase, че ни предостави обратна връзка и направи това по-добро. И накрая, тази работа използва уроците от по-ранните интеграции на Spark HBase и искаме да благодарим на техните разработчици за проправянето на пътя.
Справка:
SHC:https://github.com/hortonworks/shc-release
Spark-package:http://spark-packages.org/package/zhzhan/shc
Apache HBase: https://hbase.apache.org/
Apache Spark:http://spark.apache.org/