Въведение
Python се използва широко сред инженерите по данни и специалистите по данни за решаване на всякакви проблеми от ETL/ELT тръбопроводи до изграждане на модели за машинно обучение. Apache HBase е ефективна система за съхранение на данни за много работни потоци, но достъпът до тези данни специално чрез Python може да бъде труден. За специалисти по данни, които искат да използват данните, съхранявани в HBase, скорошният проект „hbase-connectors“ може да се използва с PySpark за основни операции.
В тази серия от блогове ще обясним как да конфигурирате PySpark и HBase заедно за основно използване на Spark, както и за работни места, поддържани в CDSW. За тези, които не са запознати с CDSW, това е сигурна, самообслужваща се корпоративна платформа за наука за данни за учените за данни, които да управляват собствените си аналитични тръбопроводи, като по този начин ускоряват проектите за машинно обучение от проучване до производство. За повече информация относно CDSW посетете продуктовата страница на Cloudera Data Science Workbench.
В тази публикация ще бъдат обяснени и демонстрирани няколко операции заедно с примерен изход. За контекст, всички примерни операции в тази конкретна публикация в блога се изпълняват с разполагане на CDSW.
Предварителни условия:
- Имате CDP клъстер с HBase и Spark
- Ако ще следвате примери чрез CDSW, ще ви трябва инсталирането – Инсталиране на Cloudera Data Science Workbench
- Python 3 е инсталиран на всеки възел по същия път
Конфигурация:
Първо, HBase и Spark трябва да бъдат конфигурирани заедно, за да работят правилно Spark SQL заявките. За да направите това, има две части:първо, конфигурирайте сървърите на регион HBase чрез Cloudera Manager; и второ, уверете се, че времето за изпълнение на Spark има HBase връзки. Една забележка, която трябва да имате предвид обаче, е, че Cloudera Manager вече настройва някои конфигурационни и променливи на средата, за да насочва автоматично Spark към HBase вместо вас. Независимо от това, първата стъпка от конфигуриране на Spark SQL заявки е често срещана при всички типове внедряване в CDP клъстери, но втората е малко по-различна в зависимост от типа внедряване.
Конфигуриране на HBase регионални сървъри
- Отидете в Cloudera Manager и изберете услугата HBase.
- Търсете „среда на регионалния сървър“
- Добавете нова променлива на средата, като използвате фрагмента за разширена конфигурация на RegionServer Environment (предпазен клапан):
- Ключ:HBASE_CLASSPATH
- Стойност:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Уверете се, че използвате подходящите номера на версиите.
- Рестартирайте регионалните сървъри.
След като изпълните горните стъпки, следвайте стъпките по-долу в зависимост от това дали искате разполагане с CDSW или без CDSW.
Добавяне на HBase обвързвания към Spark Runtime в разгръщания без CDSW
За да разположите обвивката или да използвате правилно spark-submit, използвайте следните команди, за да гарантирате, че spark има правилните HBase обвързвания.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. буркан
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- засенчени.jar
Добавяне на HBase обвързване към Spark Runtime в CDSW внедрявания
За да конфигурирате CDSW с HBase и PySpark, трябва да предприемете няколко стъпки.
1) Уверете се, че Python 3 е инсталиран на всеки възел на клъстера и отбележете пътя към него
2) Направете нов проект в CDSW и използвайте шаблон PySpark
3) Отворете проекта, отидете на Settings -> Engine -> Environment Variables.
4) Задайте PYSPARK3_DRIVER_PYTHON и PYSPARK3_PYTHON до пътя, където Python е инсталиран на вашите възли на клъстер (Пътят, отбелязан в Стъпка 1).
По-долу е дадена извадка за това как трябва да изглежда.
5) Във вашия проект отидете на Files -> spark-defaults.conf и го отворете в Workbench
6) Копирайте и поставете реда по-долу в този файл и се уверете, че е запазен, преди да започнете нова сесия.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
В този момент CDSW вече е конфигуриран да изпълнява задания на PySpark на HBase! Останалата част от тази публикация в блога се отнася до някои примерни операции при разполагане на CDSW.
Примерни операции
Постави операции
Има два начина за вмъкване и актуализиране на редове в HBase. Първият и най-препоръчан метод е да се изгради каталог, който е схема, която ще съпоставя колоните на таблица на HBase към рамка от данни на PySpark, като същевременно указва името на таблицата и пространството от имена. Изграждането на този дефиниран от потребителя JSON формат е най-предпочитаният метод, тъй като може да се използва и с други операции. За повече информация относно каталозите вижте тази документация http://hbase.apache.org/book.html#_define_catalog. Вторият метод използва специфичен параметър за съпоставяне, наречен „hbase.columns.mapping“, който просто приема низ от двойки ключ-стойност.
- Използване на каталози
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Уверете се, че в HBase е създадена нова таблица, наречена „tblEmployee“, като просто отворите обвивката на HBase и изпълните следната команда:
сканирайте ‘tblEmployee’, {‘LIMIT’ => 2}
Използването на каталози може също да ви позволи лесно да зареждате таблици на HBase. Това ще бъде обсъдено в следваща част.
- Използване на hbase.columns.mapping
Докато пишете PySpark Dataframe, може да се добави опция, наречена „hbase.columns.mapping“, за да включи низ, който картографира правилно колоните. Тази опция ви позволява само да вмъквате редове в съществуващи таблици.
В обвивката на HBase, нека първо създадем таблица за създаване на „tblEmployee2“, „personal“
Сега в PySpark нека вмъкнем 2 реда, използвайки „hbase.columns.mapping“
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Отново просто проверете дали нова таблица, наречена „tblEmployee2“, има тези нови редове.
сканирайте ‘tblEmployee2’, {‘LIMIT’ => 2}
Това завършва нашите примери за това как да вмъкнете редове чрез PySpark в HBase таблици. В следващата част ще обсъдя операциите за получаване и сканиране, PySpark SQL и някои начини за отстраняване на неизправности. Дотогава трябва да получите CDP клъстер и да си проправите път през тези примери.