1、需求背景
app需要推送消息,如下線通知、折扣通知等
2、實現方式
采取將推送任務放在任務表中,然後在管理平臺構建推送(異步)任務,讀取推送任務表,進行實時推送
3、表設計
推送內容表
class PushContentInfo(db.Model):
    """Push content table: one row per push message template.

    A row holds the text shown in the notification plus where the app should
    navigate when the notification is opened. Push task records refer to a
    row of this table through their ``push_content_id``.
    """

    __tablename__ = 'tb_push_content_info'

    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    # Internal title of the content entry.
    content_title = db.Column(db.String(64), nullable=False, info={'label': '內容標題'})
    # Title and body delivered in the notification itself.
    push_title = db.Column(db.String(64), nullable=False, info={'label': '推送標題'})
    push_content = db.Column(db.String(255), nullable=False, info={'label': '推送內容'})
    # Redirect kind and target URL handed to the push SDK wrappers.
    redirect_type = db.Column(db.Integer, nullable=False, info={'label': '跳轉位置'})
    redirect_url = db.Column(db.String(255), nullable=False, info={'label': '跳轉url'})
    # Push target; NOTE(review): the sender compares a target value against
    # 'APP' -- confirm the full set of allowed values.
    push_target = db.Column(db.String(64), nullable=False, info={'label': '推送目標'})
    # Offline retention in hours; the Google sender multiplies it by 3600.
    message_save_hours = db.Column(db.Integer, nullable=False, default=0, info={'label': '消息離線保存時間'})
    add_time = db.Column(db.DateTime, nullable=False, default=db.func.now(), info={'label': '創建時間'})
    # ``onupdate`` keeps the timestamp current on every UPDATE; with only a
    # ``default`` it would stay frozen at the creation time.
    update_time = db.Column(db.DateTime, nullable=False, default=db.func.now(), onupdate=db.func.now(),
                            info={'label': '更新時間'})
推送任務表
class PushNotifyRecord(db.Model):
    """Push task table: one row per (user, content) notification to deliver.

    ``task_status`` is driven by the push tasks: rows are picked up at 0,
    switched to 1 once claimed for sending and set to 2 after the push call.
    """

    __tablename__ = 'tb_push_notify_record'

    id = db.Column(db.Integer, autoincrement=True, primary_key=True)
    uid = db.Column(db.String(64), nullable=False, info={'label': '用戶id'})
    # NOTE(review): the push tasks read ``device_code`` from these records, so
    # this column has to exist for per-device pushes -- confirm before removing.
    # device_code = db.Column(db.String(64), nullable=False, info={'label': '設備碼'})
    # Target platform: the senders test for 'android', 'ios' or 'all'.
    notify_app_type = db.Column(db.String(32), nullable=False, default='all', info={'label': '推送目標'})
    # Id of the PushContentInfo row to send (label previously duplicated the
    # one of ``notify_app_type``).
    push_content_id = db.Column(db.Integer, nullable=False, default=0, info={'label': '推送內容id'})
    # 0 = pending, 1 = claimed for sending, 2 = pushed.
    task_status = db.Column(db.Integer, nullable=False, default=0, info={'label': '規則狀態'})
    add_time = db.Column(db.DateTime, nullable=False, default=db.func.now(), info={'label': '創建時間'})
    # ``onupdate`` keeps the timestamp current on every UPDATE; with only a
    # ``default`` it would stay frozen at the creation time.
    update_time = db.Column(db.DateTime, nullable=False, default=db.func.now(), onupdate=db.func.now(),
                            info={'label': '更新時間'})
注意:友盟推送只需要用戶id,因為友盟可以別名推送,給每個用戶綁定唯一的別名(使用uid進行綁定)
谷歌推送需要單獨的設備碼,由設備提供,每個設備具備單個設備碼,如果需要單用戶多設備推送,需要進行用戶和設備的關聯
異步任務:
def _group_pending_records():
    """Return pending (task_status=0) records grouped by '<app_type>-<content_id>'."""
    grouped = {}
    for record in PushNotifyRecord.objects.filter(task_status=0):
        group_key = record.notify_app_type + '-' + str(record.push_content_id)
        grouped.setdefault(group_key, []).append(record)
    return grouped


