Exploring Column-Level Lineage in Hive with LineageLogger

The Apache Hive source tree ships an org.apache.hadoop.hive.ql.hooks.LineageLogger hook class that captures the column-to-column relationships of INSERT HQL statements. However, org.apache.hadoop.hive.ql.optimizer.Optimizer only adds the lineage-generating transformation when the configured post-execution hooks match a hard-coded list of class names, so simply writing our own hook class does not yield column-level lineage:

  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // added in Hive 4.0+
      || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
      || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
      // added in Hive 2.3
      || postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
    transformations.add(new Generator(postExecHooks));
  }
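
Note that the guard only checks for hard-coded class names. A practical workaround suggested by this check is to list LineageLogger next to a custom hook in hive.exec.post.hooks, so that the lineage transformation still runs (com.example.MyHook below is a hypothetical placeholder):

  <value>com.example.MyHook,org.apache.hadoop.hive.ql.hooks.LineageLogger</value>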

We therefore consider pairing LineageLogger with a log-monitoring service to obtain column-level lineage:

  1. Register the hook in conf/hive-site.xml
  <property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value>
  </property>
  2. Enable the logger in conf/log4j.properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
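
On Hive 2.x and later, logging is usually configured through conf/hive-log4j2.properties instead; a minimal sketch of the equivalent log4j2 settings (append LineageLogger to the file's existing loggers list first):

logger.LineageLogger.name = org.apache.hadoop.hive.ql.hooks.LineageLogger
logger.LineageLogger.level = INFO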
  3. Locate the Hive log directory
>set system:hive.log.dir;      # service log

>set hive.querylog.location;   # query log

Both default to a per-user temp directory such as /tmp/hive-{username}/; the script below reads /tmp/root/hive.log.
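
Once the hook fires, each query appends one JSON document to hive.log, written on a single line after the logger prefix. An illustrative, abridged and pretty-printed entry for a statement like insert into default.dst select a, upper(b) from default.src:

{"version": "1.0", "user": "root", "timestamp": 1715300000,
 "engine": "mr", "database": "default",
 "queryText": "insert into default.dst select a, upper(b) from default.src",
 "edges": [
   {"sources": [2], "targets": [0], "edgeType": "PROJECTION"},
   {"sources": [3], "targets": [1], "expression": "upper(src.b)", "edgeType": "PROJECTION"}
 ],
 "vertices": [
   {"id": 0, "vertexType": "COLUMN", "vertexId": "default.dst.a"},
   {"id": 1, "vertexType": "COLUMN", "vertexId": "default.dst.b"},
   {"id": 2, "vertexType": "COLUMN", "vertexId": "default.src.a"},
   {"id": 3, "vertexType": "COLUMN", "vertexId": "default.src.b"}
 ]}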

  4. Write the monitoring script

# -*- coding: utf-8 -*-
import json
import os.path
from json import JSONDecodeError

log_path_list = [
    "/tmp/root/hive.log"
]

# Marker used both to find lineage lines and to strip the log prefix in
# front of the JSON payload; adjust it to match your log4j pattern.
search_keyword = "LineageLogger: "

# Partition columns to drop from the lineage (e.g. ["dt"]); empty by default.
partition_field = []


def read_hive_log(file_path):
    """
    Read a Hive log file and collect the lines containing the keyword.

    Args:
    file_path (str): path of the Hive log file

    Returns:
    (list, int, int, str): (line_number, json_payload) tuples, the current
    file size, the last processed line number, and the file name
    """
    save_dict = {}
    if os.path.exists('./hash_index.log'):
        try:
            with open("./hash_index.log", 'r') as f:
                file_content = f.read()
                if file_content != '':
                    save_dict = json.loads(file_content)
        except JSONDecodeError as e:
            print(f"Failed to parse state file as JSON: {e}")

    new_file = file_path.split("/")[-1]

    if new_file in save_dict:
        old_size = save_dict[new_file].get('size', 0)
        line_index = save_dict[new_file].get('index', 0)
    else:
        old_size = 0
        line_index = 0

    # Treat the file as new if it has never been seen, or if it shrank
    # since the last run (log rotation).
    is_new_file = False
    try:
        new_size: int = os.path.getsize(file_path)
    except OSError as e:
        print("Failed to read file size:", e)
        new_size = 0
    if new_file not in save_dict or new_size < old_size or old_size == 0:
        is_new_file = True

    content = []

    is_new_file_only_one = is_old_file_only_one = False
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:
            for line_number, line in enumerate(log_file, 1):
                if search_keyword in line:
                    if is_new_file:
                        if not is_new_file_only_one:
                            print("New file, reading from the beginning")
                            is_new_file_only_one = True
                        content.append((line_number, line.split(search_keyword)[-1]))
                        line_index = line_number
                    else:
                        # Strictly greater: line_index itself was already
                        # processed in the previous run.
                        if line_number > line_index:
                            if not is_old_file_only_one:
                                print("Known file, resuming after line {}".format(line_index))
                                is_old_file_only_one = True
                            content.append((line_number, line.split(search_keyword)[-1]))
                            line_index = line_number

    except Exception as e:
        print(f"Failed to read Hive log file: {e}")
    return content, new_size, line_index, new_file


def parse_vertice(vertices):
    """
    Parse the "vertices" array into a lookup dict.

    Args:
    vertices (list): vertex objects from the lineage JSON

    Returns:
    vertex_dict (dict): vertex id -> {"db": ..., "tb": ..., "col": ...}
    (col is empty unless the vertex is a COLUMN)
    """
    vertex_dict = {}
    for vertex in vertices:
        vertex_id = vertex.get("id", "")
        vertex_type = vertex.get("vertexType", "")
        vertex_names = vertex.get("vertexId", "").split(".")

        # COLUMN vertexIds look like "db.tb.col"; TABLE vertices ("db.tb")
        # are skipped since we only track column-level lineage.
        if len(vertex_names) >= 3:
            db_name = vertex_names[0]
            tb_name = vertex_names[1]
            col_name = vertex_names[-1] if vertex_type == "COLUMN" else ""

            if col_name not in partition_field:
                vertex_dict.setdefault(vertex_id, {"db": db_name, "tb": tb_name, "col": col_name})
    return vertex_dict


def parse_edge(edges):
    """
    Parse the "edges" array.

    Args:
    edges (list): edge objects from the lineage JSON

    Returns:
    list: one dict per edge with its source vertex ids, target vertex ids,
    expression, and edge type (PROJECTION or PREDICATE)
    """
    edge_elem_list = []
    for edge in edges:
        source_arr = edge.get("sources", [])
        target_arr = edge.get("targets", [])
        expression = edge.get("expression", "")
        edge_type = edge.get("edgeType", "")
        edge_elem_list.append({"source": source_arr, "target": target_arr, "exp": expression, "type": edge_type})
    return edge_elem_list

def get_column_depend(vertex_dict, edge_list):
    """
    Resolve edges into column dependencies for one target table.

    NOTE: this helper was referenced but not shown in the original post;
    the body below is a minimal reconstruction of what it must do.

    Returns:
    (str, dict): target table "db.tb" and a dict mapping each target
    column to its source columns and generating expression
    """
    tb = ""
    column_info = {}
    for edge in edge_list:
        # Only PROJECTION edges map source columns to target columns;
        # PREDICATE edges describe filter/join conditions instead.
        if edge["type"] != "PROJECTION":
            continue
        sources = [vertex_dict[s] for s in edge["source"] if s in vertex_dict]
        for t in edge["target"]:
            target = vertex_dict.get(t)
            if target is None:
                continue
            tb = "{}.{}".format(target["db"], target["tb"])
            column_info[target["col"]] = {
                "sources": ["{}.{}.{}".format(s["db"], s["tb"], s["col"]) for s in sources],
                "exp": edge["exp"],
            }
    return tb, column_info


def parse_lineage_log(content: list):
    column_info_dict = {}

    # Keyed by target table, so repeated statements against the same
    # table are deduplicated (the latest lineage wins).
    for (line_number, line) in content:
        try:
            lineage_dict = json.loads(line)
            vertex_dict = parse_vertice(lineage_dict.get('vertices', []))
            edge_list = parse_edge(lineage_dict.get('edges', []))
            tb, column_info = get_column_depend(vertex_dict, edge_list)
            column_info_dict[tb] = column_info
        except JSONDecodeError:
            print("JSON parse error: {}".format(line))
            print("at line: {}".format(line_number))

    return column_info_dict


if __name__ == '__main__':
    print("Starting...")
    log_dict = {}
    for log_path in log_path_list:
        contents, file_size, index, new_file_name = read_hive_log(log_path)
        column_info_dicts = parse_lineage_log(contents)
        # Printed here for inspection; in practice the result would be
        # pushed to a downstream metadata/lineage service.
        print(json.dumps(column_info_dicts, ensure_ascii=False))
        print("Finished processing {}".format(log_path))
        log_dict.setdefault(log_path.split('/')[-1], dict(size=file_size, index=index, file=new_file_name))
    with open("./hash_index.log", 'w') as f:
        f.write(json.dumps(log_dict))
    print("Done.")
