背景
因為數字貨幣交易所的k線數據接口都有返回數量限制,對于想要拿到大量歷史數據進行自己策略回測而言,數據樣本量太小,回測是沒有意義的。所以,這里介紹一種方法,將交易所里的k線數據逐漸地保存到自己的數據庫中,以便日后需要回測的時候,可以有足夠多的歷史數據供自己使用。
Step1: 創建數據庫
-- Replace the back-quoted placeholder with your own database name.
CREATE DATABASE `你自己的數據庫名`;
Step2: 創建表
-- K-line (candlestick) storage for OKEx swap contracts.
-- A candle is identified by (asset, kline_type, timestamp). The unique key
-- covers all three columns so that candles of different instruments or
-- intervals sharing the same timestamp do not collide when the collector
-- scripts use INSERT IGNORE / REPLACE.
CREATE TABLE `okex_swap` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `asset` varchar(18) NOT NULL COMMENT '資產類型',
  `kline_type` varchar(4) NOT NULL COMMENT 'k線類型',
  `candle_begin_time_GMT8` varchar(20) NOT NULL COMMENT '東8區時間',
  `timestamp` varchar(15) NOT NULL COMMENT '東8區時間戳(ms)',
  `open` varchar(64) NOT NULL COMMENT '開盤價',
  `high` varchar(64) NOT NULL COMMENT '最高價',
  `low` varchar(64) NOT NULL COMMENT '最低價',
  `close` varchar(64) NOT NULL COMMENT '收盤價',
  `volume` varchar(64) NOT NULL COMMENT '交易量(按張折算)',
  `currency_volume` varchar(64) NOT NULL COMMENT '交易量(按幣折算)',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  -- Was UNIQUE(`timestamp`) only, which silently dropped or overwrote rows
  -- of any second asset / k-line type with the same candle time.
  UNIQUE KEY `indx_time` (`asset`,`kline_type`,`timestamp`) USING BTREE,
  -- Now a prefix of `indx_time`; kept so the existing index name stays available.
  KEY `indx_asset_type` (`asset`,`kline_type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Step3: 程序
import ccxt
from ccxt import Exchange
import pymysql as mysql
import pandas as pd
from datetime import timedelta
from time import sleep
import traceback
# Collector script: every `interval` seconds, fetch the latest candles for one
# OKEx swap instrument and insert them row by row into the `okex_swap` table.
# Rows that already exist are skipped by INSERT IGNORE (relies on the table's
# unique key).
okex = ccxt.okex3()
instrument_id = 'BTC-USD-SWAP'
interval = 300  # candle width in seconds (5-minute k-lines)
kline_type = '5min'  # label stored with every row; keep in sync with `interval`

# Columns written to the database, in the order the INSERT expects them.
value_columns = ['candle_begin_time_GMT8', 'MTS', 'open', 'high', 'low',
                 'close', 'volume', 'currency_volume']
# Columns that hold numeric values and are converted to float.
numeric_columns = ['open', 'high', 'low', 'close', 'volume', 'currency_volume']

# Parameterized statement: values are escaped by the driver instead of being
# interpolated into the SQL text. The table name is unqualified so it resolves
# against whatever database the connection below selects.
insert_sql = '''
    INSERT IGNORE INTO `okex_swap`
        (`asset`, `kline_type`, `candle_begin_time_GMT8`, `timestamp`,
         `open`, `high`, `low`, `close`, `volume`, `currency_volume`)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''

try:
    while True:
        # Fetch the most recent candles for the instrument.
        klines = okex.swap_get_instruments_instrument_id_candles(
            {
                'instrument_id': instrument_id,
                'granularity': interval
            }
        )
        if not klines:
            # Nothing returned this round; try again on the next cycle.
            sleep(interval)
            continue

        # Column 0 is a time string (it is fed to parse8601 below), so the
        # frame is built without a blanket dtype and only the numeric columns
        # are converted explicitly.
        df = pd.DataFrame(klines)
        df.rename(
            columns={0: 'MTS', 1: 'open', 2: 'high', 3: 'low', 4: 'close',
                     5: 'volume', 6: 'currency_volume'},
            inplace=True,
        )
        df[numeric_columns] = df[numeric_columns].astype(float)
        df['MTS'] = df['MTS'].map(Exchange.parse8601)  # -> epoch milliseconds
        df['candle_begin_time'] = pd.to_datetime(df['MTS'], unit='ms')
        df['candle_begin_time_GMT8'] = df['candle_begin_time'] + timedelta(hours=8)
        df = df[value_columns]

        # Open a connection for this batch and always close it afterwards, so
        # the long-running loop does not accumulate open connections.
        connection = mysql.connect(
            host='填寫你自己的數據庫鏈接地址',
            user='填寫你自己的數據庫用戶名',
            password='填寫你自己的數據庫密碼',
            db='填寫你自己的數據庫名',
            charset='utf8',
            cursorclass=mysql.cursors.DictCursor
        )
        try:
            # Insert the candles one at a time, committing after each row.
            for index, row in df.iterrows():
                # Values are stringified first: the target columns are varchar
                # and this keeps the stored text identical to str(value).
                params = [instrument_id, kline_type]
                params.extend(str(row[column]) for column in value_columns)
                with connection.cursor() as cursor:
                    cursor.execute(insert_sql, params)
                connection.commit()
                print("第 %d 條數據保存成功" % index)
        finally:
            connection.close()

        print("數據保存成功^_^")
        sleep(interval)
except Exception:
    # Any failure (network, parsing, database) ends the collector after
    # printing the full traceback.
    print("保存數據時發生異常: %s" % traceback.format_exc())
import ccxt
from ccxt import Exchange
import pymysql as mysql
import pandas as pd
from datetime import timedelta
from time import sleep
import traceback
# Collector script (batch variant): every `interval` seconds, fetch the latest
# candles for one OKEx swap instrument and write them to `okex_swap` in a
# single executemany() call. REPLACE INTO overwrites rows that already exist
# (relies on the table's unique key).
import os  # local import: database credentials are read from the environment

okex = ccxt.okex3()
instrument_id = 'BTC-USD-SWAP'
interval = 300  # candle width in seconds (5-minute k-lines)
kline_type = '5min'  # label stored with every row; keep in sync with `interval`

# Connection settings. The password must be supplied via OKEX_DB_PASSWORD and
# is intentionally not kept in source control; the other settings fall back to
# the previous defaults. A missing password fails fast here with a KeyError.
db_config = {
    'host': os.environ.get('OKEX_DB_HOST', 'dev-mysql.mysql.rds.aliyuncs.com'),
    'user': os.environ.get('OKEX_DB_USER', 'root'),
    'password': os.environ['OKEX_DB_PASSWORD'],
    'db': os.environ.get('OKEX_DB_NAME', 'coin'),
    'charset': 'utf8',
    'cursorclass': mysql.cursors.DictCursor,
}

# Columns written to the database, in the order the statement expects them.
value_columns = ['candle_begin_time_GMT8', 'MTS', 'open', 'high', 'low',
                 'close', 'volume', 'currency_volume']
# Columns that hold numeric values and are converted to float.
numeric_columns = ['open', 'high', 'low', 'close', 'volume', 'currency_volume']

# The table name is unqualified so it resolves against the connected database.
replace_sql = '''
    REPLACE INTO `okex_swap`
        (`asset`, `kline_type`, `candle_begin_time_GMT8`, `timestamp`,
         `open`, `high`, `low`, `close`, `volume`, `currency_volume`)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''

try:
    while True:
        # Fetch the most recent candles for the instrument.
        klines = okex.swap_get_instruments_instrument_id_candles(
            {
                'instrument_id': instrument_id,
                'granularity': interval
            }
        )
        if not klines:
            # Nothing returned this round; try again on the next cycle.
            sleep(interval)
            continue

        # Column 0 is a time string (it is fed to parse8601 below), so the
        # frame is built without a blanket dtype and only the numeric columns
        # are converted explicitly.
        df = pd.DataFrame(klines)
        df.rename(
            columns={0: 'MTS', 1: 'open', 2: 'high', 3: 'low', 4: 'close',
                     5: 'volume', 6: 'currency_volume'},
            inplace=True,
        )
        df[numeric_columns] = df[numeric_columns].astype(float)
        df['MTS'] = df['MTS'].map(Exchange.parse8601)  # -> epoch milliseconds
        df['candle_begin_time'] = pd.to_datetime(df['MTS'], unit='ms')
        df['candle_begin_time_GMT8'] = df['candle_begin_time'] + timedelta(hours=8)
        df = df[value_columns]

        # Build one parameter list per candle; values are stringified because
        # the target columns are varchar.
        rows = []
        for index, row in df.iterrows():
            params = [instrument_id, kline_type]
            params.extend(str(row[column]) for column in value_columns)
            rows.append(params)
            print("第 %d 條數據添加成功" % index)

        # Open a connection for this batch and always close it afterwards, so
        # the long-running loop does not accumulate open connections.
        connection = mysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                cursor.executemany(replace_sql, rows)
            connection.commit()
        finally:
            connection.close()

        print("數據保存成功")
        sleep(interval)
except Exception:
    # Any failure (network, parsing, database) ends the collector after
    # printing the full traceback.
    print("保存數據時發生異常: %s" % traceback.format_exc())