# -*- coding: utf-8 -*-
# 時間工具
import time
import numpy as nu
import util
class Date(object):
"""
日期對象
"""
__seconds = 0
def __init__(self, seconds=None):
"""
初始化時間對象
:param seconds: 時間戳秒
"""
self.setTime(seconds)
def setTime(self, seconds=None):
"""
設置時間戳
:param seconds:
:return:
"""
if seconds is None:
seconds = self.now()
self.__seconds = seconds
def timeFormat(self, format='%Y-%m-%d %H:%M:%S', seconds=None):
"""
時間格式化
:param format: 格式化方案
:param seconds: 秒 如果不填則默認為當前對象秒
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime(format, time.localtime(seconds))
def getSecond(self, seconds=None):
"""
獲取秒鐘
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%S', time.localtime(seconds))
def getSeparate(self, seconds=None):
"""
獲取分鐘
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%M', time.localtime(seconds))
def getHour(self, seconds=None):
"""
獲取小時
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%H', time.localtime(seconds))
def getDay(self, seconds=None):
"""
獲取月中的日
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%d', time.localtime(seconds))
def getMonth(self, seconds=None):
"""
獲取月
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%m', time.localtime(seconds))
def getYear(self, seconds=None):
"""
獲取年
:param seconds:
:return:
"""
seconds = self.__check_seconds(seconds)
return time.strftime('%Y', time.localtime(seconds))
def __check_seconds(self, seconds=None):
"""
校驗時間戳-秒 參數(shù)
:param seconds:
:return:
"""
if seconds is None:
if self.__seconds is None:
seconds = self.now()
else:
seconds = self.__seconds
return seconds
@staticmethod
def now(mode='s'):
"""
獲取時間戳方案
:param mode: s 秒 ms 毫秒 ns 納秒
:return:
"""
time_mode = nu.array(util.TimeMode)
if not (time_mode == mode).any():
mode = util.TimeMode[0]
ns = str(time.time_ns())
if mode == util.TimeMode[0]:
return int(ns[:len(ns) - 9])
if mode == util.TimeMode[1]:
return int(ns[:len(ns) - 6])
if mode == util.TimeMode[2]:
return int(ns)
# -*- coding: utf-8 -*-
# 異常對象組
class AuthException(Exception):
"""
自定義異常對象 - 帶消息參數(shù)方案
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
# -*- coding: utf-8 -*-
# 隨機工具
from exceptions import AuthException
from util.time_util import Date
class OrderNumber(object):
"""
訂單號生成對象
"""
# 開始時間戳
__twepoch = 1420041600000
# 機器id所占的位數(shù)
__worker_id_bits = 5
# 數(shù)據(jù)標識id所占的位數(shù)
__datacenter_id_bits = 5
# 支持的最大機器id
__max_worker_id = -1 ^ (-1 << __worker_id_bits)
# 支持的最大標識id
__max_datacenter_id = -1 ^ (-1 << __datacenter_id_bits)
# 序列在id中占的位數(shù)
__sequence_bits = 12
# 機器id向左偏移位
__worker_id_shift = __sequence_bits
# 數(shù)據(jù)標識id向左偏移數(shù)
__datacenter_id_shift = __sequence_bits + __worker_id_shift
# 時間戳向左偏移數(shù)
__timestamp_left_shift = __sequence_bits + __worker_id_bits + __datacenter_id_bits
# 生成序列的掩碼
__sequence_mask = -1 ^ (-1 << __sequence_bits)
# 當前工作的機器id
__worker_id = 0
# 毫秒內(nèi)序列
__datacenter_id = 0
# 毫秒內(nèi)序列
__sequence = 0
# 上一次生成id時的時間戳
__last_timestamp = -1
def __init__(self, worker_id=0, datacenter_id=0):
"""
構造函數(shù)
:param worker_id: 工作id
:param datacenter_id: 數(shù)據(jù)中心id
"""
if worker_id > self.__max_worker_id or worker_id < 0:
raise AuthException("worker Id can't be greater than {} or less than 0".format(self.__max_worker_id))
if datacenter_id > self.__max_datacenter_id or datacenter_id < 0:
raise AuthException(
"datacenter Id can't be greater than %d or less than 0".format(self.__max_datacenter_id))
self.__worker_id = worker_id
self.__datacenter_id = datacenter_id
@staticmethod
def __time_gen():
"""
獲取當前時間(毫秒級)
:return:
"""
return Date().now('ms')
def __time_next_millis(self, last_timestamp: int):
"""
阻塞到下一毫秒級,直到獲取到新的時間戳
:param last_timestamp: 上一次生成id的時間戳
:return:
"""
timestamp = self.__time_gen()
while timestamp <= last_timestamp:
timestamp = self.__time_gen()
return timestamp
def next_id(self):
"""
獲取下一個id
:return:
"""
timestamp = self.__time_gen()
if timestamp < self.__last_timestamp:
raise AuthException("Clock moved backwards. Refusing to generate id for {} milliseconds".format(
self.__last_timestamp - timestamp))
if self.__last_timestamp == timestamp:
self.__sequence = (self.__sequence + 1) & self.__sequence_mask
if self.__sequence == 0:
timestamp = self.__time_next_millis(self.__last_timestamp)
else:
self.__sequence = 0
self.__last_timestamp = timestamp
return ((timestamp - self.__twepoch) << self.__timestamp_left_shift) | (
self.__datacenter_id << self.__datacenter_id_shift) | (
self.__worker_id << self.__worker_id_shift) | self.__sequence
# demo
# import util.random_util as ur
# import threading
#
#
# orders = []
# threads = []
#
#
# def for_run(worker_id=0, datacenter_id=0, thread_num=1):
# order = ur.OrderNumber(worker_id, datacenter_id)
# for i in range(0, 2000):
# order_number = str(order.next_id())
# orders.append(order_number)
# print('進程{}: 生成的訂單號為:'.format(str(thread_num)) + order_number)
#
#
# if __name__ == '__main__':
# for j in range(0, 30):
# t = threading.Thread(target=for_run, args=(j, 0, j))
# threads.append(t)
# for t in threads:
# t.setDaemon(True)
# t.start()
# for t in threads:
# t.join()
# print(len(orders))
# orders = list(set(orders))
# print(len(orders))