import re
import time
import requests
from datetime import datetime, timezone
import pytz
from Utils.dingding import send_dingtalk_message # 確保這個(gè)路徑和函數(shù)名稱與您的實(shí)際情況匹配
def get_klines(symbol, interval, limit=500):
"""獲取K線數(shù)據(jù)"""
url = "https://fapi.binance.com/fapi/v1/klines"
params = {
'symbol': symbol,
'interval': interval,
'limit': limit,
}
response = requests.get(url, params=params)
return response.json()
def calculate_sma(prices):
"""計(jì)算簡單移動(dòng)平均線(SMA)"""
return sum(prices) / len(prices)
def collect_cross_boll_midline(klines):
"""收集穿越BOLL中線的信息"""
closes = [float(kline[4]) for kline in klines] # 獲取收盤價(jià)
volumes = [float(kline[5]) for kline in klines] # 獲取成交量
total_traded = [float(kline[7]) for kline in klines] # 獲取成交額
buy_volumes = [float(kline[9]) for kline in klines] # 獲取買單成交量
up_crosses = [] # 存儲(chǔ)向上穿越的信息
down_crosses = [] # 存儲(chǔ)向下穿越的信息
previous_state_above = None # 初始狀態(tài)未知
for i in range(20, len(closes)):
sma20 = calculate_sma(closes[i - 20:i]) # 計(jì)算BOLL中線
current_state_above = closes[i] > sma20 # 判斷當(dāng)前價(jià)格是否在BOLL中線之上
active_buy_value = buy_volumes[i] * closes[i] # 主動(dòng)買入成交額的近似計(jì)算
if current_state_above != previous_state_above:
if previous_state_above is not None:
direction = "上穿越" if current_state_above else "下穿越"
time = datetime.utcfromtimestamp(klines[i][0] / 1000)
time = time.replace(tzinfo=timezone.utc).astimezone(pytz.timezone('Asia/Shanghai'))
# info = f"{time.strftime('%Y-%m-%d %H:%M:%S')} {direction} BOLL中線的價(jià)格: {closes[i]}, 成交量: {volumes[i]}, 成交額: {total_traded[i]}, 主動(dòng)買入成交額: {active_buy_value}"
# 修改info的構(gòu)建方式惜傲,包含更多信息
info = {
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'timestamp': klines[i][0], # 添加Unix時(shí)間戳
'direction': direction,
'price': closes[i],
'volume': volumes[i],
'total_traded': total_traded[i],
'active_buy_value': active_buy_value,
}
if current_state_above:
up_crosses.append(info)
else:
down_crosses.append(info)
previous_state_above = current_state_above
return up_crosses, down_crosses
if __name__ == "__main__":
webhook = "https://oapi.dingtalk.com/robot/send?access_token=8a6ddcf98d3b47c63333580bfe9d0bad55b17272eea05cc9c0af7f7be4de070d"
symbol = "BTCUSDT"
interval = "5m"
# 初始化上一次穿越信息和發(fā)送標(biāo)志位
last_up_cross = None
last_down_cross = None
last_up_cross_15m = None
last_down_cross_15m = None
up_cross_5m_sent = False
down_cross_5m_sent = False
up_cross_15m_sent = False
down_cross_15m_sent = False
up_5m_last_sent_time = 0 #穿越5分鐘中線荚孵,成交額增加厅目,限頻持寄,5分鐘內(nèi)只發(fā)送1次
down_5m_last_sent_time = 0 #穿越5分鐘中線刀诬,成交額增加舷嗡,限頻盔几,5分鐘內(nèi)只發(fā)送1次
up_5min =0 #5分鐘向上穿越信息 5分鐘內(nèi)只發(fā)送1次
down_5min=0 #5分鐘向下穿越信息5分鐘內(nèi)只發(fā)送1次
up_15min=0 #15分鐘向上穿越信息 1分鐘內(nèi)只發(fā)送1次
down_15min = 0 # 15分鐘向下穿越信息1分鐘內(nèi)只發(fā)送1次
# 在主循環(huán)之前初始化變量以跟蹤同時(shí)穿越事件的發(fā)送狀態(tài)
last_sent_up_cross_5m_15m = None
last_sent_down_cross_5m_15m = None
sent_up_cross_5m_15m_notification = False
sent_down_cross_5m_15m_notification = False
last_cross_direction_5m = None
last_cross_direction_15m = None
last_cross_notification_time = 0 # 記錄上一次發(fā)送通知的時(shí)間
while True:
current_time = time.time()
# 5分鐘K線
klines = get_klines(symbol, interval)
up_crosses, down_crosses = collect_cross_boll_midline(klines)
current_up_cross = up_crosses[-1:] if up_crosses else None
current_down_cross = down_crosses[-1:] if down_crosses else None
# 獲取最新的上穿越信息
current_up_cross = up_crosses[-1] if up_crosses else None
# 獲取最新的下穿越信息
current_down_cross = down_crosses[-1] if down_crosses else None
# 15分鐘K線
klines_15m = get_klines(symbol, "15m")
up_crosses_15m, down_crosses_15m = collect_cross_boll_midline(klines_15m)
current_up_cross_15m = up_crosses_15m[-1:] if up_crosses_15m else None
current_down_cross_15m = down_crosses_15m[-1:] if down_crosses_15m else None
current_time = int(time.time() * 1000) # 獲取當(dāng)前時(shí)間的Unix時(shí)間戳(毫秒級(jí))
# 處理5分鐘穿越事件
if current_up_cross and current_up_cross != last_up_cross:
message = "5分鐘向上穿越信息:" + str(current_up_cross)
print(message)
if time.time() - up_5min > 300: # 限頻日月,5分鐘內(nèi)只發(fā)送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
up_5min = time.time() # 更新發(fā)送時(shí)間
print("5分鐘向上穿越信息,記錄的發(fā)送時(shí)間是:{}".format(up_5min))
last_up_cross = current_up_cross
if current_down_cross and current_down_cross != last_down_cross:
message = "5分鐘向下穿越信息:" + str(current_down_cross)
print(message)
if time.time() - down_5min > 120: # 限頻灼狰,5分鐘內(nèi)只發(fā)送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
down_5min = time.time() # 更新發(fā)送時(shí)間
print("5分鐘向下穿越信息,記錄的發(fā)送時(shí)間是:{}".format(down_5min))
last_down_cross = current_down_cross
# 處理15分鐘穿越事件
if current_up_cross_15m and current_up_cross_15m != last_up_cross_15m:
message = "15分鐘向上穿越信息:" + str(current_up_cross_15m)
print(message)
if time.time() - up_15min > 120: # 限頻惜浅,1分鐘內(nèi)只發(fā)送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
up_15min=time.time() # 更新發(fā)送時(shí)間
last_up_cross_15m = current_up_cross_15m
if current_down_cross_15m and current_down_cross_15m != last_down_cross_15m:
message = "15分鐘向下穿越信息:" + str(current_down_cross_15m)
print(message)
if time.time() - down_15min > 60: # 限頻,1分鐘內(nèi)只發(fā)送1次
send_dingtalk_message(webhook=webhook, secret="", message=message)
down_15min=time.time() # 更新發(fā)送時(shí)間
last_down_cross_15m = current_down_cross_15m
#處理5分鐘和15分鐘同時(shí)向上的事件
# 5分鐘K線
klines_5m = get_klines(symbol, "5m")
up_crosses_5m, down_crosses_5m = collect_cross_boll_midline(klines_5m)
# 獲取最新的5分鐘穿越信息
if up_crosses_5m:
current_up_cross_5m = up_crosses_5m[-1]
last_cross_direction_5m = "up"
elif down_crosses_5m:
current_down_cross_5m = down_crosses_5m[-1]
last_cross_direction_5m = "down"
# 15分鐘K線
klines_15m = get_klines(symbol, "15m")
up_crosses_15m, down_crosses_15m = collect_cross_boll_midline(klines_15m)
# 獲取最新的15分鐘穿越信息
if up_crosses_15m:
current_up_cross_15m = up_crosses_15m[-1]
last_cross_direction_15m = "up"
elif down_crosses_15m:
current_down_cross_15m = down_crosses_15m[-1]
last_cross_direction_15m = "down"
current_time_seconds = time.time() # 獲取當(dāng)前時(shí)間的Unix時(shí)間戳(秒級(jí))
# 檢查最近的5分鐘和15分鐘穿越方向是否一致
if last_cross_direction_5m and last_cross_direction_15m and ((current_time_seconds - last_cross_notification_time) > 120):
if last_cross_direction_5m == last_cross_direction_15m:
direction = "向上" if last_cross_direction_5m == "up" else "向下"
message = f"同時(shí)在5分鐘和15分鐘K線上檢測到{direction}穿越事件"
print(message)
# 發(fā)送消息
send_dingtalk_message(webhook=webhook, secret="", message=message)
last_cross_notification_time = current_time_seconds # 更新最后一次發(fā)送通知的時(shí)間
# 重置方向伏嗜,避免重復(fù)發(fā)送相同方向的消息(可選)
last_cross_direction_5m = None
last_cross_direction_15m = None
# 比較時(shí)間戳找到最接近當(dāng)前時(shí)間的事件
time_diff_up = abs(current_time - last_up_cross['timestamp'])
time_diff_down = abs(current_time - last_down_cross['timestamp'])
if time_diff_up < time_diff_down:
current_total_traded = current_up_cross['total_traded']
print("最近的事件是上穿越坛悉,發(fā)生在:", last_up_cross['time'])
# 檢查并發(fā)送釘釘消息的邏輯需要調(diào)整
if last_up_cross is None or current_total_traded > last_up_cross['total_traded']:
print("上一次的成交額是:{}".format(last_up_cross['total_traded']))
if time.time() - up_5m_last_sent_time > 60: # 限頻,5分鐘內(nèi)只發(fā)送1次
message = "5分鐘向上穿越信息承绸,并且成交額增加:" + str(current_up_cross)
print(message)
send_dingtalk_message(webhook=webhook, secret="", message=message)
up_5m_last_sent_time = time.time() # 更新發(fā)送時(shí)間
else:
print("最近的事件是下穿越裸影,發(fā)生在:", last_down_cross['time'])
if not last_down_cross or current_down_cross['total_traded'] > last_down_cross['total_traded']:
message = "5分鐘向下穿越,并且成交額增加:" + str(current_down_cross)
if time.time() - down_5m_last_sent_time > 60: # 限頻军熏,5分鐘內(nèi)只發(fā)送1次
print(message)
send_dingtalk_message(webhook=webhook, secret="", message=message)
down_5m_last_sent_time = time.time() # 更新發(fā)送時(shí)間
time.sleep(50)
5分鐘15分鐘都穿越 代碼模式
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進(jìn)店門焦影,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人封断,你說我怎么就攤上這事斯辰。” “怎么了坡疼?”我有些...
- 文/不壞的土叔 我叫張陵彬呻,是天一觀的道長。 經(jīng)常有香客問我,道長闸氮,這世上最難降的妖魔是什么剪况? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮湖苞,結(jié)果婚禮上拯欧,老公的妹妹穿的比我還像新娘详囤。我一直安慰自己财骨,他們只是感情好,可當(dāng)我...
- 文/花漫 我一把揭開白布藏姐。 她就那樣靜靜地躺著隆箩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪羔杨。 梳的紋絲不亂的頭發(fā)上捌臊,一...
- 文/蒼蘭香墨 我猛地睜開眼举庶,長吁一口氣:“原來是場噩夢啊……” “哼执隧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起户侥,我...
- 序言:老撾萬榮一對情侶失蹤镀琉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后蕊唐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屋摔,經(jīng)...
- 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
- 正文 我和宋清朗相戀三年替梨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了凡壤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
- 正文 年R本政府宣布,位于F島的核電站铜幽,受9級(jí)特大地震影響滞谢,放射性物質(zhì)發(fā)生泄漏串稀。R本人自食惡果不足惜,卻給世界環(huán)境...
- 文/蒙蒙 一狮杨、第九天 我趴在偏房一處隱蔽的房頂上張望母截。 院中可真熱鬧,春花似錦橄教、人聲如沸清寇。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽华烟。三九已至,卻和暖如春持灰,著一層夾襖步出監(jiān)牢的瞬間盔夜,已是汗流浹背。 一陣腳步聲響...
- 正文 我出身青樓妥泉,卻偏偏與公主長得像椭微,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子涛漂,可洞房花燭夜當(dāng)晚...
推薦閱讀更多精彩內(nèi)容
- 編程5分鐘赏表,查代碼2小時(shí)! 在程序員的編程生涯,免不了和BUG打交道匈仗,甚至有些程序員被BUG虐殺的苦不堪言瓢剿。 當(dāng)有...
- 先來看一下效果圖: 1.支持導(dǎo)航欄顏色自定義2.支持返回按鈕自定義3.支持導(dǎo)航欄右側(cè)按鈕自定義4.支持導(dǎo)航欄標(biāo)題自...
- 每年到這個(gè)時(shí)候犹菱,身邊就會(huì)有很多人開始咳嗽拾稳、咳痰、流鼻涕腊脱、打噴嚏的访得,有些是感冒、有些是哮喘陕凹、有些是氣管炎悍抑,有些的鼻炎...
- Git是個(gè)了不起但卻復(fù)雜的源代碼管理系統(tǒng)。它能支持復(fù)雜的任務(wù)捆姜,卻因此經(jīng)常被認(rèn)為太過復(fù)雜而不適用于簡單的日常工作传趾。讓...