在企業(yè)的實際生產(chǎn)中,我們的算法體系需要被設計成軟件才能被使用萍恕。也就是說我在這里編寫的所有代碼章郁,實際上是軟件用戶界面背后的邏輯。這一章我也只是按照UML類圖整理一下業(yè)務邏輯制市,開發(fā)圖形界面并不是我的專業(yè)抬旺。
可以看一下下圖所示的求解器Solver的結(jié)構(gòu)。要想利用Solver來獲取解決方案祥楣,首先需要一個輸入數(shù)據(jù)InputData开财,并且需要將這個輸入數(shù)據(jù)經(jīng)過合理的處理汉柒,將其轉(zhuǎn)變成輸出數(shù)據(jù)OutputData。有了數(shù)據(jù)以后责鳍,還需要利用算法將這些數(shù)據(jù)逐步分析和計算碾褂。
到目前為止,我們已經(jīng)完成了數(shù)據(jù)模型的建立和處理历葛,以及用算法構(gòu)建(Konstruktiv)和評估(Evaluation)正塌。前面的章節(jié)也提到過,啟發(fā)式算法構(gòu)建的解決方案未必是最優(yōu)解恤溶,因此本章我們來講解如何逐步優(yōu)化啟發(fā)式算法提供的解決方案乓诽,并把到目前為止已經(jīng)涉及到的算法和結(jié)構(gòu)體系化。
1 Solution的容器:SolutionPool
上一章我們已經(jīng)利用啟發(fā)式算法得出了不同的解決方案咒程,例如NEH算法總能提供比較接近最優(yōu)解的方案鸠天,但是NEH算法并沒有顧及由于延期交貨導致的罰款問題。如果考慮罰款帶來的額外成本帐姻,NEH的方案未必比得上EDD稠集。
因此我們需要把這些備選方案存放在一個類中,這個類就是SolutionPool饥瓷。初步設想這個類應該有一個類型為列表的屬性巍杈,專門用于存放Solution,還要有一個AddSolution成員函數(shù)扛伍,可以將Solution加到容器中筷畦。此外,我還希望SolutionPool能夠告訴我刺洒,哪個方案能輸出最短的生產(chǎn)時間鳖宾。(當然也可以考慮再加入一個函數(shù),用來處理延期罰款)逆航。
class SolutionPool:
def __init__(self):
self.Solutions = [] # SolutionPool 有一個定語叫做Solutions鼎文,是個list,列表中的元素是Solution
def AddSolution(self, newSolution):
self.Solutions.append(newSolution)
def GetLowestMakespanSolution(self):
self.Solutions.sort(key = lambda solution: solution.Makespan) # sort solutions according to makespan默認從小到大
return self.Solutions[0] # 首個元素是時長最短的
# 存疑[已解決]:solution對象只有經(jīng)過EvaluationLogic之后因俐,solution的定語makespan才會被賦值D赐铩!抹剩!
# 參見OutputData和EvaluationLogic文件
2 把所有啟發(fā)式算法寫入ConstructiveHeuristics
這個類存在的目的就是通過這個類構(gòu)造出對象撑帖,再用對象調(diào)用成員函數(shù)(啟發(fā)式算法),然后返回一個Solution澳眷。這個Solution在返回時已經(jīng)執(zhí)行了EvaluationLogic胡嘿,也就是說StartTimes和EndTimes等屬性已經(jīng)被賦值。在類定義的最后還要把每種啟發(fā)式算法生成的Solution加入到SOlutionPool中钳踊。
*注意:上一章強調(diào)過衷敌,類的成員函數(shù)定義時勿侯,一定要把self作為第一個參數(shù),否則會輸出參數(shù)數(shù)量不一致的錯誤
class ConstructiveHeuristics: # 由多個不同的啟發(fā)式算法構(gòu)成
def __init__(self, evaluationLogic, solutionPool):
self.RandomSeed = 2021
self.RandomRetries = 10
self.EvaluationLogic = evaluationLogic
self.SolutionPool = solutionPool
# FCFS
def FirstComeFirstServe(self, jobList):
tmpPermutation = [*range(len(jobList))] # *的作用是將range創(chuàng)建的可迭代對象進行解包缴罗。如果省略星號助琐,會創(chuàng)建一個列表鳖藕,只包含一個元素邑飒,該元素就是range對象
tmpSolution = Solution(jobList, tmpPermutation) # 構(gòu)造Solution對象彤恶。注意該構(gòu)造方法的使用寇僧,參見OutputJobs中的必選參數(shù)(無默認值)
self.EvaluationLogic.DefineStartEnd(tmpSolution) # 此處的EvaluationL是個對象 ?皮壁?,所以才能調(diào)用EL類中的成員函數(shù)。這一步是為了計算該temSolution的總時間
return tmpSolution
# SPT
def ShortestProcessingTime(self, jobList, allMachines = False): # 注意只有類的成員變量才需要self禀横,函數(shù)的本地變量不需要
jobPool = []
if allMachines:
for i in range(len(jobList)):
jobPool.append((i, sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
else:
for i in range(len(jobList)):
jobPool.append((i, jobList[i].ProcessingTime(0)))
jobPool.sort(key= lambda t: t[1])
tmpPermutation = [x[0] for x in jobPool]
tmpSolution = Solution(jobList, tmpPermutation)
self.EvaluationLogic.DefineStartEnd(tmpSolution)
return tmpSolution
# LPT 最費時間的先做
def LongestProcessingTime(self, jobList, allMachines = False):
jobPool = []
if allMachines:
for i in range(len(jobList)):
jobPool.append((i, sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
else:
for i in range(len(jobList)):
jobPool.append((i, jobList[i].ProcessingTime(0)))
jobPool.sort(key= lambda t: -t[1]) # 按照jobPool中,元組的第二個元素的相反數(shù)進行排列粥血。排在前面的絕對值更大柏锄,也就是時間更長
tmpPermutation = [x[0] for x in jobPool] # 這一步是把jobPool中每個元組的首位元素,即jobid 放在列表中作為Permutation
tmpSolution = Solution(jobList, tmpPermutation)
self.EvaluationLogic.DefineStartEnd(tmpSolution) # 現(xiàn)在已經(jīng)生成了solution复亏,需要用EvaluationLogic類來調(diào)用EL類中的成員函數(shù)趾娃,以計算該solution的總時長
return tmpSolution
# ROS 隨機完成job
def ROS(self, jobList, x, seed):
np.random.seed(seed) ###
tmpSolution = Solution(jobList, 0)
bestCmax = np.inf # unendless 無窮大的浮點數(shù)
for i in range(x):
tmpPermuation = np.random.permutation(len(jobList)) #np.random.permutation生成隨機數(shù)列,長度是len()
# tmpSolution.SetPermutation(tmpPermuation) # SetPermutation是Solution類的成員函數(shù)
tmpSolution.Permutation = tmpPermuation # 這樣比上一行更好
self.EvaluationLogic.DefineStartEnd(tmpSolution)
if (tmpSolution.Makespan < bestCmax):
bestCmax = tmpSolution.Makespan
bestPerm = tmpPermuation
bestSol = Solution(jobList, bestPerm) # ROS中還要篩選出最好的哪個隨機數(shù)列
self.EvaluationLogic.DefineStartEnd(bestSol)
return bestSol
# NEH(要首先確定最佳插入順序) ### NEH的插入函數(shù)寫在EvaluationLogic類中缔御。這樣EL類既可以確定起止時間也能提供最佳插入順序
# def DetermineBestInsertion(solution, jobToInsert): #jobToInsert是指需要被插入的那個job
# ###
# # insert job at front of permutation
# solution.Permutation.insert(0, jobToInsert) # insert函數(shù)抬闷,在索引位置插入數(shù)據(jù),索引后面的往后順移一位耕突。初始肯定在第0位插入
# bestPermutation = deepcopy(solution.Permutation)
# EvaluationLogic().DefineStartEnd(solution)
# bestCmax = solution.Makespan
# ###
# # swap job i to each position and check for improvement
# lengthPermutation = len(solution.Permutation) - 1 #減去1是為了得到使用insert時的索引號
# for j in range(0, lengthPermutation):
# solution.Permutation[j], solution.Permutation[j + 1] = solution.Permutation[j+1], solution.Permutation[j]
# EvaluationLogic().DefineStartEnd(solution)
# if(solution.Makespan < bestCmax):
# bestCmax = solution.Makespan
# bestPermutation = [x for x in solution.Permutation]
# solution.Makespan = bestCmax
# solution.Permutation = bestPermutation
def NEH(self, jobList):
jobPool = []
tmpPerm = []
bestCmax = 0
# Calculate sum of processing times and sort
for i in range(len(jobList)):
jobPool.append((i,sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
jobPool.sort(key=lambda x: x[1], reverse=True)
# Initalize input
tmpNEHOrder = [x[0] for x in jobPool]
tmpPerm.append(tmpNEHOrder[0])
tmpSolution = Solution(jobList,tmpPerm)
# Add next jobs in a loop and check all permutations
for i in range(1,len(tmpNEHOrder)):
# add next job to end and calculate makespan
self.EvaluationLogic.DetermineBestInsertion(tmpSolution, tmpNEHOrder[i])
return tmpSolution
def Run(self, inputData, solutionMethod):
print('Generating an initial solution according to ' + solutionMethod + '.') # 能直接用加號連結(jié)說明solutionMethod是字符串笤成,即算法的名字
solution = None
if solutionMethod == 'FCFS':
solution = self.FirstComeFirstServe(inputData.InputJobs)
elif solutionMethod == 'SPT':
solution = self.ShortestProcessingTime(inputData.InputJobs)
elif solutionMethod == 'LPT':
solution = self.LeastProcessingTime(inputData.InputJobs)
elif solutionMethod == 'ROS':
solution = self.ROS(inputData.InputJobs, self.RandomRetries, self.RandomSeed)
elif solutionMethod == 'NEH':
solution = self.NEH(inputData.InputJobs)
else:
print('Unkown constructive solution method: ' + solutionMethod + '.')
self.SolutionPool.AddSolution(solution) #這里的定語solution Pool屬于solutionPool類,因此可用該類的成員函數(shù)
3 創(chuàng)建Solver類
Solver求解器位于類圖的最頂端眷茁,因此這個類至關(guān)重要炕泳。通過觀察類圖我們得知,Solver應該有以下三個函數(shù):
- ConstructionPhase(constructiveSolutionMethod)
- ImprovementPhase(startSolution, algorithm)
- RunLocalSearch(constructiveSolutionMethod, algorithm)
由于現(xiàn)在我們還處在構(gòu)建階段ConstructionPhase上祈,現(xiàn)在只討論這一個函數(shù)培遵。
在構(gòu)建階段,我們只需要往成員函數(shù)ConstructionPhase中輸入一個啟發(fā)式算法的名稱登刺,然后利用ConstructiveHeuristic的實例去調(diào)用Run函數(shù)籽腕,當然我們可以使用不同的算法多次使用Run寫入Solver的SolutionPool,最后輸出那個總時長最短的方案
class Solver:
def __init__(self, inputData, seed):
self.InputData = inputData
self.Seed = seed
self.RNG = numpy.random.default_rng(seed) #隨機數(shù)生成器
self.EvaluationLogic = EvaluationLogic(inputData) # 定語是El對象纸俭,可以調(diào)用類中的方法节仿??
self.SolutionPool = SolutionPool() # 該構(gòu)造方法沒有必選參數(shù)
self.ConstructiveHeuristic = ConstructiveHeuristics(self.EvaluationLogic, self.SolutionPool) #
def ConstructionPhase(self, constructiveSolutionMethod):
self.ConstructiveHeuristic.Run(self.InputData, constructiveSolutionMethod) #ConstructiveHeuristic實際上就是一個solution
bestInitalSolution = self.SolutionPool.GetLowestMakespanSolution() # solutionPool中掉蔬,最短時長即為最優(yōu)解 -->每次調(diào)用這個成員函數(shù)廊宪,SP中只有一個SOlution矾瘾?
print("Constructive solution found.")
print(bestInitalSolution)
return bestInitalSolution
# 這里我從外部導包
from Solver import *
# 實例化輸入數(shù)據(jù)
data = InputData("InputFlowshopSIST.json")
# 實例化Solver類
solver = Solver(data, 2048) #創(chuàng)建一個solver對象
# 用Solver對象調(diào)用ConstructionPhase
solver.ConstructionPhase('FCFS')
solver.ConstructionPhase('NEH')
solver.ConstructionPhase('LPT')
4 優(yōu)化算法:Neighbourhoods and exchanges
啟發(fā)式算法得出的初始最優(yōu)解很有可能與真實的最優(yōu)解有很大的偏差,因此我們需要對其進行優(yōu)化箭启。我們首選的優(yōu)化算法是鄰域和交換壕翩,原因是這種方法最簡單。
鄰域包含一個或多個解決方案(鄰域解決方案)傅寡,這些解決方案可以通過交換從一個給定的解決方案到達放妈。
在詳細講述鄰域交換之前,先來用兩種方式評估一下一個FCFS方案荐操。
假定現(xiàn)在我們有一個FCFS的方案芜抒,有下面兩種方法來評估這個方案的好壞:
*注意:方法1中,可以直接用之前創(chuàng)建的Solver對象來調(diào)用評估函數(shù)托启,也可以像方法2那樣宅倒,直接調(diào)用評估函數(shù),但是評估函數(shù)需要一個必選參數(shù)——InputJobs類的對象屯耸。
現(xiàn)在考慮FCFS前兩個元素互換位置的情況:
swapMovePermutation = list(permutationFCFS) #create a copy of the permutation
swapMovePermutation[0] = permutationFCFS[1] # 交換順序可以通過元素的賦值來實現(xiàn),這點在講NEH插入的時候用到過
swapMovePermutation[1] = permutationFCFS[0]
swapMoveSolution = Solution(data.InputJobs, swapMovePermutation)
solver.EvaluationLogic.DefineStartEnd(swapMoveSolution)
print(swapMoveSolution)
*通過對比拐迁,僅僅將前兩個元素互換,總時長跟之前相比縮短了疗绣。由此可見线召,互換確實能夠產(chǎn)生有意義的結(jié)果。
為了找到進一步的改進多矮,有必要更廣泛地探索鄰域的情況缓淹。初始解決方案的交換鄰域中的所有交換都應該被創(chuàng)建和評估。為了這個目的塔逃,我們創(chuàng)建了新的類SwapMove割卖。
class SwapMove: # 跟上一個單元格原理類似,列表元素的互換
# 注意這個類中的數(shù)據(jù)類型患雏,swapMove對象有個Permutation屬性鹏溯,我們實際上交換的也正是這個屬性。后面調(diào)用的時候會用到
def __init__(self, initialPermutation, indexA, indexB):
self.Permutation = list(initialPermutation) # create a copy of the permutation
self.IndexA = indexA
self.IndexB = indexB #先初始化成員變量(定語)淹仑,這是面向?qū)ο蟊仨毜囊徊?
# 對成員變量進行修改丙挽,因此不需要返回值
self.Permutation[indexA] = initialPermutation[indexB]
self.Permutation[indexB] = initialPermutation[indexA]
*注意:在我們定義類的時候,通常越普遍越好匀借,例如這個SwapMove類颜阐,它的作用在于借助initialPermutation,把屬性Permutation中位于indexA和B的兩個元素互換吓肋。這兩個index可以隨意指定凳怨,至于指定哪些值,則需要按照具體需求給出參數(shù)
現(xiàn)在我們來把FCFS中的排列進行換位,這里需要注意對稱性肤舞,因為我們把1紫新,2兩個元素換位后,再換回來是完全沒有意義的李剖。
swapMoves = []
for i in range(len(permutationFCFS)):
for j in range(len(permutationFCFS)):
if i < j:
swapMove = SwapMove(permutationFCFS, i, j)
swapMoves.append(swapMove)
print(len(swapMoves)) # 輸出共有多少種交換的情況
# 注意這里swapmoves不能直接打印芒率,因為里面裝的是類SwapMove的對象
答案是55種。
簡單分析一下篙顺,這個問題相當于從n=11個訂單中隨機不重復選出兩個訂單進行換位
現(xiàn)在再來用math模塊檢驗一下:
import math # 這一步的目的是輸出一共有幾次換位
expectedNumberOfMoves = math.comb(len(data.InputJobs), 2) #binomial coefficient "n choose k" 直接輸出換位幾次
actualNumberOfMoves = len(swapMoves) #swapMoves中容納了所有換位的可能情況
print(f'there schould exist {expectedNumberOfMoves} moves , and this is {actualNumberOfMoves == expectedNumberOfMoves}')
新的交換已被創(chuàng)建偶芍,因此有55個新的候選解決方案。現(xiàn)在必須對新的候選解決方案進行評估德玫,以確定是否其中至少有一個解決方案帶來了改進匪蟀。我們將解決方案保存在一個列表中,以便我們以后可以訪問它們宰僧,并確定找到的最佳候選解決方案材彪。
# 把交換后的解決方案放進列表中并輸出55個方案及其總時長
swapMoveSolutions = []
for move in swapMoves:
# 使用SwapMove類的Permutation屬性來構(gòu)建Solution對象
swapMoveSolution = Solution(data.InputJobs, move.Permutation)
solver.EvaluationLogic.DefineStartEnd(swapMoveSolution)
swapMoveSolutions.append(swapMoveSolution)
print('initial solution: ' + str(permutationFCFS))
for solution in swapMoveSolutions:
print(solution)
我們現(xiàn)在想要找到備選方案中的最佳解決方案,并檢查它是否比FCFS解決方案更好撒桨。
可見查刻,換位后我們得出了比FCFS更好的結(jié)果键兜。那么這個更好的結(jié)果究竟在哪些位置發(fā)生了變化凤类?試著輸出有差異的元素的序號:
5 創(chuàng)建SwapNeighborhood類
我們現(xiàn)在把這個邏輯構(gòu)建到一個上位的 SwapNeighborhood類中,代表一個鄰域的互換普气。我們應如何進行谜疤?SwapNeighborhood類的各個組成部分是什么?需要執(zhí)行哪些邏輯步驟现诀?
"""這個類包括了所有從n個訂單中選兩個的可能排列"""
class SwapNeighborhood:
def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
self.InputData = inputData
self.Permutation = initialPermutation
self.EvaluationLogic = evaluationLogic
self.SolutionPool = solutionPool ## 跟下面的SwapMoveSolutions區(qū)分開R目摹!W醒亍W!封锉!
self.Moves = []
self.SwapMoveSolutions = []
"""生成所有可能的交換排列"""
def DiscoverMoves(self):
for i in range(len(self.Permutation)):
for j in range(len(self.Permutation)):
if i < j:
swapMove = SwapMove(self.Permutation, i, j)
self.Moves.append(swapMove) # Self.Moves列表绵跷,里面裝的是SwapMove對象
"""把生成的所有排列self.Moves中的每次交換,都進行實例化后評估成福。這里注意SwapMove類的屬性碾局。找出總時長最短"""
def EvaluateMovesBestImprovement(self):
for move in self.Moves:
swapMoveSolution = Solution(self.InputData.InputJobs, move.Permutation)
self.EvaluationLogic.DefineStartEnd(swapMoveSolution)
self.SwapMoveSolutions.append(swapMoveSolution)
"""另一種評估方式:只評估到第一個優(yōu)化現(xiàn)有方案的交換排列"""
def EvaluateMovesFirstImprovement(self):
bestObjective = self.SolutionPool.GetLowestMakespanSolution().Makespan
for move in self.Moves:
swapMoveSolution = Solution(self.InputData.InputJobs, move.Permutation)
self.EvaluationLogic.DefineStartEnd(swapMoveSolution)
self.SwapMoveSolutions.append(swapMoveSolution)
if swapMoveSolution.Makespan < bestObjective:
# 終止鄰域評估,因為已經(jīng)找到了第一個優(yōu)化的結(jié)果:比pool中的最優(yōu)結(jié)果還要好
return
"""定義了兩個評估函數(shù)奴艾,現(xiàn)在還需要定義一個接口净当,利用這個接口去調(diào)用兩個評估函數(shù)"""
def EvaluateMoves(self, evaluationStrategy):
if evaluationStrategy == 'BestImprovement':
self.EvaluateMovesBestImprovement()
elif evaluationStrategy == 'FirstImprovement':
self.EvaluateMovesFirstImprovement()
else:
print(f'Evaluation strategy {evaluationStrategy} not implemented! ')
# 這個函數(shù)不涉及賦值操作,也不用返回。每次調(diào)用這個函數(shù)像啼,要么調(diào)用其他函數(shù)俘闯,要么打印提示。因此用print就行不用return
"""兩種評估方案各自產(chǎn)生的結(jié)果都被存放在self.SwapMoveSolutions中÷窈希現(xiàn)在要通過MakeBestMove函數(shù)來輸出最優(yōu)解"""
def MakeBestMove(self):
self.SwapMoveSolutions.sort(key = lambda solution: solution.Makespan) # 這里的solution是可以直接調(diào)用Makespan的备徐,因為在兩個評估方案中都是用了EvaluationLogic
bestNeighborhoodSolution = self.SwapMoveSolutions[0]
return bestNeighborhoodSolution
"""使用clear函數(shù)清空列表等容器。猜想:如果我們更換另一種排列方式甚颂,那么之前保存的moves和SwapMoveSolutions就沒用了"""
def Update(self, permutation):
self.Permutation = permutation # 修改類的屬性蜜猾,修改后前面的函數(shù)執(zhí)行都會改變,因此要清空兩個列表屬性
self.Moves.clear()
self.SwapMoveSolutions.clear()
"""進行本地搜索locaksearch振诬,用手頭現(xiàn)有的解決方案去跟經(jīng)過交換的方案進行比較蹭睡,默認交換后會有更好的方案(True)"""
def LocalSearch(self, neighborhoodEvaluationStrategy, solution):
hasSolutionImproved = True
while hasSolutionImproved:
self.Update(solution.Permutation) # 清空兩個列表
self.DiscoverMoves() # 填充Moves列表
self.EvaluateMoves(neighborhoodEvaluationStrategy) # 填充SwapMoveSolutions列表
bestNeighborhoodSolution = self.MakeBestMove()
if bestNeighborhoodSolution.Makespan < solution.Makespan:
print('New best solution has been found!!!')
print(bestNeighborhoodSolution) # 對Solution類的對象執(zhí)行print,會執(zhí)行__str__
self.SolutionPool.AddSolution(bestNeighborhoodSolution)
solution.Permutation = bestNeighborhoodSolution.Permutation
solution.Makespan = bestNeighborhoodSolution.Makespan
else:
print(f'Reached local optimum of {self.Type} neighborhood. Stop local search. ')
hasSolutionImproved = False
我們將邏輯外包給Neighborhood.py
模塊赶么,并將鄰域嵌入ImprovementAlgorithm
類中肩豁,這個類控制優(yōu)化程序的邏輯。
Neighborhood.py
文件中辫呻,已經(jīng)包含了上一節(jié)寫的代碼清钥。不過略有優(yōu)化,SwapMove被定義成BaseNeighborhood的子類放闺,這樣SwapMove可以繼承所有成員函數(shù)并根據(jù)子類的特有需求重寫了父類的成員函數(shù)
from OutputData import *
from copy import deepcopy
class BaseNeighborhood:
def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
self.InputData = inputData
self.Permutation = initialPermutation
self.EvaluationLogic = evaluationLogic
self.SolutionPool = solutionPool
self.Moves = []
self.MoveSolutions = []
self.Type = 'None'
def DiscoverMoves(self):
raise Exception('DiscoverMoves() is not implemented for the abstract BaseNeighborhood class.')
def EvaluateMoves(self, evaluationStrategy):
if evaluationStrategy == 'BestImprovement':
self.EvaluateMovesBestImprovement()
elif evaluationStrategy == 'FirstImprovement':
self.EvaluateMovesFirstImprovement()
else:
raise Exception(f'Evaluation strategy {evaluationStrategy} not implemented.')
""" Evaluate all moves. """
def EvaluateMovesBestImprovement(self):
for move in self.Moves:
moveSolution = Solution(self.InputData.InputJobs, move.Permutation)
self.EvaluationLogic.DefineStartEnd(moveSolution)
self.MoveSolutions.append(moveSolution)
""" Evaluate all moves until the first one is found that improves the best solution found so far. """
def EvaluateMovesFirstImprovement(self):
bestObjective = self.SolutionPool.GetLowestMakespanSolution().Makespan
for move in self.Moves:
moveSolution = Solution(self.InputData.InputJobs, move.Permutation)
if self.Type == 'BestInsertion':
self.EvaluationLogic.DetermineBestInsertionAccelerated(moveSolution, move.removedJob)
else:
self.EvaluationLogic.DefineStartEnd(moveSolution)
self.MoveSolutions.append(moveSolution)
if moveSolution.Makespan < bestObjective:
# abort neighborhood evaluation because an improvement has been found
return
def MakeBestMove(self):
self.MoveSolutions.sort(key = lambda solution: solution.Makespan) # sort solutions according to makespan
bestNeighborhoodSolution = self.MoveSolutions[0]
return bestNeighborhoodSolution
def Update(self, permutation):
self.Permutation = permutation
self.Moves.clear()
self.MoveSolutions.clear()
def LocalSearch(self, neighborhoodEvaluationStrategy, solution):
hasSolutionImproved = True
while hasSolutionImproved:
self.Update(solution.Permutation)
self.DiscoverMoves()
self.EvaluateMoves(neighborhoodEvaluationStrategy)
bestNeighborhoodSolution = self.MakeBestMove()
if bestNeighborhoodSolution.Makespan < solution.Makespan:
# print("New best solution has been found!")
print(bestNeighborhoodSolution)
self.SolutionPool.AddSolution(bestNeighborhoodSolution)
solution.Permutation = bestNeighborhoodSolution.Permutation
solution.Makespan = bestNeighborhoodSolution.Makespan
else:
print(f"Reached local optimum of {self.Type} neighborhood. Stop local search.")
hasSolutionImproved = False
""" Represents the swap of the element at IndexA with the element at IndexB for a given permutation (= solution). """
class SwapMove:
def __init__(self, initialPermutation, indexA, indexB):
self.Permutation = list(initialPermutation) # create a copy of the permutation
self.IndexA = indexA
self.IndexB = indexB
self.Permutation[indexA] = initialPermutation[indexB]
self.Permutation[indexB] = initialPermutation[indexA]
""" Contains all $n choose 2$ swap moves for a given permutation (= solution). """
class SwapNeighborhood(BaseNeighborhood):
def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
super().__init__(inputData, initialPermutation, evaluationLogic, solutionPool)
self.Type = 'Swap'
""" Generate all $n choose 2$ moves. """
def DiscoverMoves(self):
for i in range(len(self.Permutation)):
for j in range(len(self.Permutation)):
if i < j:
swapMove = SwapMove(self.Permutation, i, j)
self.Moves.append(swapMove)
""" Represents the insertion of the element at IndexA at the new position IndexB for a given permutation (= solution). """
class InsertionMove:
def __init__(self, initialPermutation, indexA, indexB):
self.Permutation = [] # create a copy of the permutation
self.IndexA = indexA
self.IndexB = indexB
for k in range(len(initialPermutation)):
if k == indexA:
continue
self.Permutation.append(initialPermutation[k])
self.Permutation.insert(indexB, initialPermutation[indexA])
""" Contains all $(n - 1)^2$ insertion moves for a given permutation (= solution). """
class InsertionNeighborhood(BaseNeighborhood):
def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
super().__init__(inputData, initialPermutation, evaluationLogic, solutionPool)
self.Type = 'Insertion'
def DiscoverMoves(self):
for i in range(len(self.Permutation)):
for j in range(len(self.Permutation)):
if i == j or i == j + 1:
continue
insertionMove = InsertionMove(self.Permutation, i, j)
self.Moves.append(insertionMove)
6 優(yōu)化程序的流程邏輯
我們的優(yōu)化程序是基于鄰域的祟昭,目前我只介紹了SwapNeighborhood這一中鄰域類型。而在ImprovementAlgorithmus類中,我們可以使用多種鄰域類型來創(chuàng)建鄰域Neighborhood怖侦,并把這些鄰域存入字典篡悟,鍵是鄰域的類型,值是鄰域的對象匾寝。最后在ImprovementAlgorithmus的子類IterativeImprovement中搬葬,用鍵把字典遍歷即可得到solution。
"""這是多種不同優(yōu)化算法的基本類艳悔,特定的優(yōu)化算法可以繼承這個基本類"""
class ImprovementAlgorithmus:
def __init__(self, inputData, neighborhoodEvaluationStrategy = 'BestImprovement', neighborhoodTypes = ['Swap']):
self.InputData = inputData # neighborhoodTypes實際上有多種急凰,也可以是Insertion,TwoEdgeExchange猜年。抡锈。。
self.EvaluationLogic = None
self.SolutionPool = None
self.RNG = None # 表示隨機數(shù)
self.NeighborhoodEvaluationStrategy = neighborhoodEvaluationStrategy
self.NeighborhoodTypes = neighborhoodTypes
self.Neighborhoods = {}
def Initialize(self, evaluationLogic, solutionPool, rng = None):
self.EvaluationLogic = evaluationLogic
self.SolutionPool = solutionPool
self.RNG = rng
def InitializeNeighborhoods(self, solution): # 字典Neighborhood中码倦,可能有兩個鍵Insertion和Swap企孩,添加新數(shù)據(jù)時,需要用到相應的構(gòu)造器
self.Neighborhoods['Swap'] = SwapNeighborhood(self.InputData, solution.Permutation, self.EvaluationLogic, self.SolutionPool)
# add further neighborhoods 詳見class ImprovementAlgorithm文件
現(xiàn)在袁稽,優(yōu)化算法 ImprovementAlgorithm
類包含了基于鄰域的優(yōu)化程序的所有基本組件勿璃。這些可以在不同的算法中使用。在這里,我們創(chuàng)建了一個 "迭代優(yōu)化 "類IterativeImprovement
补疑,通過連續(xù)的局部搜索來迭代改進解決方案歧沪。
""" Iterative improvement algorithm through sequential variable neighborhood descent. """
class IterativeImprovement(ImprovementAlgorithm): # einzelne Nachbaren
def __init__(self, inputData, neighborhoodEvaluationStrategy = 'BestImprovement', neighborhoodTypes = ['Swap']):
super().__init__(inputData, neighborhoodEvaluationStrategy, neighborhoodTypes)
def Run(self, solution): #startSolution ?
self.InitializeNeighborhoods(solution) # 初始化即構(gòu)造對象
# According to "Hansen et al. (2017): Variable neighorhood search", this is equivalent to the
# sequential variable neighborhood descent with a pipe neighborhood change step.
for neighborhoodType in self.NeighborhoodTypes:
neighborhood = self.Neighborhoods[neighborhoodType]
neighborhood.LocalSearch(self.NeighborhoodEvaluationStrategy, solution)
return solution
這里Run函數(shù)的用法可以自己思考莲组,前面講過诊胞。InitializeNeighborhoods實際上會用當前參數(shù)中的solution往Neighborhoods字典中填充鄰域?qū)ο蟆W值渲杏辛藘?nèi)容我們才能針對不同類型的鄰域執(zhí)行l(wèi)ocalsearch
現(xiàn)在對Solver類進行擴展锹杈,增加ImprovementPhase()
和RunLocalSearch()
兩個函數(shù)
class Solver:
def __init__(self, inputData, seed):
self.InputData = inputData
self.Seed = seed
self.RNG = numpy.random.default_rng(seed) #隨機數(shù)生成器
self.EvaluationLogic = EvaluationLogic(inputData) # 定語是El對象撵孤,可以調(diào)用類中的方法
self.SolutionPool = SolutionPool()
self.ConstructiveHeuristic = ConstructiveHeuristics(self.EvaluationLogic, self.SolutionPool)
def ConstructionPhase(self, constructiveSolutionMethod):
self.ConstructiveHeuristic.Run(self.InputData, constructiveSolutionMethod)
bestInitalSolution = self.SolutionPool.GetLowestMakespanSolution()
print("Constructive solution found.")
print(bestInitalSolution)
return bestInitalSolution
def ImprovementPhase(self, startSolution, algorithm):
algorithm.Initialize(self.EvaluationLogic, self.SolutionPool, self.RNG)
bestSolution = algorithm.Run(startSolution)
print("Best found Solution.")
print(bestSolution)
def RunLocalSearch(self, constructiveSolutionMethod, algorithm):
startSolution = self.ConstructionPhase(constructiveSolutionMethod)
self.ImprovementPhase(startSolution, algorithm)
*注意:
- 在ImprovementPhase中,有個Initialize函數(shù)竭望,這里對應的是IterativeImprovement類或其他子類的對象邪码,可以調(diào)用他們父類的Initialize函數(shù)和子類中的Run函數(shù)。子類Run函數(shù)實質(zhì)上就是把鄰域進行局部搜索咬清,并返回最優(yōu)解闭专。--> 其實這里已經(jīng)是在RunLocalSearch了,那么為什么還要定義一個叫做RunLocalSearch的函數(shù)旧烧,并在這個函數(shù)中調(diào)用ImprovementPhase影钉?
- 原因在于,這樣做我們在調(diào)用函數(shù)的時候能少寫參數(shù)掘剪。例如平委,在RunLocalSearch中我們會首先構(gòu)建startSolution,也就是啟發(fā)式算法的方案杖小,然后把這個方案作為參數(shù)輸入到優(yōu)化步驟--ImprovementPhase肆汹。編程要理解這種思想愚墓。
*:到目前為止予权,solver類的作用就是通過ConstructionPhase創(chuàng)建初始解決方案,然后把這個方案進行評估浪册。評估時要用到兩個函數(shù)扫腺,但是我們只需要調(diào)用RunLocalSearch這一個函數(shù)就夠了
7 運行實例
from Solver import *
from ImprovementAlgorithm import *
data = InputData("InputFlowshopSIST.JSON")
solver = Solver(data, 2048)
improvementAlgorithm = IterativeImprovement(data)
solver.RunLocalSearch('FCFS', improvementAlgorithm)
*注意:操作流程就是先構(gòu)建一個solver對象,然后用這個solver對象調(diào)用RunLocalSearch村象,第一個參數(shù)是啟發(fā)式算法的名字笆环,之前我們也是這樣定義的;第二個參數(shù)是優(yōu)化算法的對象或?qū)嵗裾摺T跇?gòu)造IterativeImprovement對象時躁劣,我們只有一個必選參數(shù),也就是data(inputData)库菲,可選參數(shù)的值取決于我們想用哪種優(yōu)化方式(best還是first)账忘,以及哪種鄰域類型
*上圖就是用FCFS產(chǎn)生初始方案,再用SwapNeighborhood和BestImprovement得出最優(yōu)解的過程。
問題
到目前為止鳖擒,我們已經(jīng)了解了兩個鄰域的評估策略溉浙。
- Best Improvement(廣泛搜索):對鄰域的所有交換進行評估。有最大改進的交換被執(zhí)行蒋荚。
- First Improvement(選擇性搜索):對一個鄰域的交換進行評估戳稽,直到發(fā)現(xiàn)優(yōu)化。一旦發(fā)現(xiàn)有改進期升,就會中斷評估惊奇,并立即執(zhí)行發(fā)現(xiàn)的交換。因此播赁,更好的交換方案被忽略
你認為赊时,采用哪種評價策略能取得更好的效果?兩種評價策略的優(yōu)勢和劣勢是什么行拢?
from Solver import *
data = InputData("InputFlowshopSIST.json")
constructiveSolutionMethod = 'FCFS'
solverBestImprovement = Solver(data, 2048)
bestImprovementAlgorithm = IterativeImprovement(data, 'BestImprovement')
solverBestImprovement.RunLocalSearch(constructiveSolutionMethod, bestImprovementAlgorithm)
solverFirstImprovement = Solver(data, 2048)
bestImprovementAlgorithm = IterativeImprovement(data, 'FirstImprovement')
solverFirstImprovement.RunLocalSearch(constructiveSolutionMethod, bestImprovementAlgorithm)
localOptimumBestImprovement = solverBestImprovement.SolutionPool.GetLowestMakespanSolution()
localOptimumFirstImprovement = solverFirstImprovement.SolutionPool.GetLowestMakespanSolution()
print(f"Best improvement makespan: {localOptimumBestImprovement.Makespan}")
print(f"First improvement makespan: {localOptimumFirstImprovement.Makespan}")
isBestImprovementMakespanLower = localOptimumBestImprovement.Makespan < localOptimumFirstImprovement.Makespan
print(f"The best improvement strategy achieves a lower makespan: {isBestImprovementMakespanLower}")
from Solver import *
data = InputData("InputFlowshopSIST.json")
improvementAlgorithm = IterativeImprovement(data, 'BestImprovement', ['Swap', 'Insertion'])
solver = Solver(data, 1024)
solver.RunLocalSearch('LPT', improvementAlgorithm)