在SREWorks社區(qū)聚集了很多進(jìn)行運(yùn)維數(shù)倉(cāng)建設(shè)的同學(xué)给僵,大家都會(huì)遇到類(lèi)似的挑戰(zhàn)和問(wèn)題:
- 數(shù)倉(cāng)中存儲(chǔ)大量數(shù)據(jù)消耗成本晨雳,但很多存儲(chǔ)的數(shù)據(jù)卻并沒(méi)有消費(fèi)撒会。
- 進(jìn)數(shù)倉(cāng)的ETL學(xué)習(xí)成本高、管理成本高蔬啡,相關(guān)同學(xué)配合度低诲侮,以及上游結(jié)構(gòu)改動(dòng)后ETL卻遲遲無(wú)人調(diào)整。
- 數(shù)倉(cāng)中數(shù)據(jù)的時(shí)效性星爪、準(zhǔn)確性問(wèn)題浆西,導(dǎo)致很多場(chǎng)景無(wú)法完全依賴(lài)數(shù)倉(cāng)展開(kāi)粉私。
上面的種種讓推廣數(shù)倉(cāng)的同學(xué)很犯難:明明花了大力氣構(gòu)建了統(tǒng)一數(shù)倉(cāng)顽腾,但卻又受限于各種問(wèn)題,無(wú)法讓其價(jià)值得到完全的落地诺核。本文旨在闡述一種基于LLM的數(shù)倉(cāng)構(gòu)建方案抄肖,從架構(gòu)層面解決上述的三個(gè)問(wèn)題。
一窖杀、方案設(shè)計(jì)
從需求出發(fā)漓摩,再次思考一下我們進(jìn)行運(yùn)維數(shù)倉(cāng)構(gòu)建的初衷:用一句SQL可以查詢(xún)或統(tǒng)計(jì)到所有我們關(guān)注的運(yùn)維對(duì)象的情況。雖然有很多方案能做入客,但總結(jié)一下管毙,就是這樣兩種抽象模型:Push 或 Pull。
- Push的方式是我們?nèi)ブ鲃?dòng)管理數(shù)據(jù)的ETL鏈路桌硫,比如使用Flink夭咬、MaxCompute等大數(shù)據(jù)方案將數(shù)據(jù)進(jìn)行加工放到數(shù)倉(cāng)中。在需要查詢(xún)的時(shí)候铆隘,直接SELECT數(shù)倉(cāng)就能出結(jié)果卓舵。這類(lèi)方案的問(wèn)題在于:1. ETL管理維護(hù)成本高。2. 數(shù)據(jù)準(zhǔn)確性較數(shù)據(jù)源有所下降膀钠。
- Pull的方式是我們不去主動(dòng)拉所有的數(shù)據(jù)掏湾,在執(zhí)行時(shí)候再去各個(gè)數(shù)據(jù)源找數(shù)據(jù),比較具有代表性的就是Presto肿嘲。這種方案的優(yōu)點(diǎn)就是不用進(jìn)行ETL管理以及數(shù)據(jù)準(zhǔn)確性較好融击,畢竟是實(shí)時(shí)拉的。但問(wèn)題就在于這種方案把復(fù)雜性都后置到了查詢(xún)那一刻雳窟,查詢(xún)速度過(guò)慢就成了問(wèn)題尊浪。
那么是否有一種方案能將這兩種模型結(jié)合起來(lái),取其中的優(yōu)點(diǎn)呢?經(jīng)過(guò)這段時(shí)間對(duì)于大模型熟悉际长,我認(rèn)為這個(gè)方案是可行的耸采,于是嘗試設(shè)計(jì)了一下流程圖:
二、基于LLM的SQL預(yù)查詢(xún)
相信大家在使用了類(lèi)似Presto的聯(lián)邦查詢(xún)(Federated Query)工育,都會(huì)對(duì)此印象深刻虾宇,原本要好多個(gè)for循環(huán)的代碼,放到里面只要一個(gè)select-join就能解決如绸。但Presto本身的定位就是為分析型的負(fù)載設(shè)計(jì)嘱朽,我們無(wú)法把它置于一些高頻查詢(xún)的關(guān)鍵鏈路上。
聯(lián)邦查詢(xún)的SQL和for循環(huán)的代碼怔接,看起來(lái)似乎只隔了一層紗搪泳,現(xiàn)在大模型的出現(xiàn)就直接把這層紗給捅破了。我們的思路也非常簡(jiǎn)單:既然大模型可以非常方便地把用戶(hù)需求轉(zhuǎn)換成SQL扼脐,那么把用戶(hù)SQL轉(zhuǎn)換成代碼似乎也不是一件難事岸军。
import os
import sys
from openai import OpenAI
import traceback
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
client = OpenAI()
def get_script(content):
return content.split("```python")[1].split("```")[0]
def execute_python(code_str: str):
stdout = StringIO()
stderr = StringIO()
return_head = 1000
context = {}
try:
# 重定向stdout和stderr,執(zhí)行代碼
with redirect_stdout(stdout), redirect_stderr(stderr):
exec(code_str, context)
except Exception:
stderr.write(traceback.format_exc())
# 獲取執(zhí)行后的stdout, stderr和context
stdout_value = stdout.getvalue()[0:return_head]
stderr_value = stderr.getvalue()[0:return_head]
return {"stdout": stdout_value.strip(), "stderr": stderr_value.strip()}
prompt = """
你是一個(gè)數(shù)據(jù)庫(kù)專(zhuān)家瓦侮,我會(huì)給你一段SQL艰赞,請(qǐng)你轉(zhuǎn)換成可執(zhí)行的Python代碼。
當(dāng)前有2個(gè)數(shù)據(jù)庫(kù)的連接信息肚吏,分別是:
1. 數(shù)據(jù)庫(kù)名稱(chēng) processes 連接串 mysql://root@test-db1.com:3306/processes
下面是這個(gè)數(shù)據(jù)庫(kù)的表結(jié)構(gòu)
```
CREATE TABLE `process_table` (
`process_name` varchar(100) DEFAULT NULL,
`start_time` datetime DEFAULT NULL,
`end_time` datetime DEFAULT NULL,
`server_name` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
```
2. 數(shù)據(jù)庫(kù)名稱(chēng) servers 連接串 mysql://root@test-db2.com:3306/servers
下面是這個(gè)數(shù)據(jù)庫(kù)的表結(jié)構(gòu)
···
CREATE TABLE `server_table` (
`server_name` varchar(100) DEFAULT NULL,
`ip` varchar(100) DEFAULT NULL,
`zone` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
···
在編寫(xiě)Python代碼的時(shí)候方妖,不要把所有的數(shù)據(jù)庫(kù)的信息都傳入,請(qǐng)根據(jù)SQL的內(nèi)容按需傳入罚攀。
返回結(jié)果中只有Python代碼党觅,如要描述信息全部放在注釋中。Python的執(zhí)行結(jié)果需要是JSON格式的數(shù)據(jù)斋泄。
下面用戶(hù)會(huì)給出你需要轉(zhuǎn)換的SQL:
"""
query_sql = "select * from processes.process_table a join servers.server_table b on a.server_name = b.server_name where b.zone = 'ZoneA';"
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": query_sql}
]
res = client.chat.completions.create(messages=messages, model="gpt-4")
print(res.choices[0].message.content)
exec_result = execute_python(get_script(res.choices[0].message.content))
print("result:")
print(exec_result)
if exec_result["stderr"] == "" and exec_result["stdout"] != "":
print(exec_result["stdout"])
sys.exit(0)
這個(gè)例子杯瞻,確實(shí)相當(dāng)簡(jiǎn)單直接,連langchain都不需要是己,我們基于OpenAI的接口直接實(shí)現(xiàn)聯(lián)邦查詢(xún)的能力:
- 事先將連接串和數(shù)據(jù)結(jié)構(gòu)準(zhǔn)備好又兵,輸入給LLM。
- 將SQL語(yǔ)句輸入給LLM卒废,確保SQL中的表名和數(shù)據(jù)結(jié)構(gòu)中的表名一致沛厨,避免LLM混淆。
- 將LLM給出的代碼直接執(zhí)行摔认,便獲得了查詢(xún)結(jié)果逆皮。
下面我們來(lái)看看運(yùn)行的結(jié)果
```python
# Import required libraries
import pandas as pd
from sqlalchemy import create_engine
# Create connection engines
engine1 = create_engine('mysql://root@test-db1.com:3306/processes')
engine2 = create_engine('mysql://root@test-db2.com:3306/servers')
# Query data from process_table in processes database
df1 = pd.read_sql('SELECT * FROM `process_table`', engine1)
# Query data from server_table in servers database
df2 = pd.read_sql('SELECT * FROM `server_table` WHERE `zone` = "ZoneA"', engine2)
# Merge both dataframes on server_name
result = pd.merge(df1, df2, how='inner', on='server_name')
# Convert dataframe to JSON
out_json = result.to_json(orient='records')
print(out_json)
```
關(guān)于此代碼:
我們首先導(dǎo)入了必要的庫(kù),然后使用提供的憑據(jù)創(chuàng)建了兩個(gè)數(shù)據(jù)庫(kù)連接参袱。然后电谣,我們分別從兩個(gè)數(shù)據(jù)庫(kù)中查詢(xún)數(shù)據(jù)秽梅。接著,我們根據(jù)"server_name"合并這兩個(gè)數(shù)據(jù)框剿牺,最后我們將結(jié)果轉(zhuǎn)換為JSON格式企垦。
result:
{'stdout': '[{"process_name":"Process1","start_time":1703259365000,"end_time":1703266565000,"server_name":"Server1","zone":"ZoneA"},{"process_name":"Process2","start_time":1703262965000,"end_time":1703270165000,"server_name":"Server2","zone":"ZoneA"}]', 'stderr': ''}
[{"process_name":"Process1","start_time":1703259365000,"end_time":1703266565000,"server_name":"Server1","zone":"ZoneA"},{"process_name":"Process2","start_time":1703262965000,"end_time":1703270165000,"server_name":"Server2","zone":"ZoneA"}]
真實(shí)運(yùn)行起來(lái),確實(shí)LLM給的代碼比較隨機(jī)晒来,一會(huì)兒使用pandas處理數(shù)據(jù)钞诡,一會(huì)兒使用pymysql處理數(shù)據(jù),存在非常大的不確定性湃崩,但是結(jié)果是確定的荧降。多試幾次之后,我們發(fā)現(xiàn)這個(gè)結(jié)果還是不穩(wěn)定攒读,有時(shí)候會(huì)寫(xiě)一些存在瑕疵的代碼朵诫,導(dǎo)致報(bào)錯(cuò)””猓基于我們?cè)谏弦黄呀?jīng)講清楚的思維鏈的模型剪返,我們可以給它加上一個(gè)報(bào)錯(cuò)反饋鏈路,讓它自行修改問(wèn)題代碼泌辫。
for i in range(3):
print("第", i + 1, "次重試")
messages = [
{"role": "system", "content": prompt + "\n" + query_sql},
]
if exec_result["stderr"] != "":
messages.append({"role": "user", "content": res.choices[0].message.content + "\n\n" + exec_result["stderr"] + "\n執(zhí)行報(bào)錯(cuò)随夸,請(qǐng)根據(jù)報(bào)錯(cuò)修正,再次生成代碼"})
else:
messages.append({"role": "user", "content": res.choices[0].message.content + "\n\n" + "執(zhí)行沒(méi)有任何返回震放,請(qǐng)?jiān)俅紊纱a"})
res = client.chat.completions.create(messages=messages, model="gpt-4")
print(res.choices[0].message.content)
exec_result = execute_python(get_script(res.choices[0].message.content))
print("result:")
print(exec_result)
if exec_result["stderr"] == "" and exec_result["stdout"] != "":
print(exec_result["stdout"])
sys.exit(0)
print("查詢(xún)失敗")
有了這層錯(cuò)誤反饋之后,我們會(huì)發(fā)現(xiàn)這個(gè)查詢(xún)就非常穩(wěn)定了驼修,雖然有些時(shí)候LLM產(chǎn)生的代碼會(huì)出錯(cuò)殿遂,但是通過(guò)報(bào)錯(cuò)信息自行修改優(yōu)化之后,能夠保持產(chǎn)出結(jié)果穩(wěn)定(不過(guò)自動(dòng)修改報(bào)錯(cuò)的查詢(xún)乙各,時(shí)延明顯會(huì)比較長(zhǎng)一些)墨礁。
總計(jì) | 一次正確 | 二次正確 | 三次正確 | 失敗 | |||||
---|---|---|---|---|---|---|---|---|---|
次數(shù) | 20 | 7 | 35% | 9 | 45% | 0 | 0 | 4 | 20% |
平均耗時(shí) | 43.0s | 13.2s | 45.3s | N/A | 91.2s |
從20次的測(cè)試中,可以看出一次查詢(xún)的成功率在30%左右耳峦,通過(guò)報(bào)錯(cuò)反饋優(yōu)化恩静,成功率就能達(dá)到80%。 通過(guò)觀察每個(gè)查詢(xún)語(yǔ)句蹲坷,基本可以發(fā)現(xiàn)使用pandas的代碼編寫(xiě)準(zhǔn)確率高很多驶乾,后續(xù)如果需要優(yōu)化prompt,可以在再增加一些使用依賴(lài)庫(kù)上的指引循签,編寫(xiě)成功率會(huì)更高级乐。同時(shí)我們也發(fā)現(xiàn),如果有些代碼一開(kāi)始方向就錯(cuò)誤的話(huà)县匠,通過(guò)報(bào)錯(cuò)反饋優(yōu)化也救不回來(lái)风科,三次成功率為零就是一個(gè)很好的說(shuō)明撒轮。當(dāng)前測(cè)試用的LLM推理速度較慢,如果本地化部署LLM理論上推理速度還能更快不少贼穆。
當(dāng)前题山,基于LLM的查詢(xún)表現(xiàn)上可以和Presto已經(jīng)比較近似了,但有些地方會(huì)比Presto要更強(qiáng):
- 數(shù)據(jù)源擴(kuò)展:presto需要進(jìn)行適配器的開(kāi)發(fā)才能對(duì)接其他數(shù)據(jù)源故痊,而LLM的方案你只要教會(huì)LLM怎么查詢(xún)特定數(shù)據(jù)源就行了臀蛛,事實(shí)上可能都不用教,因?yàn)樗袔缀跛械木幊讨R(shí)崖蜜。
- 白盒化以及復(fù)雜查詢(xún)優(yōu)化:針對(duì)復(fù)雜場(chǎng)景如果存在一些查詢(xún)準(zhǔn)確性問(wèn)題浊仆,需要去preso引擎中排查原因并不簡(jiǎn)單。但LLM的方案是按照人可閱讀的代碼來(lái)了豫领,你可以要求它按照你熟悉的編程語(yǔ)言編寫(xiě)抡柿,你甚至可以要求它寫(xiě)的代碼每行都加上注釋。
當(dāng)然等恐,和Presto一樣洲劣,基于LLM的查詢(xún)方案,只能被放到預(yù)查詢(xún)中课蔬,在生產(chǎn)鏈路中并不能每次都讓LLM去生成查詢(xún)代碼囱稽,這太慢了。那么有沒(méi)有辦法讓它的查詢(xún)提速呢二跋?可以的战惊。還記得我們?cè)谖恼麻_(kāi)頭提過(guò)的Push和Pull的模式嗎?聯(lián)邦查詢(xún)是基于Pull的模式展開(kāi)的扎即,而流式ETL是基于Push模式展開(kāi)的吞获,我們?nèi)绻巡樵?xún)語(yǔ)句直接翻譯成流式ETL的語(yǔ)句,預(yù)先將需要的數(shù)據(jù)處理到一個(gè)數(shù)據(jù)庫(kù)中谚鄙,那是不是就完全可以規(guī)避掉性能問(wèn)題了呢各拷?
三、基于LLM的流計(jì)算處理
和分析型的查詢(xún)相比闷营,流計(jì)算的數(shù)據(jù)同步邏輯顯然簡(jiǎn)單很多烤黍,只要分析SQL,按需求字段進(jìn)行同步即可傻盟。這里就不貼完整的代碼了速蕊,就把相關(guān)部分的prompt貼出來(lái)。
當(dāng)前有2個(gè)數(shù)據(jù)庫(kù)的連接信息莫杈,分別是:
1. 數(shù)據(jù)庫(kù)名稱(chēng) processes 連接串 mysql://root@test-db1.com:3306/processes
下面是這個(gè)數(shù)據(jù)庫(kù)的表結(jié)構(gòu)
```
CREATE TABLE `process_table` (
`process_name` varchar(100) DEFAULT NULL,
`start_time` datetime DEFAULT NULL,
`end_time` datetime DEFAULT NULL,
`server_name` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
```
2. 數(shù)據(jù)庫(kù)名稱(chēng) servers 連接串 mysql://root@test-db2.com:3306/servers
下面是這個(gè)數(shù)據(jù)庫(kù)的表結(jié)構(gòu)
···
CREATE TABLE `server_table` (
`server_name` varchar(100) DEFAULT NULL,
`ip` varchar(100) DEFAULT NULL,
`zone` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
···
你是一個(gè)Flink專(zhuān)家互例,我會(huì)給你一段需求SQL,請(qǐng)你根據(jù)需求SQL筝闹,分析出哪些字段被需要媳叨,編寫(xiě)合適的Flink SQL預(yù)先處理到clickhouse中腥光。
原有的表在clickhouse中依然保持原有的表名,保持一對(duì)一的映射糊秆。
Flink SQL處理的目標(biāo)是使得用戶(hù)能在clickhouse中在完全不改動(dòng)需求SQL的情況下正確執(zhí)行武福。
在Flink SQL中只進(jìn)行數(shù)據(jù)同步,不要加數(shù)據(jù)處理邏輯痘番。
請(qǐng)?jiān)诮Y(jié)果中直接給出一份完整的Flink SQL代碼捉片,不要任何文字說(shuō)明,如果有說(shuō)明的地方在代碼中用注釋標(biāo)出汞舱。
下面用戶(hù)會(huì)給需求SQL:
select a.process_name,b.server_name from processes.process_table a join servers.server_table b on a.server_name = b.server_name where b.zone = 'ZoneA';
我們可以看到這份prompt和上一章生成查詢(xún)代碼的prompt幾乎沒(méi)有太大差別伍纫,只是對(duì)于其生成的目標(biāo)做了修改,明確告訴它要生成Flink SQL昂芜。下面就是LLM生成的Flink SQL:
-- 創(chuàng)建Flink SQL連接到數(shù)據(jù)庫(kù)processes的process_table
CREATE TABLE process_table (
process_name STRING,
server_name STRING
) WITH (
'connector' = 'mysql-cdc', -- 使用 MySQL CDC 連接器
'hostname' = 'test-db1.com', -- 源數(shù)據(jù)庫(kù)主機(jī)名
'port' = '3306', -- 源數(shù)據(jù)庫(kù)端口
'username' = 'root', -- 源數(shù)據(jù)庫(kù)用戶(hù)名
'database-name' = 'processes', -- 源數(shù)據(jù)庫(kù)名
'table-name' = 'process_table' -- 源數(shù)據(jù)庫(kù)表名
);
-- 創(chuàng)建Flink SQL連接到數(shù)據(jù)庫(kù)servers的server_table
CREATE TABLE server_table (
server_name STRING,
zone STRING
) WITH (
'connector' = 'mysql-cdc', -- 使用 MySQL CDC 連接器
'hostname' = 'test-db2.com', -- 源數(shù)據(jù)庫(kù)主機(jī)名
'port' = '3306', -- 源數(shù)據(jù)庫(kù)端口
'username' = 'root', -- 源數(shù)據(jù)庫(kù)用戶(hù)名
'database-name' = 'servers', -- 源數(shù)據(jù)庫(kù)名
'table-name' = 'server_table' -- 源數(shù)據(jù)庫(kù)表名
);
-- 創(chuàng)建到ClickHouse的連接莹规,保持和源數(shù)據(jù)庫(kù)對(duì)應(yīng)關(guān)系
CREATE TABLE clickhouse_process_table (
process_name STRING,
server_name STRING
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://localhost:8123',
'table-name' = 'process_table'
);
CREATE TABLE clickhouse_server_table (
server_name STRING,
zone STRING
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://localhost:8123',
'table-name' = 'server_table'
);
-- Flink SQL同步數(shù)據(jù)
INSERT INTO clickhouse_process_table
SELECT process_name, server_name
FROM process_table;
INSERT INTO clickhouse_server_table
SELECT server_name, zone
FROM server_table;
如我們所預(yù)期的,LLM給出了準(zhǔn)確的Flink SQL語(yǔ)句泌神,并且還根據(jù)查詢(xún)需求的SQL良漱,它只同步了會(huì)查到的字段,用不到的字段就沒(méi)有同步(比如server中的ip字段沒(méi)有用到)欢际。在這樣一條鏈路中母市,我們同樣可以類(lèi)似第三章使用的報(bào)錯(cuò)反饋?zhàn)詢(xún)?yōu)化的方式,提高生成代碼的穩(wěn)定性损趋,使得其生成的代碼可以直接在生產(chǎn)中部署運(yùn)行患久,在這里我們就不做過(guò)多展開(kāi)了。
四舶沿、總結(jié)
一份需求查詢(xún)SQL墙杯,利用LLM生成兩份代碼,一份用于Pull:直接查詢(xún)返回結(jié)果括荡,預(yù)查詢(xún)調(diào)試用;一份用于Push:構(gòu)建消費(fèi)鏈路進(jìn)實(shí)時(shí)數(shù)倉(cāng)溉旋』澹基于LLM,實(shí)現(xiàn)真正意義上從需求出發(fā)的ETL生產(chǎn)鏈路構(gòu)建观腊,大概包含如下優(yōu)點(diǎn):
- 避免ETL過(guò)程的過(guò)度加工:按需加字段邑闲,不會(huì)加工太多用不到字段浪費(fèi)計(jì)算、浪費(fèi)存儲(chǔ)梧油。
- 降低使用者維護(hù)ETL加工過(guò)程成本:雖然Flink SQL的可維護(hù)性已經(jīng)很好了苫耸,但是面向計(jì)算過(guò)程的SQL編寫(xiě)方式還是讓很多用戶(hù)不適應(yīng)。如果直接用查詢(xún)SQL來(lái)進(jìn)行自動(dòng)生成儡陨,就大大降低了維護(hù)的門(mén)檻褪子。
- 統(tǒng)一數(shù)據(jù)鏈路: 以查詢(xún)?yōu)轵?qū)動(dòng)的數(shù)據(jù)模型量淌,可以使得使用者始終面向數(shù)據(jù)源表進(jìn)行需求思考。ETL實(shí)時(shí)計(jì)算產(chǎn)生的數(shù)據(jù)會(huì)更像一個(gè)物化視圖嫌褪,這樣在做實(shí)時(shí)數(shù)據(jù)準(zhǔn)確性校驗(yàn)時(shí)也更加方便呀枢。
如果您當(dāng)前還在為數(shù)倉(cāng)的構(gòu)建所困擾,可以嘗試一下這個(gè)基于LLM的方案笼痛,歡迎大家在SREWorks數(shù)智運(yùn)維社區(qū)溝通交流裙秋。