def _claim_and_enqueue_chunks(records, push_app_type, push_content_id, limit=1000):
    """Claim records chunk by chunk (status 0 -> 1) and enqueue ``push_to_user`` per chunk."""
    for start in range(0, len(records), limit):
        chunk = records[start:start + limit]
        task_id_list = [x.id for x in chunk]
        uid_list = [x.uid for x in chunk]
        claimed = PushNotifyRecord.objects.filter(id__in=task_id_list, task_status=0).update(task_status=1)
        if claimed != len(task_id_list):
            # Some rows of this chunk were no longer pending; stop enqueuing
            # further chunks of this group.
            break
        push_to_user.delay(task_id_list, uid_list, push_app_type, push_content_id)


def _umeng_push_group(records, push_app_type, push_content_id):
    """Send the Umeng pushes for one group and mark its claimed records as pushed."""
    logger.info('u_push------start')
    push_content_info = PushContentInfo.objects.get(id=push_content_id)
    go_url = push_content_info.redirect_url
    task_id_list = [x.id for x in records]
    # Only records that were actually claimed (status 1) are pushed.
    push_notify_wait = PushNotifyRecord.objects.filter(
        id__in=task_id_list,
        task_status=1
    ).all()
    # Records with a device code are addressed as '<uid>:<device_code>',
    # the others by uid alone.
    task_code_list = []
    task_uid_list = []
    for record in push_notify_wait:
        if record.device_code:
            task_code_list.append(record.uid + ':' + record.device_code)
        else:
            task_uid_list.append(record.uid)
    if push_app_type in ('android', 'all'):
        android_push.u_group_push(task_code_list,
                                  push_content_info.push_title,
                                  push_content_info.push_content,
                                  'qy_uid_device_android',
                                  go_url, redirect_type=push_content_info.redirect_type)
        android_push.u_group_push(task_uid_list,
                                  push_content_info.push_title,
                                  push_content_info.push_content,
                                  'qy_android',
                                  go_url, redirect_type=push_content_info.redirect_type)
    if push_app_type in ('ios', 'all'):
        ios_push.u_group_push(task_code_list,
                              push_content_info.push_title,
                              push_content_info.push_content,
                              'qy_uid_device_ios',
                              go_url, redirect_type=push_content_info.redirect_type,
                              production_mode=settings.U_PUSH_PRODUCTION_MODE)
        ios_push.u_group_push(task_uid_list,
                              push_content_info.push_title,
                              push_content_info.push_content,
                              'qy_ios',
                              go_url, redirect_type=push_content_info.redirect_type,
                              production_mode=settings.U_PUSH_PRODUCTION_MODE)
    PushNotifyRecord.objects.filter(id__in=task_id_list, task_status=1).update(task_status=2)


@app.task()
def scan_for_push_to_user():
    """
    Periodically scan the push record table and notify users.

    Pending records are grouped by target platform and content id. For each
    group the records are claimed in chunks of 1000 and handed to
    ``push_to_user``; afterwards the Umeng push for the group is sent from
    this task and the claimed records are marked as pushed.

    A Redis key ('qms_push_user_lock', 20 minute expiry) ensures only one
    scan runs at a time; a scan that cannot take the lock returns at once.

    :return: None
    """
    # Parenthesised form prints the same text on Python 2 and Python 3.
    print('start scan_for_push_to_user')
    rs = redis.StrictRedis.from_url(settings.REDIS_URL)
    if not rs.set('qms_push_user_lock', 1, nx=True, ex=20 * 60):
        logger.info('[scan_for_push_to_user] being locked')
        return
    try:
        push_task_dict = _group_pending_records()
        for item_task, records in push_task_dict.items():
            logger.info('[scan_for_push_to_user] task %s push member count %s', item_task, len(records))
            # Split on the last '-' only, so an app type containing '-' still
            # unpacks into exactly two parts.
            push_app_type, push_content_id = item_task.rsplit('-', 1)
            _claim_and_enqueue_chunks(records, push_app_type, push_content_id)
            # Start the Umeng push.
            _umeng_push_group(records, push_app_type, push_content_id)
    finally:
        # Release the lock even when a push fails; otherwise every scan would
        # be skipped until the 20 minute expiry elapses.
        rs.delete('qms_push_user_lock')
