1. Online installation environment
- Paths: amundsen: /opt/amundsen ; kibana: /opt/kibana-7.13.3-linux-x86_64
- Amundsen web UI: http://localhost:5000/
- Kibana console: http://localhost:5601/app/dev_tools#/console (connects to Amundsen's Elasticsearch; use it for queries only, do not PUT data)
3. Install Docker
Install docker-ce
- Install/upgrade the Docker client and install the required system tools:
yum update -y
yum install -y yum-utils device-mapper-persistent-data lvm2
- Add the package repository:
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
- Refresh the cache and install:
yum makecache fast
yum -y install docker-ce
- Start the Docker service:
service docker start
Configure a registry mirror and a default address pool for Docker networks
- Create the Docker daemon config file (or edit it if it already exists):
tee /etc/docker/daemon.json <<-'EOF'
{
"debug" : true,
"registry-mirrors": ["https://dpayzz9i.mirror.aliyuncs.com"],
"default-address-pools" : [
{
"base" : "192.168.0.0/16",
"size" : 24
}
]
}
EOF
- Reload the daemon config, restart Docker, and enable it at boot:
systemctl daemon-reload
systemctl restart docker
systemctl enable docker
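Optionally confirm that the mirror configuration was picked up (output format varies slightly across Docker versions):
docker info | grep -i -A 2 "Registry Mirrors"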
Install docker-compose
- Download:
curl -L "https://get.daocloud.io/docker/compose/releases/download/1.27.3/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
- Make it executable:
chmod +x /usr/local/bin/docker-compose
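Quick check that the binary works:
docker-compose --version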
Move the Docker data directory to the data disk
By default the data lives under /var/lib/docker on the root volume; move it to /data/docker and point a symlink back at the old path:
service docker stop
mv /var/lib/docker /data/
ln -sf /data/docker /var/lib/docker
service docker start
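To confirm the move, /var/lib/docker should now be a symlink pointing at the data disk:
ls -ld /var/lib/docker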
4. Install Python 3.7
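One possible way to get Python 3.7 on CentOS is a source build; the steps below are only a sketch (3.7.12 is used purely as an example version, adapt as needed):
yum install -y gcc make zlib-devel bzip2-devel openssl-devel libffi-devel
cd /usr/local/src
curl -O https://www.python.org/ftp/python/3.7.12/Python-3.7.12.tgz
tar xzf Python-3.7.12.tgz && cd Python-3.7.12
./configure --enable-optimizations
make altinstall    # installs python3.7 alongside the system python3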
5. Install Amundsen
Make sure at least 3 GB of disk space is free for Docker.
Clone Amundsen with git:
git clone --recursive https://github.com/amundsen-io/amundsen.git
Enter the cloned directory and bring up the containers:
# For Neo4j Backend
docker-compose -f docker-amundsen.yml up
If anything fails, see the FAQ section below.
To start in the background instead:
docker-compose -f docker-amundsen.yml up -d
- Once it is up, open the web UI: http://localhost:5000/
Modify the ES container: add the IK Chinese analyzer
Download the IK analyzer release that matches the ES version (7.13.3): https://github.com/medcl/elasticsearch-analysis-ik/releases?page=2
- Enter the ES container and create the /usr/share/elasticsearch/plugins/ik/ directory:
docker exec -it 3d701cddd320 /bin/bash
mkdir /usr/share/elasticsearch/plugins/ik/
- Copy the plugin archive into the container:
docker cp ./elasticsearch-analysis-ik-7.13.3.zip 3d701cddd320:/usr/share/elasticsearch/plugins/ik/
- Inside the container, unzip the archive and then delete the zip:
docker exec -it 3d701cddd320 /bin/bash
cd /usr/share/elasticsearch/plugins/ik/
unzip elasticsearch-analysis-ik-7.13.3.zip
rm -rf elasticsearch-analysis-ik-7.13.3.zip
exit
- Restart the ES container:
docker stop 3d701cddd320
docker-compose -f docker-es.yml up -d
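After the restart, confirm the IK plugin is loaded (this assumes the compose file publishes Elasticsearch on port 9200; adjust host/port if yours differs):
curl -s http://localhost:9200/_cat/plugins?v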
Modify the metadata import code: change the index mapping to use the IK analyzer (see the SVN repo for details; the mapping is also reproduced in my_util.py below), then re-import the metadata.
Save the modified ES container as an image
# Build an image from the modified container:
docker commit 82fb415dcf0c my/elasticsearch:7.13.3
# Edit docker-amundsen.yml so the elasticsearch service uses the image built above:
elasticsearch:
  image: my/elasticsearch:7.13.3
# Save the image to a tar file
docker save -o my_es_docker_image.tar my/elasticsearch:7.13.3
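The saved tar can later be restored on this or another host with:
docker load -i my_es_docker_image.tar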
Add Prometheus + Grafana monitoring
See the separate Prometheus + Grafana setup and configuration reference.
Docker containers are monitored with the cAdvisor service, which itself runs as a container.
Run the cadvisor container:
docker run \
--volume=/:/rootfs:ro \
--volume=/var/run:/var/run:ro \
--volume=/sys:/sys:ro \
--volume=/var/lib/docker/:/var/lib/docker:ro \
--volume=/dev/disk/:/dev/disk:ro \
--publish=8080:8080 \
--detach=true \
--name=cadvisor \
google/cadvisor:v0.24.1
Add the scrape target to the Prometheus config (under a scrape job's static_configs in prometheus.yml):
  - targets: ['ip:8080']
    labels:
      instance: amundsen_docker
Add the Grafana dashboard by importing dashboard ID 193.
Import metadata
- Create a Python 3 virtual environment and install the dependencies:
cd /data/service/amundsen
yum install gcc
yum install python3-devel mysql-devel
python3 -m venv venv
source venv/bin/activate
/data/service/amundsen/venv/bin/pip3 install --upgrade pip
/data/service/amundsen/venv/bin/pip3 install -r databuilder/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
/data/service/amundsen/venv/bin/python3 databuilder/setup.py install
/data/service/amundsen/venv/bin/pip3 install mysqlclient -i https://mirrors.aliyun.com/pypi/simple/
/data/service/amundsen/venv/bin/pip3 install pyyaml -i https://mirrors.aliyun.com/pypi/simple/
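A quick sanity check that databuilder and the MySQL driver import cleanly (MySQLdb is the module name provided by mysqlclient):
/data/service/amundsen/venv/bin/python3 -c "import databuilder, MySQLdb; print('deps ok')"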
- Script deployment path:
amundsen/databuilder/my_metadata
- Run the scripts with the virtualenv Python to import the metadata:
# Import MySQL metadata first
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py
# Then import Hive metadata
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_hive_loader.py
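Both loader scripts also take the Elasticsearch host and the Neo4j host as optional positional arguments (they default to localhost), so a run against remote services would look like this, with the placeholders filled in:
amundsen/venv/bin/python3 /data/service/amundsen/databuilder/my_metadata/my_mysql_loader.py <es_host> <neo4j_host>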
Basic Kibana syntax
# Get the mappings of all indices
GET _mapping
# Elasticsearch version
GET /
# Cluster health
GET /_cat/health?v
# Nodes
GET /_cat/nodes?v
# Indices
GET /_cat/indices?v
# JVM memory
GET /_nodes/stats/jvm?pretty
# Disk allocation
GET /_cat/allocation?v
# Installed plugins
GET /_cat/plugins?v
# Query all documents
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
"query" : {
"match_all" : {}
}
}
# DSL query example
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_search
{
"query" : {
"match" : { "cluster" : "ndz_fund"}
}
}
# Count documents in the index (hive: 837, mysql: 14034)
GET _cat/count/tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1?v
# Count documents matching a query (note: _cat/count ignores a request body, so use _count instead)
GET /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_count
{
"query" : {
"match" : { "database": "mysql" }
}
}
# Analyzer test (ik_max_word):
POST _analyze
{
"analyzer": "ik_max_word",
"text": "借方本年累計(jì)發(fā)生額"
}
# Analyzer test (ik_smart):
POST _analyze
{
"analyzer": "ik_smart",
"text": "借方本年累計(jì)發(fā)生額"
}
# Delete by query: matching documents are only marked as deleted, not removed immediately
POST tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_delete_by_query
{
"query": {
"match": {
"database": "hive"
}
}
}
# Force merge: physically purges documents marked as deleted; a resource-intensive operation
POST /tablesa6d2d009-1a60-43ac-8eab-40bb00fab2a1/_forcemerge
Import script source code
- my_util.py
import textwrap
import yaml
import sys
# Read a YAML config file and return the first document
def read_config(config_file):
with open(config_file, 'rb') as f:
config_data = list(yaml.safe_load_all(f))
if len(config_data) == 0:
            print("------Config file {} is empty------".format(config_file))
sys.exit(1)
# print(config_data[0].get(key1))
return config_data[0]
# es mapping
YZF_TABLE_ES_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings":{
"table":{
"properties": {
"name": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"schema": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"display_name": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text",
"analyzer": "ik_max_word"
},
"column_names": {
"type":"text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"column_descriptions": {
"type": "text",
"analyzer": "ik_max_word"
},
"tags": {
"type": "keyword"
},
"badges": {
"type": "keyword"
},
"cluster": {
"type": "text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"database": {
"type": "text",
"analyzer": "simple",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"key": {
"type": "keyword"
},
"total_usage":{
"type": "long"
},
"unique_usage": {
"type": "long"
},
"programmatic_descriptions": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
}
"""
)
- my_mysql_loader.py
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is an example script which demonstrates how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""
import logging
import sys
import textwrap
import uuid
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base
from databuilder.extractor.mysql_metadata_extractor import MysqlMetadataExtractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from my_util import YZF_TABLE_ES_INDEX_MAPPING
from my_util import read_config
import yaml
es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]
es = Elasticsearch([
{'host': es_host or 'localhost'},
])
DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()
NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
# Connection string suffix (append &useSSL=false here if needed)
mysql_conn_ = '?charset=utf8'
LOGGER = logging.getLogger(__name__)
def run_mysql_job(conn_str, connect_name):
#where_clause_suffix = textwrap.dedent("""
# where c.table_schema = 'yzf_biz'
#""")
where_clause_suffix = textwrap.dedent("""
where 1 = 1
""")
connect = conn_str + mysql_conn_
logging.info("Begin load mysql conn: {}".format(connect))
tmp_folder = '/var/tmp/amundsen/table_metadata'
node_files_folder = f'{tmp_folder}/nodes/'
relationship_files_folder = f'{tmp_folder}/relationships/'
job_config = ConfigFactory.from_dict({
f'extractor.mysql_metadata.{MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
f'extractor.mysql_metadata.{MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': False,
f'extractor.mysql_metadata.{MysqlMetadataExtractor.CLUSTER_KEY}': connect_name,
f'extractor.mysql_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connect,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': connect_name, # should use unique tag here like {ds}
})
job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` query baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())
# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
job_config = ConfigFactory.from_dict({
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
elasticsearch_client,
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
elasticsearch_new_index_key,
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
elasticsearch_doc_type_key,
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
elasticsearch_index_alias,
})
# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
cypher_query)
if elasticsearch_mapping:
job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
elasticsearch_mapping)
job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job
if __name__ == "__main__":
# Uncomment next line to get INFO level logging
logging.basicConfig(level=logging.INFO)
conf_data = read_config('/data/service/amundsen/databuilder/yzf_metadata/mysql_connect.yaml')
mysql_conf = conf_data.get('conn')
for conn_name, mysql_list in mysql_conf.items():
        for mysql_conn in mysql_list:
            loading_job = run_mysql_job(mysql_conn, conn_name)
            loading_job.launch()
job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()
- my_hive_loader.py
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
"""
This is an example script which demonstrates how to load data
into Neo4j and Elasticsearch without using an Airflow DAG.
"""
import logging
import sys
import textwrap
import uuid
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory
from sqlalchemy.ext.declarative import declarative_base
from databuilder.extractor.hive_table_metadata_extractor import HiveTableMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from my_util import YZF_TABLE_ES_INDEX_MAPPING
es_host = None
neo_host = None
if len(sys.argv) > 1:
es_host = sys.argv[1]
if len(sys.argv) > 2:
neo_host = sys.argv[2]
es = Elasticsearch([
{'host': es_host or 'localhost'},
])
DB_FILE = '/tmp/test.db'
SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
Base = declarative_base()
NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
LOGGER = logging.getLogger(__name__)
# Todo: user provides a list of schema for indexing
SUPPORTED_HIVE_SCHEMAS = ['accounting_collect', 'accounting_company', 'accounting_report', 'ads', 'bi_dm_ods', \
'biz_dm', 'biz_dw', 'common', 'common_kudu', 'common_sim', 'companyinfo_ods', 'customer', \
'customer_kudu', 'datax', 'default', 'di', 'dingtalk', 'dingtalk_kudu', 'dwd', 'dwd_sim', \
'dws', 'fintax_account', 'fintax_application', 'fintax_asset', 'fintax_data_init',
'fintax_fund', 'fintax_invoice', \
'fintax_salary', 'fintax_statistics', 'fintax_stock', 'fintax_task', 'fintax_tax',
'fintax_user_point', 'flink_database', \
'invoice_lake', 'log_ods', 'monitor', 'octopus_ods', 'sale_ods', 'taxops_ods', 'ucenter',
'upm_paas', 'view', 'yzf_biz', \
'yzf_biz_init', 'yzf_common', 'yzf_config', 'yzf_report', 'yzf_report_init']
# Global used in all Hive metastore queries.
# String format - ('schema1', schema2', .... 'schemaN')
SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_HIVE_SCHEMAS))
# todo: connection string needs to change
def connection_string():
user = 'root'
host = 'ip'
port = '3306'
db = 'hivemetastore'
pd = '123456'
return "mysql://%s:%s@%s:%s/%s?charset=utf8" % (user, pd, host, port, db)
def create_table_wm_job(templates_dict):
sql = textwrap.dedent("""
SELECT From_unixtime(A0.create_time) as create_time,
'hive' as `database`,
C0.NAME as `schema`,
B0.tbl_name as table_name,
{func}(A0.part_name) as part_name,
{watermark} as part_type
FROM PARTITIONS A0
LEFT OUTER JOIN TBLS B0
ON A0.tbl_id = B0.tbl_id
LEFT OUTER JOIN DBS C0
ON B0.db_id = C0.db_id
WHERE C0.NAME IN {schemas}
AND B0.tbl_type IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
AND A0.PART_NAME NOT LIKE '%%__HIVE_DEFAULT_PARTITION__%%'
GROUP BY C0.NAME, B0.tbl_name
ORDER by create_time desc
""").format(func=templates_dict.get('agg_func'),
watermark=templates_dict.get('watermark_type'),
schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)
LOGGER.info('SQL query: %s', sql)
tmp_folder = '/var/tmp/amundsen/table_{hwm}'.format(hwm=templates_dict.get('watermark_type').strip("\""))
node_files_folder = f'{tmp_folder}/nodes'
relationship_files_folder = f'{tmp_folder}/relationships'
hwm_extractor = SQLAlchemyExtractor()
csv_loader = FsNeo4jCSVLoader()
task = DefaultTask(extractor=hwm_extractor,
loader=csv_loader,
transformer=NoopTransformer())
job_config = ConfigFactory.from_dict({
f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
f'extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
f'extractor.sqlalchemy.{SQLAlchemyExtractor.EXTRACT_SQL}': sql,
'extractor.sqlalchemy.model_class': 'databuilder.models.watermark.Watermark',
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
f'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag' # TO-DO unique tag must be added
})
job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
def run_hive_job():
"""
Launches data builder job that extracts table and column metadata from MySQL Hive metastore database,
and publishes to Neo4j.
@param kwargs:
@return:
"""
# Adding to where clause to scope schema, filter out temp tables which start with numbers and views
where_clause_suffix = textwrap.dedent("""
WHERE d.NAME IN {schemas}
AND t.TBL_NAME NOT REGEXP '^[0-9]+'
AND t.TBL_TYPE IN ( 'EXTERNAL_TABLE', 'MANAGED_TABLE' )
""").format(schemas=SUPPORTED_HIVE_SCHEMA_SQL_IN_CLAUSE)
tmp_folder = '/var/tmp/amundsen/table_metadata'
node_files_folder = f'{tmp_folder}/nodes/'
relationship_files_folder = f'{tmp_folder}/relationships/'
job_config = ConfigFactory.from_dict({
f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
f'extractor.hive_table_metadata.{HiveTableMetadataExtractor.CLUSTER_KEY}': 'tencent_hive_kudu',
f'extractor.hive_table_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES}': [DESCRIPTION_NODE_LABEL],
'publisher.neo4j.job_publish_tag': 'load_hive_unique_tag' # TO-DO unique tag must be added
})
job = DefaultJob(conf=job_config,
task=DefaultTask(extractor=HiveTableMetadataExtractor(), loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
return job
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
cypher_query=None,
elasticsearch_mapping=YZF_TABLE_ES_INDEX_MAPPING):
"""
:param elasticsearch_index_alias: alias for Elasticsearch used in
amundsensearchlibrary/search_service/config.py as an index
:param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
`table_search_index`
:param model_name: the Databuilder model class used in transporting between Extractor and Loader
:param cypher_query: Query handed to the `Neo4jSearchDataExtractor` class, if None is given (default)
it uses the `Table` query baked into the Extractor
:param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
if None is given (default) it uses the `Table` query baked into the Publisher
"""
# loader saves data to this location and publisher reads it from here
    # Temporary file; it can be deleted and does not affect search
extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
task = DefaultTask(loader=FSElasticsearchJSONLoader(),
extractor=Neo4jSearchDataExtractor(),
transformer=NoopTransformer())
# elastic search client instance
elasticsearch_client = es
# unique name of new index in Elasticsearch
elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
job_config = ConfigFactory.from_dict({
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}': model_name,
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
elasticsearch_client,
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
elasticsearch_new_index_key,
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}':
elasticsearch_doc_type_key,
f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
elasticsearch_index_alias,
})
# only optionally add these keys, so need to dynamically `put` them
if cypher_query:
job_config.put(f'extractor.search_data.{Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY}',
cypher_query)
if elasticsearch_mapping:
job_config.put(f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY}',
elasticsearch_mapping)
job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())
return job
if __name__ == "__main__":
# Uncomment next line to get INFO level logging
logging.basicConfig(level=logging.INFO)
loading_job = run_hive_job()
loading_job.launch()
templates_dict = {'agg_func': 'min',
'watermark_type': '"low_watermark"',
'part_regex': '{{ ds }}'}
create_table_wm_job(templates_dict)
templates_dict = {'agg_func': 'max',
'watermark_type': '"high_watermark"',
'part_regex': '{{ ds }}'}
create_table_wm_job(templates_dict)
job_es_table = create_es_publisher_sample_job(
elasticsearch_index_alias='table_search_index',
elasticsearch_doc_type_key='table',
model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
job_es_table.launch()
- mysql_connect.yaml config file
conn:
  mysql1:
    - mysql://root:123456@ip1:3306/<db>
  mysql2:
    - mysql://root:123456@ip2:3306/<db>
FAQ
1. docker-compose -f docker-amundsen.yml up fails when starting the containers:
[2021-12-01T08:02:56,599][INFO ][o.e.b.BootstrapChecks ] [PD4Rw8t] bound or publishing to a non-loopback address, enforcing bootstrap checks
es_amundsen_atlas | ERROR: [1] bootstrap checks failed
es_amundsen_atlas | [1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
Raise the kernel's vm.max_map_count limit: edit /etc/sysctl.conf and add the line: vm.max_map_count = 262144
Reload the setting: sysctl -p
Re-run docker-compose -f docker-amundsen.yml up
2. Chinese comments show up garbled when searching metadata in the web UI
- Fix the MySQL connection URL by appending the charset parameter: ?charset=utf8
3. Importing the Hive metastore metadata from MySQL fails with:
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1055, "Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'hive.A0.CREATE_TIME' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
[SQL:
SELECT From_unixtime(A0.create_time) as create_time,
....
- As described in https://blog.csdn.net/fansili/article/details/78664267, change the MySQL sql_mode (drop only_full_group_by):
mysql> set global sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';
mysql> set session sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';
4. Official Amundsen FAQ: https://www.amundsen.io/amundsen/installation/#troubleshooting
5. With the firewall stopped, port 5000 on the Amundsen host still could not be reached from 215.5
Workaround: start the firewall, open port 5000, then stop the firewall again; after that, port 5000 is reachable and search works normally.
systemctl start firewalld
firewall-cmd --zone=public --add-port=5000/tcp --permanent
firewall-cmd --reload
systemctl stop firewalld
systemctl disable firewalld