Metadata Management System Amundsen: Installation and Usage

1. Online installation environment

3. Install Docker

Install docker-ce

  • Install/upgrade the Docker client; first install the required system tools.

    yum update -y

    yum install -y yum-utils device-mapper-persistent-data lvm2

  • Add the package repository

    yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

  • Refresh the cache and install

    yum makecache fast
    yum -y install docker-ce

  • Start the Docker service

    service docker start

Configure a registry mirror and set the default address pools for Docker networks

  • Create the Docker daemon config file (edit it if it already exists)

tee /etc/docker/daemon.json <<-'EOF'
{
  "debug" : true,
  "registry-mirrors": ["https://dpayzz9i.mirror.aliyuncs.com"],
  "default-address-pools" : [
    {
      "base" : "192.168.0.0/16",
      "size" : 24
    }
  ]
}
EOF
  • Restart Docker and enable it at boot

    systemctl daemon-reload
    systemctl restart docker
    systemctl enable docker
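After the restart, a quick sanity check that the registry mirror configuration took effect (output varies by environment):

    docker info 2>/dev/null | grep -A 1 "Registry Mirrors"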

Install docker-compose

  • Download

    curl -L "https://get.daocloud.io/docker/compose/releases/download/1.27.3/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

  • Make it executable:

    chmod +x /usr/local/bin/docker-compose
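Optionally verify the installation; the version string should match the release downloaded above:

    docker-compose --version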

Move the Docker data directory onto the data disk

By default the data lives under the root filesystem at /var/lib/docker; move it to /data/docker on the data disk and keep a symlink at the old path.

service docker stop

mv /var/lib/docker /data/
ln -sf /data/docker /var/lib/docker
service docker start
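After the move, a quick check that Docker still resolves its data directory through the symlink (output varies by environment):

docker info 2>/dev/null | grep "Docker Root Dir"
ls -l /var/lib/docker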

4. Install Python 3.7
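A minimal sketch of building Python 3.7 from source on CentOS 7; the version number (3.7.9) and install prefix are examples and can be adjusted to your environment:

yum install -y gcc make zlib-devel bzip2-devel openssl-devel libffi-devel sqlite-devel

cd /usr/local/src
curl -O https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz
tar -xzf Python-3.7.9.tgz
cd Python-3.7.9

# Install into its own prefix; altinstall avoids overwriting the system python
./configure --prefix=/usr/local/python3.7
make -j "$(nproc)" && make altinstall

# Symlink so python3.7 is on PATH when creating the virtual environment later
ln -sf /usr/local/python3.7/bin/python3.7 /usr/local/bin/python3.7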

5. Install Amundsen

Make sure Docker has at least 3 GB of free space.

Clone Amundsen with git:

git clone --recursive https://github.com/amundsen-io/amundsen.git

Change into the cloned directory and bring the containers up:

# For Neo4j Backend
docker-compose -f docker-amundsen.yml up

If anything fails, see the FAQ below for fixes.

后臺(tái)啟動(dòng)命令:

docker-compose -f docker-amundsen.yml up -d

Modify the ES container: add the ik Chinese analyzer

  • Reference: https://zhuanlan.zhihu.com/p/377433737

  • Download the ik analyzer matching the ES version (7.13.3): https://github.com/medcl/elasticsearch-analysis-ik/releases?page=2

  • Enter the ES container and create the /usr/share/elasticsearch/plugins/ik/ directory

    docker exec -it 3d701cddd320 /bin/bash
    mkdir /usr/share/elasticsearch/plugins/ik/

  • Copy the archive into the container

    docker cp ./elasticsearch-analysis-ik-7.13.3.zip 3d701cddd320:/usr/share/elasticsearch/plugins/ik/

  • Enter the container, unzip the archive, then remove it

    docker exec -it 3d701cddd320 /bin/bash
    cd /usr/share/elasticsearch/plugins/ik/
    unzip elasticsearch-analysis-ik-7.13.3.zip
    rm -rf elasticsearch-analysis-ik-7.13.3.zip

    exit

  • Restart the ES container

    docker stop 3d701cddd320
    docker-compose -f docker-es.yml up -d
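After the restart, you can confirm the ik plugin is loaded (assuming ES is published on port 9200; output varies):

    curl -s http://localhost:9200/_cat/plugins?v
    # the output should include an analysis-ik entry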

  • Update the metadata import code: change the ES mapping so the Chinese text fields use the ik analyzer (see YZF_TABLE_ES_INDEX_MAPPING in my_util.py below; full details are in the svn repo), then re-import the metadata.

  • Save the modified ES container as an image

# Commit the container as a custom image:
docker commit 82fb415dcf0c  my/elasticsearch:7.13.3

# Edit docker-amundsen.yml to use the image built above
elasticsearch:
      image: my/elasticsearch:7.13.3

# Save the image to a tar file
docker save -o my_es_docker_image.tar my/elasticsearch:7.13.3

Add prometheus + grafana monitoring

Setting up and configuring prometheus + grafana is not covered here; see the relevant docs. The cadvisor service, which itself runs as a container, is used to monitor the running Docker containers.

Run the cadvisor container

docker run \
--volume=/:/rootfs:ro \
--volume=/var/run:/var/run:ro \
--volume=/sys:/sys:ro \
--volume=/var/lib/docker/:/var/lib/docker:ro \
--volume=/dev/disk/:/dev/disk:ro \
--publish=8080:8080 \
--detach=true \
--name=cadvisor \
google/cadvisor:v0.24.1
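Once the container is up, a quick check that the metrics endpoint is reachable (replace ip with the host running cadvisor):

curl -s http://ip:8080/metrics | head -n 5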

Add the scrape target to the prometheus configuration (replace ip with the host running cadvisor):

- targets: ['ip:8080']
  labels:
    instance: amundsen_docker

Add grafana monitoring: import dashboard ID 193.

Import metadata

  • Create a python3 virtual environment and install the dependencies
cd /data/service/amundsen
yum install gcc
yum install python3-devel mysql-devel

python3 -m venv venv
source venv/bin/activate
/data/service/amundsen/venv/bin/pip3 install --upgrade pip

/data/service/amundsen/venv/bin/pip3 install -r databuilder/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
/data/service/amundsen/venv/bin/python3 databuilder/setup.py install

/data/service/amundsen/venv/bin/pip3 install Mysqlclient -i https://mirrors.aliyun.com/pypi/simple/

/data/service/amundsen/venv/bin/pip3 install pyyaml  -i https://mirrors.aliyun.com/pypi/simple/
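With the dependencies installed, a simple import check inside the virtual environment can catch missing packages early (module names assume the packages installed above):

/data/service/amundsen/venv/bin/python3 -c "import databuilder, MySQLdb, yaml; print('ok')"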

  • Script deployment path: amundsen/databuilder/my_metadata
  • Run the scripts with the python3 virtual environment to import the metadata
# Import mysql first
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py

# Then import hive
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_hive_loader.py

Basic Kibana queries

# List all mappings
GET _mapping


# Check the elasticsearch version
GET /

# Cluster health
GET /_cat/health?v

# List nodes
GET /_cat/nodes?v

# List indices
GET /_cat/indices?v

# JVM memory stats
GET /_nodes/stats/jvm?pretty

# Disk allocation
GET /_cat/allocation?v

# Installed plugins
GET /_cat/plugins?v

# Query all documents
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
    "query" : {
        "match_all" : {}
    }
}

# DSL query example
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
    "query" : {
        "match" : { "cluster" : "ndz_fund"}
    }
}



# Count documents in the index (hive: 837, mysql: 14034)
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_count
{
    "query" : {
        "match" : { "database": "mysql" }
    }
}

# Analyzer test:
POST _analyze
{
  "analyzer": "ik_max_word",
  "text":     "借方本年累計(jì)發(fā)生額"
}

# Analyzer test:
POST _analyze
{
  "analyzer": "ik_smart",
  "text":     "借方本年累計(jì)發(fā)生額"
}

# Delete by query; documents are only marked as deleted, not removed immediately
POST tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_delete_by_query
{
  "query": { 
    "match": { 
       "database": "hive"
    }
  }
}

# Force merge: physically removes documents marked as deleted; a resource-intensive operation
POST /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_forcemerge


Import script code

  • my_util.py code
import textwrap
import yaml
import sys


# Read the YAML config file
def read_config(config_file):
    with open(config_file, 'rb') as f:
        config_data = list(yaml.safe_load_all(f))
        if len(config_data) == 0:
            print("------配置文件: {} 為空------".format(config_file))
            sys.exit(1)
        # print(config_data[0].get(key1))
        return config_data[0]


# es mapping
YZF_TABLE_ES_INDEX_MAPPING = textwrap.dedent(
    """
    {
    "mappings":{
        "table":{
          "properties": {
            "name": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "schema": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "display_name": {
              "type": "keyword"
            },
            "last_updated_timestamp": {
              "type": "date",
              "format": "epoch_second"
            },
            "description": {
              "type": "text",
              "analyzer": "ik_max_word"
            },
            "column_names": {
              "type":"text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "column_descriptions": {
              "type": "text",
              "analyzer": "ik_max_word"
            },
            "tags": {
              "type": "keyword"
            },
            "badges": {
              "type": "keyword"
            },
            "cluster": {
              "type": "text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "database": {
              "type": "text",
              "analyzer": "simple",
              "fields": {
                "raw": {
                  "type": "keyword"
                }
              }
            },
            "key": {
              "type": "keyword"
            },
            "total_usage":{
              "type": "long"
            },
            "unique_usage": {
              "type": "long"
            },
            "programmatic_descriptions": {
              "type": "text",
              "analyzer": "ik_max_word"
            }
          }
        }
      }
    }
    """
)
  • my_mysql_loader.py code
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is an example script demonstrating how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.

"""

import logging
import sys
import textwrap
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.mysql_metadata_extractor import MysqlMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from my_util import YZF_TABLE_ES_INDEX_MAPPING
from my_util import read_config

import yaml

es_host = None
neo_host = None
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host or 'localhost'},
])

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

# Connection string suffix (append &useSSL=false if needed)
mysql_conn_ = '?charset=utf8'

LOGGER = logging.getLogger(__name__)


def run_mysql_job(conn_str, connect_name):
    #where_clause_suffix = textwrap.dedent("""
    #    where c.table_schema = 'yzf_biz'
    #""")
    where_clause_suffix = textwrap.dedent("""
        where 1 = 1
    """)
    connect = conn_str + mysql_conn_
    logging.info("Begin load mysql conn: {}".format(connect))

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': False,
        f'extractor.mysql_metadata.{MysqlMetadataExtractor.CLUSTER_KEY}': connect_name,
        f'extractor.mysql_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connect,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': connect_name,  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
    """
    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                       `table_search_index`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                       it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                       if None is given (default) it uses the `Table` query baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_doc_type_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    logging.basicConfig(level=logging.INFO)

    conf_data = read_config('/data/service/amundsen/databuilder/my_metadata/mysql_connect.yaml')
    mysql_conf = conf_data.get('conn')
    for conn_name, mysql_list in mysql_conf.items():
        for mysql_conn in mysql_list:
            loading_job = run_mysql_job(mysql_conn, conn_name)
            loading_job.launch()

            job_es_table = create_es_publisher_sample_job(
                elasticsearch_index_alias='table_search_index',
                elasticsearch_doc_type_key='table',
                model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
            job_es_table.launch()
  • my_hive_loader.py code
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

"""
This is an example script demonstrating how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.

"""

import logging
import sys
import textwrap
import uuid

from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base

from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from my_util import YZF_TABLE_ES_INDEX_MAPPING

es_host = None
neo_host = None
if len(sys.argv) > 1:
    es_host = sys.argv[1]
if len(sys.argv) > 2:
    neo_host = sys.argv[2]

es = Elasticsearch([
    {'host': es_host or 'localhost'},
])

DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()

NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'

neo4j_endpoint = NEO4J_ENDPOINT

neo4j_user = 'neo4j'
neo4j_password = 'test'

LOGGER = logging.getLogger(__name__)

# Todo: user provides a list of schema for indexing

SUPPORTED_HIVE_SCHEMAS = ['accounting_collect', 'accounting_company', 'accounting_report', 'ads', 'bi_dm_ods', \
                          'biz_dm', 'biz_dw', 'common', 'common_kudu', 'common_sim', 'companyinfo_ods', 'customer', \
                          'customer_kudu', 'datax', 'default', 'di', 'dingtalk', 'dingtalk_kudu', 'dwd', 'dwd_sim', \
                          'dws', 'fintax_account', 'fintax_application', 'fintax_asset', 'fintax_data_init',
                          'fintax_fund', 'fintax_invoice', \
                          'fintax_salary', 'fintax_statistics', 'fintax_stock', 'fintax_task', 'fintax_tax',
                          'fintax_user_point', 'flink_database', \
                          'invoice_lake', 'log_ods', 'monitor', 'octopus_ods', 'sale_ods', 'taxops_ods', 'ucenter',
                          'upm_paas', 'view', 'yzf_biz', \
                          'yzf_biz_init', 'yzf_common', 'yzf_config', 'yzf_report', 'yzf_report_init']
# Global used in all Hive metastore queries.
# String format - ('schema1', schema2', .... 'schemaN')
SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_HIVE_SCHEMAS))


# todo: connection string needs to change
def connection_string():
    user = 'root'
    host = 'ip'
    port = '3306'
    db = 'hivemetastore'
    pd = '123456'
    return "mysql://%s:%s@%s:%s/%s?charset=utf8" % (user, pd, host, port, db)


def create_table_wm_job(templates_dict):
    sql = textwrap.dedent("""
        SELECT From_unixtime(A0.create_time) as create_time,
               'hive'                        as `database`,
               C0.NAME                       as `schema`,
               B0.tbl_name as table_name,
               {func}(A0.part_name) as part_name,
               {watermark} as part_type
        FROM   PARTITIONS A0
               LEFT OUTER JOIN TBLS B0
                            ON A0.tbl_id = B0.tbl_id
               LEFT OUTER JOIN DBS C0
                            ON B0.db_id = C0.db_id
        WHERE  C0.NAME IN {schemas}
               AND B0.tbl_type IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
               AND A0.PART_NAME NOT LIKE '%%__HIVE_DEFAULT_PARTITION__%%'
        GROUP  BY C0.NAME, B0.tbl_name
        ORDER by create_time desc
    """).format(func=templates_dict.get('agg_func'),
                watermark=templates_dict.get('watermark_type'),
                schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)

    LOGGER.info('SQL query: %s', sql)
    tmp_folder = '/var/tmp/amundsen/table_{hwm}'.format(hwm=templates_dict.get('watermark_type').strip("\""))
    node_files_folder = f'{tmp_folder}/nodes'
    relationship_files_folder = f'{tmp_folder}/relationships'

    hwm_extractor = SQLAlchemyExtractor()
    csv_loader = FsNeo4jCSVLoader()

    task = DefaultTask(extractor=hwm_extractor,
                       loader=csv_loader,
                       transformer=NoopTransformer())

    job_config = ConfigFactory.from_dict({
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
        f'extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'extractor.sqlalchemy.{SQLAlchemyExtractor.EXTRACT_SQL}': sql,
        'extractor.sqlalchemy.model_class': 'databuilder.models.watermark.Watermark',
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
    })
    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=Neo4jCsvPublisher())
    job.launch()


def run_hive_job():
    """
        Launches data builder job that extracts table and column metadata from MySQL Hive metastore database,
        and publishes to Neo4j.
        @param kwargs:
        @return:
        """

    # Adding to where clause to scope schema, filter out temp tables which start with numbers and views
    where_clause_suffix = textwrap.dedent("""
            WHERE d.NAME IN {schemas}
            AND t.TBL_NAME NOT REGEXP '^[0-9]+'
            AND t.TBL_TYPE IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
        """).format(schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
        f'extractor.hive_table_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
        'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag'  # TO-DO unique tag must be added
    })

    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=HiveTableMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   cypher_query=None,
                                   elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
    """
    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                       `table_search_index`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param cypher_query:               Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
                                       it uses the `Table` query baked into the Extractor
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                       if None is given (default) it uses the `Table` query baked into the Publisher
    """
    # loader saves data to this location and publisher reads it from here
    # Temporary file; safe to delete, does not affect queries
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # elastic search client instance
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
            elasticsearch_doc_type_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            elasticsearch_index_alias,
    })

    # only optionally add these keys, so need to dynamically `put` them
    if cypher_query:
        job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
                       cypher_query)
    if elasticsearch_mapping:
        job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
                       elasticsearch_mapping)

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    return job


if __name__ == "__main__":
    # Uncomment next line to get INFO level logging
    logging.basicConfig(level=logging.INFO)

    loading_job = run_hive_job()
    loading_job.launch()

    templates_dict = {'agg_func': 'min',
                      'watermark_type': '"low_watermark"',
                      'part_regex': '{{ ds }}'}

    create_table_wm_job(templates_dict)

    templates_dict = {'agg_func': 'max',
                      'watermark_type': '"high_watermark"',
                      'part_regex': '{{ ds }}'}

    create_table_wm_job(templates_dict)

    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()
  • mysql_connect.yaml config file
conn:
  mysql1:
    - mysql://root:123456@ip1:3306/<db>
  mysql2:
    - mysql://root:123456@ip2:3306/<db>

FAQ

1. Starting the containers with docker-compose -f docker-amundsen.yml up fails with:

[2021-12-01T08:02:56,599][INFO ][o.e.b.BootstrapChecks ] [PD4Rw8t] bound or publishing to a non-loopback address, enforcing bootstrap checks
es_amundsen_atlas | ERROR: [1] bootstrap checks failed
es_amundsen_atlas | [1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

  • Raise the kernel limit on virtual memory map areas: edit /etc/sysctl.conf and add: vm.max_map_count = 262144

  • Reload the setting: sysctl -p

  • Re-run docker-compose -f docker-amundsen.yml up

2. Chinese comments show up as garbled text when searching metadata in the UI

  • Fix the MySQL connection URL by appending the charset parameter: ?charset=utf8

3. Importing Hive metadata from the MySQL metastore fails with:

sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1055, "Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'hive.A0.CREATE_TIME' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
[SQL:
SELECT From_unixtime(A0.create_time) as create_time,

....

  • See https://blog.csdn.net/fansili/article/details/78664267 and change the MySQL sql_mode (drop ONLY_FULL_GROUP_BY):

    mysql> set global sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

    mysql> set session sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

4. Official Amundsen FAQ: https://www.amundsen.io/amundsen/installation/#troubleshooting

5. The firewall is disabled, yet port 5000 on the Amundsen host is still unreachable from 215.5

Workaround: start the firewall, open port 5000, then stop the firewall again; after that, port 5000 is reachable and search works normally.

systemctl start firewalld
firewall-cmd --zone=public --add-port=5000/tcp --permanent
firewall-cmd --reload
systemctl stop firewalld
systemctl disable firewalld
