5分鐘15分鐘都穿越 代碼模式

import re
import time
import requests
from datetime import datetime, timezone
import pytz
from Utils.dingding import send_dingtalk_message  # 確保這個(gè)路徑和函數(shù)名稱與您的實(shí)際情況匹配

def get_klines(symbol, interval, limit=500):
   """獲取K線數(shù)據(jù)"""
   url = "https://fapi.binance.com/fapi/v1/klines"
   params = {
       'symbol': symbol,
       'interval': interval,
       'limit': limit,
   }
   response = requests.get(url, params=params)
   return response.json()

def calculate_sma(prices):
   """計(jì)算簡單移動(dòng)平均線(SMA)"""
   return sum(prices) / len(prices)

def collect_cross_boll_midline(klines):
   """收集穿越BOLL中線的信息"""
   closes = [float(kline[4]) for kline in klines]  # 獲取收盤價(jià)
   volumes = [float(kline[5]) for kline in klines]  # 獲取成交量
   total_traded = [float(kline[7]) for kline in klines]  # 獲取成交額
   buy_volumes = [float(kline[9]) for kline in klines]  # 獲取買單成交量
   up_crosses = []  # 存儲(chǔ)向上穿越的信息
   down_crosses = []  # 存儲(chǔ)向下穿越的信息
   previous_state_above = None  # 初始狀態(tài)未知

   for i in range(20, len(closes)):
       sma20 = calculate_sma(closes[i - 20:i])  # 計(jì)算BOLL中線
       current_state_above = closes[i] > sma20  # 判斷當(dāng)前價(jià)格是否在BOLL中線之上
       active_buy_value = buy_volumes[i] * closes[i]  # 主動(dòng)買入成交額的近似計(jì)算

       if current_state_above != previous_state_above:
           if previous_state_above is not None:
               direction = "上穿越" if current_state_above else "下穿越"
               time = datetime.utcfromtimestamp(klines[i][0] / 1000)
               time = time.replace(tzinfo=timezone.utc).astimezone(pytz.timezone('Asia/Shanghai'))
               # info = f"{time.strftime('%Y-%m-%d %H:%M:%S')} {direction} BOLL中線的價(jià)格: {closes[i]}, 成交量: {volumes[i]}, 成交額: {total_traded[i]}, 主動(dòng)買入成交額: {active_buy_value}"
               # 修改info的構(gòu)建方式惜傲,包含更多信息
               info = {
                   'time': time.strftime('%Y-%m-%d %H:%M:%S'),
                   'timestamp': klines[i][0],  # 添加Unix時(shí)間戳
                   'direction': direction,
                   'price': closes[i],
                   'volume': volumes[i],
                   'total_traded': total_traded[i],
                   'active_buy_value': active_buy_value,
               }
               if current_state_above:
                   up_crosses.append(info)
               else:
                   down_crosses.append(info)

           previous_state_above = current_state_above

   return up_crosses, down_crosses

