以下代碼是模擬了一個 M/M/c/k/n 系統(tǒng)
M 顧客以指數(shù)分布達到
M 服務(wù)顧客的事件服從指數(shù)分布
c 隊列人數(shù)上限
k 服務(wù)臺的數(shù)目
n 最長等待時間
'''
基礎(chǔ)知識:
1. random.expovariate(miu) 生成均值為 1/miu 的指數(shù)分布的隨機數(shù)
2. 泊松過程的強度參數(shù)的意義:如果泊松過程的強度參數(shù)為 lambda,則在單位時間上新增一次的概率為 lambda梧税,lambda 越大事件越可能發(fā)生
3. 泊松事件的事件間隔彼此獨立且服從參數(shù)為 lambda 的指數(shù)分布
4. ρ = λ/μ
5. 平均等待時間 = ρ/(μ-λ)
6. 平均隊列長度(包含正在被服務(wù)的人) = λ/(μ-λ)
'''
'''
實現(xiàn)的細節(jié):
1. 統(tǒng)計函數(shù)沒有將仿真結(jié)束時沒有被服務(wù)完的人算入
'''
import simpy
import random
from time import time
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
## 隨機種子
randomSeed = time() # time()
## 指數(shù)分布的均值
miuService = 1 # 單位時間平均離開 1 個人
lambdaReachInterval = 0.5 # 單位時間平均來 0.5 個人
## 服務(wù)臺的數(shù)目
numService = 1
## 仿真程序運行的時間 min
Until = 100
## 系統(tǒng)容量
systemCapacity = None # None 表示無容量限制 max(10,numService)
## 最大等待時間 超過這個事件之后顧客會離開隊伍
maxWaiteTime = Until
## 初始隊列長度
initLen = 1
## 客戶類
class Customer(object):
def __init__(self,index_,startTime_,queueLenStart_,reachInterval_):
self.index = index_ # 第幾個來到隊列中來的
self.startTime = startTime_ # 開始時間
self.getServedTime = None # 開始被服務(wù)的時間
self.endTime = None # 結(jié)束時間
self.queueLenStart = queueLenStart_ # 開始排隊時隊列長度
self.queueLenEnd = None # 結(jié)束排隊時隊列長度
self.reachInterval = reachInterval_ # 空閑了多長時間本 customer 才到達
## 顧客列表
customerList = []
## 當(dāng)前隊列長度
queueLen = 0
class System(object):
def __init__(self, env, numService,miuService_):
self.env = env
self.service = simpy.Resource(env, numService)
self.miuService = miuService_
def beingServed(self):
# 服務(wù)事件為均值為 miuService 的指數(shù)分布
yield self.env.timeout(random.expovariate(self.miuService))
def inoutQueue(env, moviegoer, sys):
# 等待被服務(wù)
with sys.service.request() as request: #觀眾向收銀員請求購票
yield request | env.timeout(maxWaiteTime) #觀眾等待收銀員完成面的服務(wù)野来,超過最長等待事件maxCashierTime就會離開
global customerList
customerList[moviegoer].getServedTime = env.now
yield env.process(sys.beingServed())
# 完善統(tǒng)計資料
global queueLen
queueLen -= 1
customerList[moviegoer].endTime = env.now
customerList[moviegoer].queueLenEnd = queueLen
def runSys(env, numService,miuService):
sys = System(env, numService, miuService)
global initLen,customerList
moviegoer = initLen
for moviegoer in range(initLen):#初始化設(shè)置,開始的隊伍長度為 initLen
customerList.append(Customer(moviegoer,env.now,initLen,0))
env.process(inoutQueue(env, moviegoer, sys))
global queueLen
queueLen = initLen
while True:
reachInterval_ = random.expovariate(lambdaReachInterval)
yield env.timeout(reachInterval_) # 顧客到達時間滿足 lambdaReachInterval 的指數(shù)分布
if systemCapacity == None or queueLen <= systemCapacity:
moviegoer += 1
queueLen += 1
customerList.append(Customer(moviegoer,env.now,queueLen,reachInterval_))
env.process(inoutQueue(env, moviegoer, sys))
def plotSimRes(customerList):
#! 初始設(shè)置
# 用于正常顯示中文標(biāo)簽
plt.rcParams['font.sans-serif']=['SimHei']
# 用來正常顯示負號
plt.rcParams['axes.unicode_minus']=False
def plotTime_Service(customerList):
plt.figure(figsize=(14,7)) # 新建一個畫布
plt.xlabel('時間/min')
plt.ylabel('用戶序列')
servedUser = 0
for customer in customerList:
y = [customer.index]*2
# 等待時間
if customer.endTime == None:
customer.endTime = Until
color = 'r'
else:
color = 'b'
servedUser += 1
x = [customer.startTime,customer.endTime]
plt.plot(x,y,color=color)
# 被服務(wù)的時間
if customer.getServedTime != None and customer.endTime != Until:
color = 'g'
x = [customer.getServedTime,customer.endTime]
plt.plot(x,y,color=color)
plt.title("時間-隊列-服務(wù)圖 服務(wù)的用戶數(shù):%d" % servedUser)
def plotQueueLen_time(customerList):
plt.figure(figsize=(14,7)) # 新建一個畫布
plt.xlabel('時間/min')
plt.ylabel('隊列長度/人')
queueLenList = []
for customer in customerList:
queueLenList.append([customer.startTime,customer.queueLenStart])
queueLenList.append([customer.endTime,customer.queueLenEnd])
queueLenList.sort()
preTime = 0
preLen = 0
integralQueueLen = 0
maxLen = 0
global Until
timeInCount = Until
for each in queueLenList:
if each[1] != None:
x = [each[0]] * 2
y = [0,each[1]]
plt.plot(x,y,color='b')
plt.plot(each[0],each[1],'bo')
else:
timeInCount = preTime
break # 沒有把仿真結(jié)束時未被服務(wù)完的人算進來
integralQueueLen += (each[0] - preTime) * preLen
preTime = each[0]
preLen = each[1]
maxLen = max(maxLen,each[1])
averageQueueLen = integralQueueLen / timeInCount
plt.title("時間-隊列長度圖 平均隊列長度:%f" % averageQueueLen)
def plotWaiteTime_time(customerList):
plt.figure(figsize=(14,7)) # 新建一個畫布
plt.xlabel('時間/min')
plt.ylabel('等待時間/min')
queueLenList = []
peopleInCount = 0
for customer in customerList:
if customer.getServedTime != None:
peopleInCount += 1
queueLenList.append([customer.startTime,customer.getServedTime - customer.startTime])
queueLenList.sort()
integralWaiteTime = 0
maxWaiteTime = 0
for each in queueLenList:
x = [each[0]] * 2
y = [0,each[1]]
integralWaiteTime += each[1]
maxWaiteTime = max(maxWaiteTime,each[1])
plt.plot(x,y,color='b')
plt.plot(each[0],each[1],'bo')
averageWaiteTime = integralWaiteTime / peopleInCount
plt.title("時間-等待時間圖 平均等待時間:%f" % averageWaiteTime)
def plotWaiteTime_time_QueueLen(customerList):
fig = plt.figure(figsize=(14,7)) # 新建一個畫布
ax = fig.gca(projection='3d')
plt.xlabel('時間/min')
plt.ylabel('隊列長度/人')
ax.set_zlabel('等待時間/min')
plt.title("時間-隊列長度-等待時間圖")
queueLenList = [] # 格式:時間 隊列長度 等待時間
global Until
for customer in customerList:
if customer.getServedTime != None: # 沒有把仿真結(jié)束時未被服務(wù)完的人算進來
queueLenList.append([customer.startTime,customer.queueLenStart,customer.getServedTime-customer.startTime])
queueLenList.sort(key=lambda x:x[0])
for each in queueLenList:
if each[1] != None:
x = [each[0]]*2
y = [each[1]]*2
z = [0,each[2]]
ax.plot(x,y,z,color='b')
ax.scatter(x[1],y[1],z[1],c='b')
plotTime_Service(customerList)
plotQueueLen_time(customerList)
plotWaiteTime_time(customerList)
# plotWaiteTime_time_QueueLen(customerList)
plt.show()
def main():
random.seed(randomSeed)
#運行模擬場景
env = simpy.Environment()
env.process(runSys(env, numService,miuService))
env.run(until=Until)
#查看統(tǒng)計結(jié)果
plotSimRes(customerList)
main()