3. atlas的使用
Atlas 的使用相对简单,其主要工作是同步各服务(主要是 Hive)的元数据,并构建元数据实体之间的关联关系,然后对所存储的元数据建立索引,最终未用户提供数据血缘查看及元数据检索等功能。
Atlas 在安装之初,需手动执行一次元数据的全量导入,后续 Atlas 便会利用 Hive Hook 增量同步 Hive 的元数据。
手动导入hbase的元数据,与spark任务相关联获取数据血缘关系。
3.1. Atlas集成hive
3.1.1. 修改atlas配置
修改/opt/atlas/conf/atlas-application.properties 配置文件中的以下参数
######### Hive Hook Configs #######
atlas.hook.hive.synchronous=false
atlas.hook.hive.numRetries=3
atlas.hook.hive.queueSize=10000
atlas.cluster.name=primary
3.1.2. 修改 Hive 配置文件
在/opt/hive/conf/hive-site.xml 文件中增加以下参数,配置 Hive Hook
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
3.1.3. 安装 Hive Hook
1)解压 Hive Hook
tar -zxvf apache-atlas-2.1.0-hive-hook.tar.gz
2)将 Hive Hook 依赖复制到 Atlas 安装路径
cp -r apache-atlas-hive-hook-2.1.0/* /opt/atlas/
3)修改/opt/hive/conf/hive-env.sh 配置文件
注:需先需改文件名
mv hive-env.sh.template hive-env.sh
增加如下参数
export HIVE_AUX_JARS_PATH=/opt/atlas/hook/hive
4)将 Atlas 配置文件/opt/atlas/conf/atlas-application.properties
拷贝到/opt/hive/conf 目录
cp /opt/atlas/conf/atlas-application.properties /opt/hive/con
f/
3.1.4. 重启atlas与hive
停止atlas
/opt/atlas/bin/atlas_stop.py
停止hive,先用jps查询相关RunJar的进程,然后将对应的进程杀死。
kill -9 37163 36730
启动hive
nohup hive --service metastore &
nohup hive --service hiveserver2 &
启动atlas
/opt/atlas/bin/atlas_start.py
3.1.5. Hive元数据初次导入
Atlas 提供了一个 Hive 元数据导入的脚本,直接执行该脚本,即可完成 Hive 元数据的初次全量导入。
- 导入 Hive 元数据,执行以下命令
/opt/atlas/hook-bin/import-hive.sh
按提示输入用户名:admin;输入密码:admin
Enter username for atlas :- admin
Enter password for atlas :-
等待片刻,出现以下日志,即表明导入成功
- 查看 Hive 元数据
1)搜索 hive_table 类型的元数据,可已看到 Atlas 已经拿到了 Hive 元数据.
登入atlas页面, admin -> Switch to New:
可以查看到hive_table相关的表:
2)任选一张表查看血缘依赖关系
发现此时并未出现期望的血缘依赖,原因是 Atlas 是根据 Hive 所执行的 SQL 语句获取 表与表之间以及字段与字段之间的依赖关系的,例如执行 insert into table_a select * from table_b 语句,Atlas 就能获取 table_a 与 table_b 之间的依赖关系。此时并未执行任何 SQL 语 句,故还不能出现血缘依赖关系。
3.1.6. Hive 元数据增量同步
Hive 元数据的增量同步,无需人为干预,只要 Hive 中的元数据发生变化(执行 DDL 语 句),Hive Hook 就会将元数据的变动通知 Atlas。除此之外,Atlas 还会根据 DML 语句获取.数据之间的血缘关系。
3.1.7. 生成血缘依赖
为查看血缘关系效果,在 hive 里面创建两张数据表。
1.建表语句:
1) 订单事实表
CREATE TABLE dwd_order_info (
`id` STRING COMMENT '订单号',
`final_amount` DECIMAL(16,2) COMMENT '订单最终金额',
`order_status` STRING COMMENT '订单状态',
`user_id` STRING COMMENT '用户 id',
`payment_way` STRING COMMENT '支付方式',
`delivery_address` STRING COMMENT '送货地址',
`out_trade_no` STRING COMMENT '支付流水号',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '操作时间',
`expire_time` STRING COMMENT '过期时间',
`tracking_no` STRING COMMENT '物流单编号',
`province_id` STRING COMMENT '省份 ID',
`activity_reduce_amount` DECIMAL(16,2) COMMENT '活动减免金额',
`coupon_reduce_amount` DECIMAL(16,2) COMMENT '优惠券减免金额',
`original_amount` DECIMAL(16,2) COMMENT '订单原价金额',
`feight_fee` DECIMAL(16,2) COMMENT '运费',
`feight_fee_reduce` DECIMAL(16,2) COMMENT '运费减免'
) COMMENT '订单表' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
2) 地区维度表
CREATE TABLE dim_base_province (
`id` STRING COMMENT '编号',
`name` STRING COMMENT '省份名称',
`region_id` STRING COMMENT '地区 ID',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT 'ISO-3166 编码,供可视化使用',
`iso_3166_2` STRING COMMENT 'IOS-3166-2 编码,供可视化使用'
) COMMENT '省份表' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
2.数据装载
将资料里面提前准备好的数据 order_info.txt 和 base_province.txt 上传到两张 hive 表的 hdfs 路径下。
此处提供数据下载链接:
链接:https://pan.baidu.com/s/14Xj-1toobJt1iseOCR9ftQ
提取码:fan3
3.需求指标
1)根据订单事实表和地区维度表,求出每个省份的订单次数和订单金额
2)建表语句 :
CREATE TABLE `ads_order_by_province` (
`dt` STRING COMMENT '统计日期',
`province_id` STRING COMMENT '省份 id',
`province_name` STRING COMMENT '省份名称',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT '国际标准地区编码',
`iso_code_3166_2` STRING COMMENT '国际标准地区编码',
`order_count` BIGINT COMMENT '订单数',
`order_amount` DECIMAL(16,2) COMMENT '订单金额'
) COMMENT '各省份订单统计' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
3)数据装载
insert into table ads_order_by_province
select
'2021-08-30' dt,
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.iso_3166_2,
count(*) order_count,
sum(oi.final_amount) order_amount
from dwd_order_info oi
left join dim_base_province bp
on oi.province_id=bp.id
group by bp.id,bp.name,bp.area_code,bp.iso_code,bp.iso_3166_2;
3.1.8. 查看血缘依赖
此时再通过 Atlas 查看 Hive 元数据,即可发现血缘依赖图
1) 表血缘依赖
2) 字段血缘依赖在这里插入图片描述
3.2 atlas集成hbase
参考官网:https://atlas.apache.org/2.2.0/index.html#/HookHBase
3.2.1. 修改atlas配置
修改/opt/atlas/conf/atlas-application.properties 配置文件中的以下参数
######### hbase Hook Configs #######
atlas.hook.hbase.synchronous=false
atlas.hook.hbase.numRetries=3
atlas.hook.hbase.queueSize=10000
atlas.cluster.name=primary
atlas.kafka.zookeeper.connect=ds1:2181,ds2:2181,ds3:2181
atlas.kafka.zookeeper.connection.timeout.ms=30000
atlas.kafka.zookeeper.session.timeout.ms=60000
atlas.kafka.zookeeper.sync.time.ms=20
3.2.2. 修改hbase配置
修改/opt/hbase-2.4.0/conf下的hbase-site.xml配置文件,新增以下内容:
<property>
<name>hbase.coprocessor.master.classes</name>
<value>org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor</value>
</property>
3.2.3. 安装 hbase Hook
1)解压 Hbase Hook
tar -zxvf apache-atlas-2.1.0-hbase-hook.tar.gz
2)将 Hive Hook 依赖复制到 Atlas 安装路径
cp -r apache-atlas-hbase-hook-2.1.0/* /opt/atlas/
3)将 Atlas 配置文件/opt/atlas/conf/atlas-application.properties
拷贝到/opt/hbase-2.4.0/conf 目录
cp /opt/atlas/conf/atlas-application.properties /opt/hbase-2.4.0/conf/
4)在hbase的lib目录下创建软连接
ln -s /opt/atlas/hook/hbase/* /opt/hbase-2.4.0/lib/
3.2.4. 分发hbase到ds2,ds3
因为hbase安装的是集群版本,所以需要将修改的相关配置分发到其他机器。
scp -r /opt/hbase-2.4.0 ds2:/opt
scp -r /opt/hbase-2.4.0 ds3:/opt
3.2.5. 重启atlas与hbase
停止atlas:
/opt/atlas/bin/atlas_stop.py
停止hbase:
stop-hbase.sh
启动habse
start-hbase.sh
启动atlas
/opt/atlas/bin/atlas_start.py
3.2.6. hbase元数据初次导入
Atlas 提供了一个 hbase元数据导入的脚本,直接执行该脚本,即可完成 Hive 元数据的初次全量导入。
- 导入 hbase元数据,执行以下命令
/opt/atlas/hook-bin/import-hbase.sh
按提示输入用户名:admin;输入密码:admin
等待片刻,出现以下日志,即表明导入成功
HBase Data Model imported successfully!!!
备注:
在执行import-hbase.sh时出现了如下报错:
2023-02-28 16:35:30,822 ERROR - [main:] ~ ImportHBaseEntities failed (HBaseBridge:197)
org.apache.atlas.AtlasException: Failed to load application properties
at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:147)
at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:100)
at org.apache.atlas.hbase.bridge.HBaseBridge.main(HBaseBridge.java:139)
Caused by: org.apache.commons.configuration.ConversionException: 'atlas.graph.index.search.solr.wait-searcher' doesn't map to a List object: true, a java.lang.Boolean
at org.apache.commons.configuration.AbstractConfiguration.getList(AbstractConfiguration.java:1144)
at org.apache.commons.configuration.AbstractConfiguration.getList(AbstractConfiguration.java:1109)
at org.apache.commons.configuration.AbstractConfiguration.interpolatedConfiguration(AbstractConfiguration.java:1274)
at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:142)
... 2 more
解决方案:
将/opt/atlas/hook/hbase/atlas-hbase-plugin-impl/commons-configuration-1.10.jar替换/opt/hbase-2.4.0/lib/commons-configuration-1.6.jar
后再执行则行。如果执行import-hive.sh也有类似的报错,则也是替换hive/lib下类型的jar包。
cp /opt/atlas/hook/hbase/atlas-hbase-plugin-impl/commons-configuration-1.10.jar /opt/hbase-2.4.0/lib
rm -f /opt/hbase-2.4.0/lib/commons-configuration-1.6.jar
3.2.7. 查看hbase元数据
搜索hbase_table 类型的元数据,可已看到 Atlas 已经拿到了 hbase元数据.
Atlas HBase hook暂时仅捕获命名空间、表和列族创建/更新/删除操作。捕获对列的更改。无法获取hbase的数据血缘。
3.3. atlas集成spark
3.3.1. 说明
atlas 官方文档中并不支持 spark sql 的解析,在 github 中有支持spark 解析的插件。
git地址: https://github.com/hortonworks-spark/spark-atlas-connector
此文档暂时只支持atlas 2.1.0 及以上版本 ,spark 2.3.0 及2.4.0版本。
如果是atlas 2.1.0以下版本参照git说明安装。
atlas暂时只能获取到spark的hive表级的数据血缘关系,暂时无法获取字段级的血缘关系。
3.3.2. 编译
建议采用maven 3.6及以上的版本进行编译,尝试用maven 3.5.4版本编译报错。
mvn package -DskipTests
编译后得到如下两个文件
spark-atlas-connector_2.11-0.1.0-SNAPSHOT.jar
spark-atlas-connector-assembly-0.1.0-SNAPSHOT.jar
提供编译后的下载包
链接:https://pan.baidu.com/s/1OJRzIvt7ovU70Q_4rh9Xkw
提取码:fan2
3.3.3. 上传编译后的文件
mkdir -p /opt/atlas/hook/spark
-
将spark-atlas-connector_2.11-0.1.0-SNAPSHOT.jar上传到mkdir -p /opt/atlas/hook/spark
目录下。
2)将spark-atlas-connector-assembly-0.1.0-SNAPSHOT.jar上传到/opt/spark-2.4.6-bin-hadoop2.7/jars目录下。
如果漏掉此步则在后面启动spark shell或者任务时则会报错。报错如下所示:
-
如果需要spark在yarn上执行,则修改/opt/atlas/conf/atlas-application.properties 配置文件中的以下参数
atlas.jaas.KafkaClient.loginModuleControlFlag=required
atlas.jaas.KafkaClient.loginModuleName=com.sun.security.auth.module.Krb5LoginModule
atlas.jaas.KafkaClient.option.keyTab=./a.keytab
atlas.jaas.KafkaClient.option.principal=[email protected]
atlas.jaas.KafkaClient.option.serviceName=kafka
atlas.jaas.KafkaClient.option.storeKey=true
atlas.jaas.KafkaClient.option.useKeyTab=true
将atlas-application.properties文件复制到spark的conf目录.
cp /opt/atlas/conf/atlas-application.properties /opt/spark-2.4.6-bin-hadoop2.7/conf
3.3.4. 分发atlas与spark
如果执行spark-submit提交到yarn中执行任务时,则需要把atlas与spark分发到其他机器.
scp -r /opt/atlas ds2:/opt
scp -r /opt/atlas ds3:/opt
scp -r /opt/spark-2.4.6-bin-hadoop2.7 ds2:/opt
scp -r /opt/spark-2.4.6-bin-hadoop2.7 ds3:/opt
3.3.5. 生成spark数据血缘
创建hive表,建表语句:
CREATE TABLE dwd_order_info_partition (
`id` STRING COMMENT '订单号',
`final_amount` DECIMAL(16,2) COMMENT '订单最终金额',
`order_status` STRING COMMENT '订单状态',
`user_id` STRING COMMENT '用户 id',
`payment_way` STRING COMMENT '支付方式',
`delivery_address` STRING COMMENT '送货地址',
`out_trade_no` STRING COMMENT '支付流水号',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '操作时间',
`expire_time` STRING COMMENT '过期时间',
`tracking_no` STRING COMMENT '物流单编号',
`province_id` STRING COMMENT '省份 ID',
`activity_reduce_amount` DECIMAL(16,2) COMMENT '活动减免金额',
`coupon_reduce_amount` DECIMAL(16,2) COMMENT '优惠券减免金额',
`original_amount` DECIMAL(16,2) COMMENT '订单原价金额',
`feight_fee` DECIMAL(16,2) COMMENT '运费',
`feight_fee_reduce` DECIMAL(16,2) COMMENT '运费减免'
) COMMENT '订单分区表'
partitioned by (dt STRING COMMENT '日期' )
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
3.3.5.1. spark shell生成数据血缘
在ds1服务器上执行如下命令打开spark shell的编辑命令:
spark-shell \
--jars /opt/atlas/hook/spark/spark-atlas-connector_2.11-0.1.0-SNAPSHOT.jar \
--conf spark.extraListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
--conf spark.sql.queryExecutionListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
--conf spark.sql.streaming.streamingQueryListeners=com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker
执行如下代码:
val sourceSql = "select * from atlas_test.dwd_order_info"
val sourceDF = spark.sql(sourceSql)
import org.apache.spark.sql.{SaveMode, SparkSession, functions}
lazy val getDt = functions.udf((timeStr: String) => timeStr.substring(0, 10).replace("-", ""))
val result = sourceDF.withColumn("dt",getDt('create_time))
result.write.mode(SaveMode.Overwrite).insertInto("atlas_test.dwd_order_info_partition")
查看atlas页面可以得到刚才执行的相关数据血缘关系
3.3.5.2. spark-submit生成数据血缘
创建一个maven工程,采用spark-submit的方式执行spark任务,如下所以
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 4g \
--num-executors 1 \
--executor-cores 1 \
--conf spark.network.timeout=10000000 \
--conf spark.dynamicAllocation.enabled=false \
--jars /opt/atlas/hook/spark/spark-atlas-connector_2.11-0.1.0-SNAPSHOT.jar \
--conf spark.extraListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
--conf spark.sql.queryExecutionListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker \
--conf spark.sql.streaming.streamingQueryListeners=com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker \
--files /opt/atlas/conf/atlas-application.properties \
--class com.atlas.Test1 \
lib/atlas_test.jar
查看数据血缘,如下图所示:
3.4. atlas RestApi部分说明
参数官网: https://atlas.apache.org/2.1.0/index.html#/RestApi
#查询所有Hive表
curl -s -u admin:admin “http://192.168.11.25:21000/api/atlas/v2/search/basic?typeName=hive_table”
#查询所有Hive表,且包含某一关键字
curl -s -u admin:admin “http://192.168.11.25:21000/api/atlas/v2/search/basic?query=dim_channel&typeName=hive_table”
#查询某一数据血缘
curl -s -u admin:admin “http://192.168.11.25:21000/api/atlas/v2/lineage/5f0142fd-ef3c-41fb-a103-35406ec19be2”