量化投研配置本地數(shù)據(jù)庫是量化投資領(lǐng)域中非常重要的一環(huán)。通過搭建本地數(shù)據(jù)庫,可以方便地存儲和管理股票市場的日頻數(shù)據(jù),為后續(xù)的因子計算和策略研究提供支持岳服。
1. 數(shù)據(jù)庫搭建
首先我們拋出一個問題:真的需要數(shù)據(jù)庫嗎?
前段時間跑了一個回測代碼(這里用的是mysql):用海龜策略跑2011年到2022年12年的分鐘線數(shù)據(jù)希俩,源數(shù)據(jù)是csv文件吊宋,我們將其存儲到數(shù)據(jù)庫過程就非常耗時,從數(shù)據(jù)庫讀出來也耗費了很長時間颜武,大概有20多分鐘的樣子璃搜,但是如果我們將csv文件轉(zhuǎn)化為pickle二進制文件,從硬盤讀取到內(nèi)存只用2分鐘鳞上,而且內(nèi)存占用量也下降了2倍多这吻。如果說用金融時序數(shù)據(jù)庫的話速度可能會快一些。
缺點:IO操作非常耗時篙议,占用硬盤空間大唾糯。
優(yōu)點:方便數(shù)據(jù)管理添加維護、框架集成鬼贱,用戶操作方便趾断。
為了簡化數(shù)據(jù)庫搭建的過程,我們采用Veighna框架自帶的數(shù)據(jù)管理模塊來作為本地數(shù)據(jù)庫管理工具吩愧。
2. PostgreSQL
PostgreSQL是特性更為豐富的開源關(guān)系型數(shù)據(jù)庫,只推薦熟手使用增显。相比于MySQL雁佳,其特點如下:
- 采用多進程結(jié)構(gòu)脐帝;
- 支持通過擴展插件來新增功能。
Windows上PostgreSQL安裝配置教程:https://blog.csdn.net/my1324/article/details/103226622
3. Veighna數(shù)據(jù)庫架構(gòu)介紹
我們首先進入到vnpy\trader\database.py這個python文件中
3.1 數(shù)據(jù)結(jié)構(gòu)類
主要是BarOverview類和TickOverview類糖权,這兩個類封裝了底層接口數(shù)據(jù)結(jié)構(gòu)堵腹,提供數(shù)據(jù)視圖,用來區(qū)分存儲的數(shù)據(jù)星澳。
3.2 BaseDatabase(ABC)數(shù)據(jù)庫基類
BaseDatebase(ABC):是數(shù)據(jù)庫基類疚顷,封裝了常用的數(shù)據(jù)保存加載刪除等抽象方法,由子類實現(xiàn)禁偎。
定義了8個抽象方法腿堤,繼承了基類的子類都必須實現(xiàn)這些方法,從命名中我們也可以很清楚的知道每個方法的意思如暖。
返回的數(shù)據(jù)類型笆檀,比如BarData,這種自定義的數(shù)據(jù)結(jié)構(gòu)是在vnpy\trader\object.py文件中定義的盒至。
3.3 get_database()獲取數(shù)據(jù)庫服務(wù)模塊
def get_database() -> BaseDatabase:
""""""
# Return database object if already inited
global database
if database:
return database
# Read database related global setting
database_name: str = SETTINGS["database.name"]
module_name: str = f"vnpy_{database_name}"
# Try to import database module
try:
module: ModuleType = import_module(module_name)
except ModuleNotFoundError:
print(f"找不到數(shù)據(jù)庫驅(qū)動{module_name}酗洒,使用默認的SQLite數(shù)據(jù)庫")
module: ModuleType = import_module("vnpy_sqlite")
# Create database object from module
database = module.Database()
return database
可以看到,Veighna是從SETTINGS配置文件里面讀取設(shè)置的database名稱枷遂,通過import_module()導入該數(shù)據(jù)庫模塊樱衷,所以我們需要提前下載好對應(yīng)的vnpy_{}包,否則會默認使用vnpy_sqlite包作為項目數(shù)據(jù)庫服務(wù)模塊酒唉。
本文使用的是postgresql數(shù)據(jù)庫矩桂,因此需要運行下面這行代碼。
pip install vnpy_PostgreSQL Postgres
3.4 vnpy_postgresql數(shù)據(jù)庫實例模塊
首先找到vnpy_postgresql文件夾黔州,一般在你下載的虛擬環(huán)境envs\vnpy\Lib\site-packages\vnpy_postgresql下耍鬓。
可以使用pycharm中File——Settings——Project:xxx——project Structure導入這個模塊到你的項目結(jié)構(gòu)下。
我們點擊postgresql_database.py這個文件流妻,看到它的結(jié)構(gòu)如下牲蜀。
其實Veighna就是使用peewee這個python的ORM框架對數(shù)據(jù)結(jié)構(gòu)進行進一步封裝,對數(shù)據(jù)庫的操作映射成對類绅这、對象的操作涣达,避免了我們直接寫在SQL語句≈ま保看到這邊度苔,相信你對Veighna的數(shù)據(jù)管理已經(jīng)有了直觀的理解,下面我們就進行具體的配置操作浑度。
4.具體配置
4.1 創(chuàng)建.vntrader文件
.vntrader和run.py一定是同級目錄
程序加載setting里面的配置信息都是從這個文件夾里面的json文件獲取寇窑。
def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
"""
Get path where trader is running in.
"""
cwd: Path = Path.cwd()
temp_path: Path = cwd.joinpath(temp_name)
# If .vntrader folder exists in current working directory,
# then use it as trader running path.
if temp_path.exists():
return cwd, temp_path
# Otherwise use home path of system.
home_path: Path = Path.home()
temp_path: Path = home_path.joinpath(temp_name)
# Create .vntrader folder under home path if not exist.
if not temp_path.exists():
temp_path.mkdir()
return home_path, temp_path
TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")
sys.path.append(str(TRADER_DIR))
我們可以在vnpy\trader\utility.py里面找到這串代碼,意思就是從當前工作目錄獲取.vntrader文件箩张,添加到系統(tǒng)環(huán)境變量里面甩骏。如果當前環(huán)境沒有這個文件窗市,就會到Path.home()里面找,也就是你c盤用戶目錄C:\Users\Administrator
4.2 運行下面代碼run.py
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy_datamanager import DataManagerApp
def main():
"""Start VeighNa Trader"""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_app(DataManagerApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()
第一次運行報錯:
找不到數(shù)據(jù)庫驅(qū)動vnpy_{}饮笛,使用默認的SQLite數(shù)據(jù)庫
沒關(guān)系咨察,這是因為我們在.vntrader里面沒有配置要使用的數(shù)據(jù)庫,我們將下面這個文件保存到.vntrader文件夾下面福青。
vt_setting.json
{
"font.family": "微軟雅黑",
"font.size": 8,
"log.active": true,
"log.level": 50,
"log.console": true,
"log.file": true,
"email.server": "smtp.qq.com",
"email.port": 465,
"email.username": "",
"email.password": "",
"email.sender": "",
"email.receiver": "",
"datafeed.name": "",
"datafeed.username": "",
"datafeed.password": "",
"database.timezone": "Asia/Shanghai",
"database.name": "postgresql",
"database.database": "vnpy",
"database.host": "localhost",
"database.port": 5432,
"database.user":"postgres",
"database.password": "123456"
}
在SQL Shell(psql)下創(chuàng)建數(shù)據(jù)庫
重啟run.py文件
5.數(shù)據(jù)下載
5.1 環(huán)境配置
5.1.1 安裝vnpy_binance
pip install vnpy_binance
vnpy_binance下面有三個接口摄狱,使用時需要注意本接口:
- 只支持全倉保證金模式
- 只支持單向持倉模式
from vnpy_datamanager import DataManagerApp
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy_binance import (
BinanceSpotGateway, # 現(xiàn)貨交易
BinanceUsdtGateway, # 合約交易
BinanceInverseGateway # 用于對接幣安反向合約的交易接口
)
def main():
"""Start VeighNa Trader"""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_app(DataManagerApp)
main_engine.add_gateway(BinanceSpotGateway)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()
5.1.1 API配置
記住自己的API和Secret key 并且不要泄露出去,否則會有資金安全風險无午。
5.1.3 啟動項目
首先 pip install requests這個包
運行上面代碼后媒役,發(fā)現(xiàn)報錯如下:在trader下的constant的Exchange類下添加一個常量
當我們看到這個界面后說明配置完成!
5.2 下載幣安合約數(shù)據(jù)
from datetime import datetime, timedelta
from typing import List
from vnpy_binance import BinanceSpotGateway,BinanceUsdtGateway
from vnpy.event import EventEngine, Event
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.event import EVENT_LOG
from vnpy.trader.object import HistoryRequest
from vnpy.trader.utility import load_json
setting = {
"key": "",
"secret": "",
"服務(wù)器": "REAL",
"代理地址": "",
"代理端口":
}
"""
一鍵下載幣安現(xiàn)貨的行情數(shù)據(jù)
"""
# 用于初始化引擎和數(shù)據(jù)庫
class BinanceData:
def __init__(self, EventEngine):
self.binance = BinanceUsdtGateway(EventEngine, gateway_name="BINANCE_USDT")
self.binance.connect(setting=setting)
self.database = get_database()
self.proxies = {'https': '127.0.0.1:22307'}
def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
"""判斷下載的數(shù)據(jù)是否重復(fù)出現(xiàn)在數(shù)據(jù)庫中"""
bar_overview: List[BarOverview] = self.database.get_bar_overview()
for bar_view in bar_overview:
if bar_view.symbol != symbol:
continue
if bar_view.symbol == symbol and bar_view.interval != interval:
continue
if bar_view.symbol == symbol and bar_view.interval == interval:
if end < bar_view.start or start > bar_view.end:
continue
else:
return True
return False
def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):
if self.is_symbol_existed(symbol, interval, start, end):
return
# 獲取新數(shù)據(jù)
print(f"{datetime.now()}正在獲取--{symbol}--k線數(shù)據(jù)")
req = HistoryRequest(
symbol=symbol,
exchange=Exchange.BINANCE,
start=start,
end=end,
interval=interval
)
bars = self.binance.query_history(req)
self.database.save_bar_data(bars)
print(f"{datetime.now()}--{symbol}--k線數(shù)據(jù)已被存入數(shù)據(jù)庫指厌!")
if __name__ == '__main__':
event = EventEngine()
binance = BinanceData(event)
event.start()
event.register(EVENT_LOG, lambda event: print(event.data))
symbols = ["BLURUSDT"]
for symbol in symbols:
binance.data_to_db(symbol, interval=Interval.HOUR, start=datetime(2015, 10, 5),
end=datetime.now())
event.stop()
5.3 下載幣安現(xiàn)貨數(shù)據(jù)
from datetime import datetime, timedelta
from typing import List
from vnpy_binance import BinanceSpotGateway
from vnpy.event import EventEngine, Event
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.event import EVENT_LOG
from vnpy.trader.object import HistoryRequest
from vnpy.trader.utility import load_json
setting = {
"key": "",
"secret": "",
"服務(wù)器": "REAL",
"代理地址": "",
"代理端口":
}
"""
一鍵下載幣安現(xiàn)貨的行情數(shù)據(jù)
"""
# 用于初始化引擎和數(shù)據(jù)庫
class BinanceData:
def __init__(self, EventEngine):
self.binance = BinanceSpotGateway(EventEngine, gateway_name="BINANCE_SPOT")
self.binance.connect(setting=setting)
self.database = get_database()
self.proxies = {'https': '127.0.0.1:22307'}
def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
"""判斷下載的數(shù)據(jù)是否重復(fù)出現(xiàn)在數(shù)據(jù)庫中"""
bar_overview: List[BarOverview] = self.database.get_bar_overview()
for bar_view in bar_overview:
if bar_view.symbol != symbol:
continue
if bar_view.symbol == symbol and bar_view.interval != interval:
continue
if bar_view.symbol == symbol and bar_view.interval == interval:
if end < bar_view.start or start > bar_view.end:
continue
else:
return True
return False
def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):
if self.is_symbol_existed(symbol, interval, start, end):
return
# 獲取新數(shù)據(jù)
print(f"{datetime.now()}正在獲取--{symbol}--k線數(shù)據(jù)")
req = HistoryRequest(
symbol=symbol,
exchange=Exchange.BINANCE,
start=start,
end=end,
interval=interval
)
bars = self.binance.query_history(req)
self.database.save_bar_data(bars)
print(f"{datetime.now()}--{symbol}--k線數(shù)據(jù)已被存入數(shù)據(jù)庫刊愚!")
if __name__ == '__main__':
event = EventEngine()
binance = BinanceData(event)
event.start()
event.register(EVENT_LOG, lambda event: print(event.data))
symbols = ["blurusdt"]
for symbol in symbols:
binance.data_to_db(symbol, interval=Interval.MINUTE, start=datetime(2015, 10, 5),
end=datetime.now()-timedelta(days=1))
event.stop()
5.4 下載期貨行情數(shù)據(jù)
from datetime import datetime
from typing import List
from vnpy_rqdata.rqdata_datafeed import RqdataDatafeed
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.object import HistoryRequest
import rqdatac
# 初始化
rqdataDatafeed = RqdataDatafeed()
rqdataDatafeed.init()
database = get_database()
def query_traffic():
# 獲取流量使用情況
traffic_info = rqdatac.user.get_quota()
print(f"使用流量:{traffic_info['bytes_used'] / (2 ** 20)} Mb \n"
f"剩余流量:{(traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)} Mb ")
return (traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)
def is_symbol_existed(database, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
"""判斷下載的數(shù)據(jù)是否重復(fù)出現(xiàn)在數(shù)據(jù)庫中"""
bar_overview: List[BarOverview] = database.get_bar_overview()
for bar_view in bar_overview:
if bar_view.symbol != symbol:
continue
if bar_view.symbol == symbol and bar_view.interval != interval:
continue
if bar_view.symbol == symbol and bar_view.interval == interval:
if end < bar_view.start or start > bar_view.end:
continue
else:
return True
return False
if __name__ == '__main__':
vt_symbols = [
"i99.DCE", "j99.DCE", "jm99.DCE", "rb99.SHFE", "hc99.SHFE"
]
for vt_symbol in vt_symbols:
symbol, exchange = vt_symbol.split(".")
start = datetime(2010, 1, 1)
end = datetime.now()
interval = Interval.DAILY
if is_symbol_existed(database, symbol, interval, start, end):
print(f"{symbol}的{Interval.MINUTE}數(shù)據(jù)重復(fù)出現(xiàn)在數(shù)據(jù)庫中")
print("-" * 40)
continue
if query_traffic() < 30:
print("-" * 40)
break
historyReq = HistoryRequest(
symbol=symbol,
exchange=Exchange(exchange),
start=start,
end=end,
interval=interval
)
bars = rqdataDatafeed.query_bar_history(historyReq)
database.save_bar_data(bars)
print(f"{vt_symbol}下載完成!")
print()
6.保存成本地csv文件
為了后面方便投研分析以及數(shù)據(jù)備份的需要踩验,我也給出了數(shù)據(jù)轉(zhuǎn)化成csv文件保存的代碼鸥诽。
import datetime
import os.path
from typing import List
from vnpy.trader.constant import Exchange, Interval
from vnpy_datamanager import ManagerEngine
from vnpy.trader.database import BarOverview
from vnpy.trader.engine import MainEngine, EventEngine
def output_data_to_csv(engine, exchange, interval):
bar_view:List[BarOverview] = engine.get_bar_overview()
for bar_ in bar_view:
bar_dict = bar_.__dict__.get("__data__")
if bar_dict.get("interval") == interval and bar_dict.get(
"exchange") == exchange:
print(
f"--------正在保存{bar_dict.get('symbol')}這個文件------------")
# 保存成csv文件
filepath = rf"D:\market\feature\{interval.value}"
if not os.path.exists(filepath):
os.makedirs(filepath)
filepath_name = os.path.join(filepath,
f"{bar_dict.get('symbol')}.{bar_dict.get('exchange').value}.csv")
flag = manage_engine.output_data_to_csv(file_path=filepath_name,
exchange=exchange,
symbol=bar_dict.get(
"symbol"),
interval=interval,
start=bar_dict.get(
"start"),
end=bar_dict.get("end"))
if flag:
print(f"{datetime.datetime.now()},{filepath_name}文件保存成功!")
else:
print(f"{datetime.datetime.now()},{filepath_name}文件保存失敗!")
if __name__ == '__main__':
main_engine = MainEngine()
event_engine = EventEngine()
manage_engine = ManagerEngine(main_engine, event_engine)
output_data_to_csv(manage_engine,
exchange=Exchange.SHFE, interval=Interval.DAILY)
main_engine.close()
7. 總結(jié)
文章介紹了在量化投資中配置本地數(shù)據(jù)庫的重要性,并使用Veighna框架和PostgreSQL數(shù)據(jù)庫來搭建和配置本地數(shù)據(jù)庫箕憾。首先討論了是否真的需要數(shù)據(jù)庫以及數(shù)據(jù)庫的優(yōu)缺點牡借。介紹了Veighna框架和PostgreSQL數(shù)據(jù)庫的特點和安裝配置方法。接下來詳細介紹了Veighna框架中的數(shù)據(jù)庫架構(gòu)和相關(guān)類的功能袭异。文章最后給出了具體的配置步驟钠龙,包括創(chuàng)建.vntrader文件夾和配置vt_setting.json文件,并演示了如何使用run.py運行程序御铃。提供了配置本地數(shù)據(jù)庫的詳細指南碴里,幫助量化投資者方便地存儲和管理市場數(shù)據(jù),為后續(xù)的因子計算和策略研究提供支持上真。