if __name__ == "__main__":
   webhook = "https://oapi.dingtalk.com/robot/send?access_token=8a6ddcf98d3b47c63333580bfe9d0bad55b17272eea05cc9c0af7f7be4de070d"
   symbol = "BTCUSDT"
   interval = "5m"

   # 初始化上一次穿越信息和發(fā)送標(biāo)志位
   last_up_cross = None
   last_down_cross = None
   last_up_cross_15m = None
   last_down_cross_15m = None

   up_cross_5m_sent = False
   down_cross_5m_sent = False
   up_cross_15m_sent = False
   down_cross_15m_sent = False

   up_5m_last_sent_time = 0 #穿越5分鐘中線荚孵,成交額增加厅目,限頻持寄,5分鐘內(nèi)只發(fā)送1次
   down_5m_last_sent_time = 0 #穿越5分鐘中線刀诬,成交額增加舷嗡,限頻盔几,5分鐘內(nèi)只發(fā)送1次

   up_5min =0 #5分鐘向上穿越信息 5分鐘內(nèi)只發(fā)送1次
   down_5min=0 #5分鐘向下穿越信息5分鐘內(nèi)只發(fā)送1次
   up_15min=0 #15分鐘向上穿越信息 1分鐘內(nèi)只發(fā)送1次
   down_15min = 0  # 15分鐘向下穿越信息1分鐘內(nèi)只發(fā)送1次

   # 在主循環(huán)之前初始化變量以跟蹤同時(shí)穿越事件的發(fā)送狀態(tài)
   last_sent_up_cross_5m_15m = None
   last_sent_down_cross_5m_15m = None
   sent_up_cross_5m_15m_notification = False
   sent_down_cross_5m_15m_notification = False

   last_cross_direction_5m = None
   last_cross_direction_15m = None

   last_cross_notification_time = 0  # 記錄上一次發(fā)送通知的時(shí)間


   while True:
       current_time = time.time()

       # 5分鐘K線
       klines = get_klines(symbol, interval)
       up_crosses, down_crosses = collect_cross_boll_midline(klines)
       current_up_cross = up_crosses[-1:] if up_crosses else None
       current_down_cross = down_crosses[-1:] if down_crosses else None

       # 獲取最新的上穿越信息
       current_up_cross = up_crosses[-1] if up_crosses else None
       # 獲取最新的下穿越信息
       current_down_cross = down_crosses[-1] if down_crosses else None

       # 15分鐘K線
       klines_15m = get_klines(symbol, "15m")
       up_crosses_15m, down_crosses_15m = collect_cross_boll_midline(klines_15m)
       current_up_cross_15m = up_crosses_15m[-1:] if up_crosses_15m else None
       current_down_cross_15m = down_crosses_15m[-1:] if down_crosses_15m else None

       current_time = int(time.time() * 1000)  # 獲取當(dāng)前時(shí)間的Unix時(shí)間戳(毫秒級(jí))

       # 處理5分鐘穿越事件
       if current_up_cross and current_up_cross != last_up_cross:
           message = "5分鐘向上穿越信息:" + str(current_up_cross)
           print(message)
           if time.time() - up_5min > 300:  # 限頻日月,5分鐘內(nèi)只發(fā)送1次
               send_dingtalk_message(webhook=webhook, secret="", message=message)
               up_5min = time.time()  # 更新發(fā)送時(shí)間
               print("5分鐘向上穿越信息,記錄的發(fā)送時(shí)間是:{}".format(up_5min))
           last_up_cross = current_up_cross

       if current_down_cross and current_down_cross != last_down_cross:
           message = "5分鐘向下穿越信息:" + str(current_down_cross)
           print(message)
           if time.time() - down_5min > 120:  # 限頻灼狰,5分鐘內(nèi)只發(fā)送1次
               send_dingtalk_message(webhook=webhook, secret="", message=message)
               down_5min = time.time()  # 更新發(fā)送時(shí)間
               print("5分鐘向下穿越信息,記錄的發(fā)送時(shí)間是:{}".format(down_5min))
           last_down_cross = current_down_cross

       # 處理15分鐘穿越事件
       if current_up_cross_15m and current_up_cross_15m != last_up_cross_15m:
           message = "15分鐘向上穿越信息:" + str(current_up_cross_15m)
           print(message)
           if time.time() - up_15min > 120:  # 限頻惜浅,1分鐘內(nèi)只發(fā)送1次
               send_dingtalk_message(webhook=webhook, secret="", message=message)
               up_15min=time.time()  # 更新發(fā)送時(shí)間
           last_up_cross_15m = current_up_cross_15m

       if current_down_cross_15m and current_down_cross_15m != last_down_cross_15m:
           message = "15分鐘向下穿越信息:" + str(current_down_cross_15m)
           print(message)
           if time.time() - down_15min > 60:  # 限頻,1分鐘內(nèi)只發(fā)送1次
               send_dingtalk_message(webhook=webhook, secret="", message=message)
               down_15min=time.time()  # 更新發(fā)送時(shí)間
           last_down_cross_15m = current_down_cross_15m

       #處理5分鐘和15分鐘同時(shí)向上的事件
       # 5分鐘K線
       klines_5m = get_klines(symbol, "5m")
       up_crosses_5m, down_crosses_5m = collect_cross_boll_midline(klines_5m)
       # 獲取最新的5分鐘穿越信息
       if up_crosses_5m:
           current_up_cross_5m = up_crosses_5m[-1]
           last_cross_direction_5m = "up"
       elif down_crosses_5m:
           current_down_cross_5m = down_crosses_5m[-1]
           last_cross_direction_5m = "down"

       # 15分鐘K線
       klines_15m = get_klines(symbol, "15m")
       up_crosses_15m, down_crosses_15m = collect_cross_boll_midline(klines_15m)
       # 獲取最新的15分鐘穿越信息
       if up_crosses_15m:
           current_up_cross_15m = up_crosses_15m[-1]
           last_cross_direction_15m = "up"
       elif down_crosses_15m:
           current_down_cross_15m = down_crosses_15m[-1]
           last_cross_direction_15m = "down"

       current_time_seconds = time.time()  # 獲取當(dāng)前時(shí)間的Unix時(shí)間戳(秒級(jí))

       # 檢查最近的5分鐘和15分鐘穿越方向是否一致
       if last_cross_direction_5m and last_cross_direction_15m and ((current_time_seconds - last_cross_notification_time) > 120):
           if last_cross_direction_5m == last_cross_direction_15m:
               direction = "向上" if last_cross_direction_5m == "up" else "向下"
               message = f"同時(shí)在5分鐘和15分鐘K線上檢測到{direction}穿越事件"
               print(message)
               # 發(fā)送消息
               send_dingtalk_message(webhook=webhook, secret="", message=message)
               last_cross_notification_time = current_time_seconds  # 更新最后一次發(fā)送通知的時(shí)間
               # 重置方向伏嗜,避免重復(fù)發(fā)送相同方向的消息(可選)
               last_cross_direction_5m = None
               last_cross_direction_15m = None


       # 比較時(shí)間戳找到最接近當(dāng)前時(shí)間的事件
       time_diff_up = abs(current_time - last_up_cross['timestamp'])
       time_diff_down = abs(current_time - last_down_cross['timestamp'])

       if time_diff_up < time_diff_down:
           current_total_traded = current_up_cross['total_traded']
           print("最近的事件是上穿越坛悉,發(fā)生在:", last_up_cross['time'])
           # 檢查并發(fā)送釘釘消息的邏輯需要調(diào)整
           if last_up_cross is None or current_total_traded > last_up_cross['total_traded']:

               print("上一次的成交額是:{}".format(last_up_cross['total_traded']))
               if time.time() - up_5m_last_sent_time > 60:  # 限頻,5分鐘內(nèi)只發(fā)送1次
                   message = "5分鐘向上穿越信息承绸,并且成交額增加:" + str(current_up_cross)
                   print(message)
                   send_dingtalk_message(webhook=webhook, secret="", message=message)
                   up_5m_last_sent_time = time.time()  # 更新發(fā)送時(shí)間
       else:
           print("最近的事件是下穿越裸影,發(fā)生在:", last_down_cross['time'])

           if not last_down_cross or current_down_cross['total_traded'] > last_down_cross['total_traded']:
               message = "5分鐘向下穿越,并且成交額增加:" + str(current_down_cross)
               if time.time() - down_5m_last_sent_time > 60:  # 限頻军熏,5分鐘內(nèi)只發(fā)送1次
                   print(message)
                   send_dingtalk_message(webhook=webhook, secret="", message=message)
                   down_5m_last_sent_time = time.time()  # 更新發(fā)送時(shí)間


       time.sleep(50)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末轩猩,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子荡澎,更是在濱河造成了極大的恐慌均践,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件摩幔,死亡現(xiàn)場離奇詭異彤委,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)或衡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門焦影,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人封断,你說我怎么就攤上這事斯辰。” “怎么了坡疼?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵彬呻,是天一觀的道長。 經(jīng)常有香客問我,道長闸氮,這世上最難降的妖魔是什么剪况? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮湖苞,結(jié)果婚禮上拯欧,老公的妹妹穿的比我還像新娘详囤。我一直安慰自己财骨,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布藏姐。 她就那樣靜靜地躺著隆箩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪羔杨。 梳的紋絲不亂的頭發(fā)上捌臊,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機(jī)與錄音兜材,去河邊找鬼理澎。 笑死,一個(gè)胖子當(dāng)著我的面吹牛曙寡,可吹牛的內(nèi)容都是我干的糠爬。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼举庶,長吁一口氣:“原來是場噩夢啊……” “哼执隧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起户侥,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤镀琉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后蕊唐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屋摔,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年替梨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了凡壤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡耙替,死狀恐怖亚侠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情俗扇,我是刑警寧澤硝烂,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站铜幽,受9級(jí)特大地震影響滞谢,放射性物質(zhì)發(fā)生泄漏串稀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一狮杨、第九天 我趴在偏房一處隱蔽的房頂上張望母截。 院中可真熱鬧,春花似錦橄教、人聲如沸清寇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽华烟。三九已至,卻和暖如春持灰,著一層夾襖步出監(jiān)牢的瞬間盔夜,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工堤魁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留喂链,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓妥泉,卻偏偏與公主長得像椭微,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子涛漂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容