@app.task()
def push_to_user(task_id_list, uid_list, push_app_type, push_content_id):
    """
    Send the Google token push for one chunk of claimed push records.

    :param task_id_list: ids of the PushNotifyRecord rows in this chunk
    :param uid_list: user ids of those rows
    :param push_app_type: 'android', 'ios' or 'all'; tokens are only
        collected for 'android' and 'all'
    :param push_content_id: id of the PushContentInfo row to send
    :return: True when the content is not targeted at 'APP', otherwise None
    """
    if not uid_list:
        return
    logger.info('[g_push_to_user] receive task %s-%s', push_app_type, push_content_id)
    push_content_info = PushContentInfo.objects.get(id=push_content_id)
    # NOTE(review): the table definition names this field ``push_target``;
    # confirm ``db_push_target`` exists on the model used here.
    if push_content_info.db_push_target != 'APP':
        return True
    # Records of this chunk that are still waiting to be sent.
    push_notify_wait = PushNotifyRecord.objects.filter(
        id__in=task_id_list,
        task_status=1
    ).all()
    # uid -> list of {'code', 'token'} for every registered device.
    member_dict = {}
    for member in GoogleMemberDeviceToken.objects.filter(uid__in=uid_list).all():
        member_dict.setdefault(member.uid, []).append({'code': member.device_code, 'token': member.device_token})
    task_token_list = []
    if push_app_type in ('android', 'all'):
        for record in push_notify_wait:
            # A user without any registered device simply contributes no
            # token (a plain dict lookup raised KeyError here).
            devices = member_dict.get(record.uid, [])
            if record.device_code:
                # Record targets one device: keep only the matching token(s).
                task_token_list.extend(d['token'] for d in devices if d['code'] == record.device_code)
            else:
                # No device given: push to every device of the user.
                task_token_list.extend(d['token'] for d in devices)
    # Retention is stored in hours; presumably the sender expects seconds.
    save_seconds = push_content_info.message_save_hours * 60 * 60
    for token in task_token_list:
        android_push.g_token_push(
            token,
            push_content_info.push_title,
            push_content_info.push_content,
            save_seconds
        )
    PushNotifyRecord.objects.filter(id__in=task_id_list, task_status=1).update(task_status=2)
@app.task()
def expired_user_push():
    """
    Queue an expiry notification for users whose membership ended in the previous minute.

    Asks the member service for users that expired during the previous
    minute, re-checks each user's duration accounts, and creates one
    PushNotifyRecord per user whose latest expiry is in the past. Users whose
    latest account is of type 'TVIP' get the trial-expired message, all
    others the renewal message.

    :return: always True
    """
    mc = MemberClient()
    # Read the clock once so both bounds describe the same minute; two
    # separate reads could straddle a minute boundary.
    previous_minute = datetime.datetime.now() - datetime.timedelta(minutes=1)
    start_time = previous_minute.strftime('%Y-%m-%d %H:%M:00')
    end_time = previous_minute.strftime('%Y-%m-%d %H:%M:59')
    success, expired_users = mc.query_expired_users(start_time, end_time)
    logger.info('response data: %s', expired_users)
    if not success:
        return True
    uids = expired_users.get('uid_list')
    if not uids:
        return True
    # Message for users whose trial ('TVIP') membership ran out.
    t_content_record, _ = PushContentInfo.objects.get_or_create(
        content_title='會員權益到期通知',
        push_title='會員權益到期通知',
        push_content='VIP體驗權益已到期,新用戶充值VIP盡享驚喜折扣,去看看吧~',
        redirect_url='sixfast://com.xg.push/main?type=qeeyou://personal_center',
    )
    # Message for every other membership type.
    v_content_record, _ = PushContentInfo.objects.get_or_create(
        content_title='會員權益到期通知',
        push_title='會員權益到期通知',
        push_content='您的VIP權益已到期,立即續費VIP,享受極速回國體驗~',
        redirect_url='sixfast://com.xg.push/main?type=qeeyou://personal_center',
    )
    for member_id in uids:
        success, result = mc.query(member_id)
        if not success:
            continue
        # Find the account that expires last; the defaults apply when the
        # user has no duration account at all.
        max_time = datetime.datetime(year=1990, month=1, day=1)
        max_time_duration_type = 'OVIP'
        for account in result.get('duration_account') or []:
            duration_time = datetime.datetime.strptime(account.get('duration_expire_at'), "%Y-%m-%d %H:%M:%S")
            if duration_time > max_time:
                max_time = duration_time
                max_time_duration_type = account.get('duration_type')
        if max_time > datetime.datetime.now():
            # Another account is still valid, so the user has not expired.
            continue
        if max_time_duration_type in ("TVIP", ):
            content_record = t_content_record
        else:
            content_record = v_content_record
        PushNotifyRecord(
            uid=member_id,
            notify_app_type='all',
            push_content_id=content_record.id
        ).save()
    return True