7 啟發(fā)式求解器的軟件結(jié)構(gòu)和優(yōu)化算法

在企業(yè)的實際生產(chǎn)中,我們的算法體系需要被設計成軟件才能被使用萍恕。也就是說我在這里編寫的所有代碼章郁,實際上是軟件用戶界面背后的邏輯。這一章我也只是按照UML類圖整理一下業(yè)務邏輯制市,開發(fā)圖形界面并不是我的專業(yè)抬旺。

可以看一下下圖所示的求解器Solver的結(jié)構(gòu)。要想利用Solver來獲取解決方案祥楣,首先需要一個輸入數(shù)據(jù)InputData开财,并且需要將這個輸入數(shù)據(jù)經(jīng)過合理的處理汉柒,將其轉(zhuǎn)變成輸出數(shù)據(jù)OutputData。有了數(shù)據(jù)以后责鳍,還需要利用算法將這些數(shù)據(jù)逐步分析和計算碾褂。

到目前為止,我們已經(jīng)完成了數(shù)據(jù)模型的建立和處理历葛,以及用算法構(gòu)建(Konstruktiv)和評估(Evaluation)正塌。前面的章節(jié)也提到過,啟發(fā)式算法構(gòu)建的解決方案未必是最優(yōu)解恤溶,因此本章我們來講解如何逐步優(yōu)化啟發(fā)式算法提供的解決方案乓诽,并把到目前為止已經(jīng)涉及到的算法和結(jié)構(gòu)體系化。

Solver結(jié)構(gòu)
UML-類圖

1 Solution的容器:SolutionPool

上一章我們已經(jīng)利用啟發(fā)式算法得出了不同的解決方案咒程,例如NEH算法總能提供比較接近最優(yōu)解的方案鸠天,但是NEH算法并沒有顧及由于延期交貨導致的罰款問題。如果考慮罰款帶來的額外成本帐姻,NEH的方案未必比得上EDD稠集。

因此我們需要把這些備選方案存放在一個類中,這個類就是SolutionPool饥瓷。初步設想這個類應該有一個類型為列表的屬性巍杈,專門用于存放Solution,還要有一個AddSolution成員函數(shù)扛伍,可以將Solution加到容器中筷畦。此外,我還希望SolutionPool能夠告訴我刺洒,哪個方案能輸出最短的生產(chǎn)時間鳖宾。(當然也可以考慮再加入一個函數(shù),用來處理延期罰款)逆航。

class SolutionPool:
    def __init__(self):
        self.Solutions = []  # SolutionPool 有一個定語叫做Solutions鼎文,是個list,列表中的元素是Solution

    def AddSolution(self, newSolution): 
        self.Solutions.append(newSolution)

    def GetLowestMakespanSolution(self):
        self.Solutions.sort(key = lambda solution: solution.Makespan) # sort solutions according to makespan默認從小到大
        return self.Solutions[0]  # 首個元素是時長最短的


        # 存疑[已解決]:solution對象只有經(jīng)過EvaluationLogic之后因俐,solution的定語makespan才會被賦值D赐铩!抹剩!
        # 參見OutputData和EvaluationLogic文件

2 把所有啟發(fā)式算法寫入ConstructiveHeuristics

這個類存在的目的就是通過這個類構(gòu)造出對象撑帖,再用對象調(diào)用成員函數(shù)(啟發(fā)式算法),然后返回一個Solution澳眷。這個Solution在返回時已經(jīng)執(zhí)行了EvaluationLogic胡嘿,也就是說StartTimes和EndTimes等屬性已經(jīng)被賦值。在類定義的最后還要把每種啟發(fā)式算法生成的Solution加入到SOlutionPool中钳踊。
*注意:上一章強調(diào)過衷敌,類的成員函數(shù)定義時勿侯,一定要把self作為第一個參數(shù),否則會輸出參數(shù)數(shù)量不一致的錯誤

class ConstructiveHeuristics: # 由多個不同的啟發(fā)式算法構(gòu)成
    def __init__(self, evaluationLogic, solutionPool):
        self.RandomSeed = 2021
        self.RandomRetries = 10
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool
# FCFS
    def FirstComeFirstServe(self, jobList):
        tmpPermutation = [*range(len(jobList))] # *的作用是將range創(chuàng)建的可迭代對象進行解包缴罗。如果省略星號助琐,會創(chuàng)建一個列表鳖藕,只包含一個元素邑飒,該元素就是range對象

        tmpSolution = Solution(jobList, tmpPermutation) # 構(gòu)造Solution對象彤恶。注意該構(gòu)造方法的使用寇僧,參見OutputJobs中的必選參數(shù)(無默認值)
        self.EvaluationLogic.DefineStartEnd(tmpSolution) # 此處的EvaluationL是個對象  ?皮壁?,所以才能調(diào)用EL類中的成員函數(shù)。這一步是為了計算該temSolution的總時間

        return tmpSolution

# SPT
    def ShortestProcessingTime(self, jobList, allMachines = False): # 注意只有類的成員變量才需要self禀横,函數(shù)的本地變量不需要
        jobPool = []
        if allMachines:
            for i in range(len(jobList)):
                jobPool.append((i, sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
        else:
            for i in range(len(jobList)):
                jobPool.append((i, jobList[i].ProcessingTime(0)))
            
        jobPool.sort(key= lambda t: t[1])
        tmpPermutation = [x[0] for x in jobPool]

        tmpSolution = Solution(jobList, tmpPermutation)
        self.EvaluationLogic.DefineStartEnd(tmpSolution)

        return tmpSolution

# LPT 最費時間的先做
    def LongestProcessingTime(self, jobList, allMachines = False):
        jobPool = []
        if allMachines:
            for i in range(len(jobList)):
                jobPool.append((i, sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
        else:
            for i in range(len(jobList)):
                jobPool.append((i, jobList[i].ProcessingTime(0)))
            
        jobPool.sort(key= lambda t: -t[1]) # 按照jobPool中,元組的第二個元素的相反數(shù)進行排列粥血。排在前面的絕對值更大柏锄,也就是時間更長
        tmpPermutation = [x[0] for x in jobPool] # 這一步是把jobPool中每個元組的首位元素,即jobid 放在列表中作為Permutation

        tmpSolution = Solution(jobList, tmpPermutation)
        self.EvaluationLogic.DefineStartEnd(tmpSolution) # 現(xiàn)在已經(jīng)生成了solution复亏,需要用EvaluationLogic類來調(diào)用EL類中的成員函數(shù)趾娃,以計算該solution的總時長

        return tmpSolution

# ROS 隨機完成job
    def ROS(self, jobList, x, seed):
        np.random.seed(seed) ###
        tmpSolution = Solution(jobList, 0)
        bestCmax = np.inf # unendless 無窮大的浮點數(shù)

        for i in range(x):
            tmpPermuation = np.random.permutation(len(jobList)) #np.random.permutation生成隨機數(shù)列,長度是len()
            # tmpSolution.SetPermutation(tmpPermuation) # SetPermutation是Solution類的成員函數(shù)
            tmpSolution.Permutation = tmpPermuation # 這樣比上一行更好

            self.EvaluationLogic.DefineStartEnd(tmpSolution)

            if (tmpSolution.Makespan < bestCmax):
                bestCmax = tmpSolution.Makespan
                bestPerm = tmpPermuation
    
        bestSol = Solution(jobList, bestPerm)  # ROS中還要篩選出最好的哪個隨機數(shù)列
        self.EvaluationLogic.DefineStartEnd(bestSol)

        return bestSol    

# NEH(要首先確定最佳插入順序)  ### NEH的插入函數(shù)寫在EvaluationLogic類中缔御。這樣EL類既可以確定起止時間也能提供最佳插入順序
    # def DetermineBestInsertion(solution, jobToInsert): #jobToInsert是指需要被插入的那個job
    #     ###
    #     # insert job at front of permutation
    #     solution.Permutation.insert(0, jobToInsert)  # insert函數(shù)抬闷,在索引位置插入數(shù)據(jù),索引后面的往后順移一位耕突。初始肯定在第0位插入
    #     bestPermutation = deepcopy(solution.Permutation)
    
    #     EvaluationLogic().DefineStartEnd(solution)
    #     bestCmax = solution.Makespan

    #     ###
    #     # swap job i to each position and check for improvement
    #     lengthPermutation = len(solution.Permutation) - 1 #減去1是為了得到使用insert時的索引號
    #     for j in range(0, lengthPermutation):
    #         solution.Permutation[j], solution.Permutation[j + 1] = solution.Permutation[j+1], solution.Permutation[j]
    #         EvaluationLogic().DefineStartEnd(solution)
    #         if(solution.Makespan < bestCmax):
    #             bestCmax = solution.Makespan
    #             bestPermutation = [x for x in solution.Permutation]

    #     solution.Makespan = bestCmax
    #     solution.Permutation = bestPermutation

    def NEH(self, jobList):
        jobPool = []
        tmpPerm = []
        bestCmax = 0
        # Calculate sum of processing times and sort
        for i in range(len(jobList)):
            jobPool.append((i,sum(jobList[i].ProcessingTime(x) for x in range(len(jobList[i].Operations)))))
        jobPool.sort(key=lambda x: x[1], reverse=True)

        # Initalize input
        tmpNEHOrder = [x[0] for x in jobPool]
        tmpPerm.append(tmpNEHOrder[0])
        tmpSolution = Solution(jobList,tmpPerm)

        # Add next jobs in a loop and check all permutations
        for i in range(1,len(tmpNEHOrder)):
            # add next job to end and calculate makespan
            self.EvaluationLogic.DetermineBestInsertion(tmpSolution, tmpNEHOrder[i])
    
        return tmpSolution


    def Run(self, inputData, solutionMethod):
        print('Generating an initial solution according to ' + solutionMethod + '.') # 能直接用加號連結(jié)說明solutionMethod是字符串笤成,即算法的名字

        solution = None

        if solutionMethod == 'FCFS':
            solution = self.FirstComeFirstServe(inputData.InputJobs)
        elif solutionMethod == 'SPT':
            solution = self.ShortestProcessingTime(inputData.InputJobs)
        elif solutionMethod == 'LPT':
            solution = self.LeastProcessingTime(inputData.InputJobs)
        elif solutionMethod == 'ROS':
            solution = self.ROS(inputData.InputJobs, self.RandomRetries, self.RandomSeed)
        elif solutionMethod == 'NEH':
            solution = self.NEH(inputData.InputJobs)
        else:
            print('Unkown constructive solution method: ' + solutionMethod + '.')

        self.SolutionPool.AddSolution(solution) #這里的定語solution Pool屬于solutionPool類,因此可用該類的成員函數(shù)

3 創(chuàng)建Solver類

Solver求解器位于類圖的最頂端眷茁,因此這個類至關(guān)重要炕泳。通過觀察類圖我們得知,Solver應該有以下三個函數(shù):

  • ConstructionPhase(constructiveSolutionMethod)
  • ImprovementPhase(startSolution, algorithm)
  • RunLocalSearch(constructiveSolutionMethod, algorithm)

由于現(xiàn)在我們還處在構(gòu)建階段ConstructionPhase上祈,現(xiàn)在只討論這一個函數(shù)培遵。
在構(gòu)建階段,我們只需要往成員函數(shù)ConstructionPhase中輸入一個啟發(fā)式算法的名稱登刺,然后利用ConstructiveHeuristic的實例去調(diào)用Run函數(shù)籽腕,當然我們可以使用不同的算法多次使用Run寫入Solver的SolutionPool,最后輸出那個總時長最短的方案

class Solver:
    def __init__(self, inputData, seed):
        self.InputData = inputData
        self.Seed = seed
        self.RNG = numpy.random.default_rng(seed)  #隨機數(shù)生成器

        self.EvaluationLogic = EvaluationLogic(inputData) # 定語是El對象纸俭,可以調(diào)用類中的方法节仿??
        self.SolutionPool = SolutionPool() # 該構(gòu)造方法沒有必選參數(shù)
        
        self.ConstructiveHeuristic = ConstructiveHeuristics(self.EvaluationLogic, self.SolutionPool)  # 

    def ConstructionPhase(self, constructiveSolutionMethod):
        self.ConstructiveHeuristic.Run(self.InputData, constructiveSolutionMethod) #ConstructiveHeuristic實際上就是一個solution

        bestInitalSolution = self.SolutionPool.GetLowestMakespanSolution()  # solutionPool中掉蔬,最短時長即為最優(yōu)解 -->每次調(diào)用這個成員函數(shù)廊宪,SP中只有一個SOlution矾瘾?

        print("Constructive solution found.")
        print(bestInitalSolution)

        return bestInitalSolution
# 這里我從外部導包
from Solver import *

# 實例化輸入數(shù)據(jù)
data  = InputData("InputFlowshopSIST.json")

# 實例化Solver類
solver  = Solver(data, 2048) #創(chuàng)建一個solver對象

# 用Solver對象調(diào)用ConstructionPhase
solver.ConstructionPhase('FCFS')
solver.ConstructionPhase('NEH')
solver.ConstructionPhase('LPT')
啟發(fā)式算法得出的初始最優(yōu)解

4 優(yōu)化算法:Neighbourhoods and exchanges

啟發(fā)式算法得出的初始最優(yōu)解很有可能與真實的最優(yōu)解有很大的偏差,因此我們需要對其進行優(yōu)化箭启。我們首選的優(yōu)化算法是鄰域和交換壕翩,原因是這種方法最簡單。

鄰域包含一個或多個解決方案(鄰域解決方案)傅寡,這些解決方案可以通過交換從一個給定的解決方案到達放妈。

在詳細講述鄰域交換之前,先來用兩種方式評估一下一個FCFS方案荐操。

假定現(xiàn)在我們有一個FCFS的方案芜抒,有下面兩種方法來評估這個方案的好壞:

1

2

*注意:方法1中,可以直接用之前創(chuàng)建的Solver對象來調(diào)用評估函數(shù)托启,也可以像方法2那樣宅倒,直接調(diào)用評估函數(shù),但是評估函數(shù)需要一個必選參數(shù)——InputJobs類的對象屯耸。

現(xiàn)在考慮FCFS前兩個元素互換位置的情況:

swapMovePermutation = list(permutationFCFS) #create a copy of the permutation
swapMovePermutation[0] = permutationFCFS[1]  # 交換順序可以通過元素的賦值來實現(xiàn),這點在講NEH插入的時候用到過
swapMovePermutation[1] = permutationFCFS[0]

swapMoveSolution = Solution(data.InputJobs, swapMovePermutation)


solver.EvaluationLogic.DefineStartEnd(swapMoveSolution)
print(swapMoveSolution)

前兩個元素互換

*通過對比拐迁,僅僅將前兩個元素互換,總時長跟之前相比縮短了疗绣。由此可見线召,互換確實能夠產(chǎn)生有意義的結(jié)果。

為了找到進一步的改進多矮,有必要更廣泛地探索鄰域的情況缓淹。初始解決方案的交換鄰域中的所有交換都應該被創(chuàng)建和評估。為了這個目的塔逃,我們創(chuàng)建了新的類SwapMove割卖。

交換公式

class SwapMove: # 跟上一個單元格原理類似,列表元素的互換
# 注意這個類中的數(shù)據(jù)類型患雏,swapMove對象有個Permutation屬性鹏溯,我們實際上交換的也正是這個屬性。后面調(diào)用的時候會用到
    def __init__(self, initialPermutation, indexA, indexB):
        self.Permutation = list(initialPermutation) # create a copy of the permutation
        self.IndexA = indexA
        self.IndexB = indexB    #先初始化成員變量(定語)淹仑,這是面向?qū)ο蟊仨毜囊徊?
        # 對成員變量進行修改丙挽,因此不需要返回值
        self.Permutation[indexA] = initialPermutation[indexB]
        self.Permutation[indexB] = initialPermutation[indexA]

*注意:在我們定義類的時候,通常越普遍越好匀借,例如這個SwapMove類颜阐,它的作用在于借助initialPermutation,把屬性Permutation中位于indexA和B的兩個元素互換吓肋。這兩個index可以隨意指定凳怨,至于指定哪些值,則需要按照具體需求給出參數(shù)

現(xiàn)在我們來把FCFS中的排列進行換位,這里需要注意對稱性肤舞,因為我們把1紫新,2兩個元素換位后,再換回來是完全沒有意義的李剖。

swapMoves = []
for i in range(len(permutationFCFS)):
    for j in range(len(permutationFCFS)):
        if i < j:
            swapMove = SwapMove(permutationFCFS, i, j)
            swapMoves.append(swapMove)

print(len(swapMoves))  # 輸出共有多少種交換的情況
# 注意這里swapmoves不能直接打印芒率,因為里面裝的是類SwapMove的對象

答案是55種。
簡單分析一下篙顺,這個問題相當于從n=11個訂單中隨機不重復選出兩個訂單進行換位



現(xiàn)在再來用math模塊檢驗一下:

import math # 這一步的目的是輸出一共有幾次換位

expectedNumberOfMoves = math.comb(len(data.InputJobs), 2) #binomial coefficient "n choose k" 直接輸出換位幾次
actualNumberOfMoves = len(swapMoves) #swapMoves中容納了所有換位的可能情況

print(f'there schould exist {expectedNumberOfMoves} moves , and this is {actualNumberOfMoves == expectedNumberOfMoves}')
結(jié)果正確

新的交換已被創(chuàng)建偶芍,因此有55個新的候選解決方案。現(xiàn)在必須對新的候選解決方案進行評估德玫,以確定是否其中至少有一個解決方案帶來了改進匪蟀。我們將解決方案保存在一個列表中,以便我們以后可以訪問它們宰僧,并確定找到的最佳候選解決方案材彪。

# 把交換后的解決方案放進列表中并輸出55個方案及其總時長
swapMoveSolutions = []
for move in swapMoves:

    # 使用SwapMove類的Permutation屬性來構(gòu)建Solution對象
    swapMoveSolution = Solution(data.InputJobs, move.Permutation)
    solver.EvaluationLogic.DefineStartEnd(swapMoveSolution)
    
    swapMoveSolutions.append(swapMoveSolution)

print('initial solution: ' + str(permutationFCFS))

for solution in swapMoveSolutions:
    print(solution)
55個方案

我們現(xiàn)在想要找到備選方案中的最佳解決方案,并檢查它是否比FCFS解決方案更好撒桨。


可見查刻,換位后我們得出了比FCFS更好的結(jié)果键兜。那么這個更好的結(jié)果究竟在哪些位置發(fā)生了變化凤类?試著輸出有差異的元素的序號:


判斷兩個list有差別的元素索引

5 創(chuàng)建SwapNeighborhood類

我們現(xiàn)在把這個邏輯構(gòu)建到一個上位的 SwapNeighborhood類中,代表一個鄰域的互換N({swap}, s)普气。我們應如何進行谜疤?SwapNeighborhood類的各個組成部分是什么?需要執(zhí)行哪些邏輯步驟现诀?

"""這個類包括了所有從n個訂單中選兩個的可能排列"""
class SwapNeighborhood:
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        self.InputData = inputData
        self.Permutation = initialPermutation
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool  ## 跟下面的SwapMoveSolutions區(qū)分開R目摹!W醒亍W!封锉!

        self.Moves = []
        self.SwapMoveSolutions = []

    """生成所有可能的交換排列"""
    def DiscoverMoves(self):
        for i in range(len(self.Permutation)):
            for j in range(len(self.Permutation)):
                if i < j:
                    swapMove = SwapMove(self.Permutation, i, j)
                    self.Moves.append(swapMove)  # Self.Moves列表绵跷,里面裝的是SwapMove對象

    """把生成的所有排列self.Moves中的每次交換,都進行實例化后評估成福。這里注意SwapMove類的屬性碾局。找出總時長最短"""
    def EvaluateMovesBestImprovement(self):
        for move in self.Moves:
            swapMoveSolution = Solution(self.InputData.InputJobs, move.Permutation)
            self.EvaluationLogic.DefineStartEnd(swapMoveSolution)

            self.SwapMoveSolutions.append(swapMoveSolution)


    """另一種評估方式:只評估到第一個優(yōu)化現(xiàn)有方案的交換排列"""
    def EvaluateMovesFirstImprovement(self):
        bestObjective = self.SolutionPool.GetLowestMakespanSolution().Makespan

        for move in self.Moves:
            swapMoveSolution = Solution(self.InputData.InputJobs, move.Permutation)

            self.EvaluationLogic.DefineStartEnd(swapMoveSolution)
            self.SwapMoveSolutions.append(swapMoveSolution)

            if swapMoveSolution.Makespan < bestObjective:
                # 終止鄰域評估,因為已經(jīng)找到了第一個優(yōu)化的結(jié)果:比pool中的最優(yōu)結(jié)果還要好
                return

    """定義了兩個評估函數(shù)奴艾,現(xiàn)在還需要定義一個接口净当,利用這個接口去調(diào)用兩個評估函數(shù)"""
    def EvaluateMoves(self, evaluationStrategy):
        if evaluationStrategy == 'BestImprovement':
            self.EvaluateMovesBestImprovement()
        elif evaluationStrategy == 'FirstImprovement':
            self.EvaluateMovesFirstImprovement()
        else:
            print(f'Evaluation strategy {evaluationStrategy} not implemented! ')
            # 這個函數(shù)不涉及賦值操作,也不用返回。每次調(diào)用這個函數(shù)像啼,要么調(diào)用其他函數(shù)俘闯,要么打印提示。因此用print就行不用return


    """兩種評估方案各自產(chǎn)生的結(jié)果都被存放在self.SwapMoveSolutions中÷窈希現(xiàn)在要通過MakeBestMove函數(shù)來輸出最優(yōu)解"""
    def MakeBestMove(self):
        self.SwapMoveSolutions.sort(key = lambda solution: solution.Makespan) # 這里的solution是可以直接調(diào)用Makespan的备徐,因為在兩個評估方案中都是用了EvaluationLogic
        bestNeighborhoodSolution = self.SwapMoveSolutions[0]

        return bestNeighborhoodSolution

    """使用clear函數(shù)清空列表等容器。猜想:如果我們更換另一種排列方式甚颂,那么之前保存的moves和SwapMoveSolutions就沒用了"""
    def Update(self, permutation):

            self.Permutation = permutation # 修改類的屬性蜜猾,修改后前面的函數(shù)執(zhí)行都會改變,因此要清空兩個列表屬性

            self.Moves.clear()
            self.SwapMoveSolutions.clear()


    """進行本地搜索locaksearch振诬,用手頭現(xiàn)有的解決方案去跟經(jīng)過交換的方案進行比較蹭睡,默認交換后會有更好的方案(True)"""
    def LocalSearch(self, neighborhoodEvaluationStrategy, solution):
        hasSolutionImproved = True

        while hasSolutionImproved:
            self.Update(solution.Permutation)  # 清空兩個列表
            self.DiscoverMoves() # 填充Moves列表
            self.EvaluateMoves(neighborhoodEvaluationStrategy) # 填充SwapMoveSolutions列表

            bestNeighborhoodSolution = self.MakeBestMove()

            if bestNeighborhoodSolution.Makespan < solution.Makespan:
                print('New best solution has been found!!!')
                print(bestNeighborhoodSolution)  # 對Solution類的對象執(zhí)行print,會執(zhí)行__str__

                self.SolutionPool.AddSolution(bestNeighborhoodSolution)

                solution.Permutation = bestNeighborhoodSolution.Permutation
                solution.Makespan = bestNeighborhoodSolution.Makespan

            else:
                print(f'Reached local optimum of {self.Type} neighborhood. Stop local search. ')
                hasSolutionImproved = False

我們將邏輯外包給Neighborhood.py模塊赶么,并將鄰域嵌入ImprovementAlgorithm類中肩豁,這個類控制優(yōu)化程序的邏輯。

Neighborhood.py文件中辫呻,已經(jīng)包含了上一節(jié)寫的代碼清钥。不過略有優(yōu)化,SwapMove被定義成BaseNeighborhood的子類放闺,這樣SwapMove可以繼承所有成員函數(shù)并根據(jù)子類的特有需求重寫了父類的成員函數(shù)

from OutputData import *
from copy import deepcopy
class BaseNeighborhood:
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        self.InputData = inputData
        self.Permutation = initialPermutation
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool

        self.Moves = []
        self.MoveSolutions = []

        self.Type = 'None'

    def DiscoverMoves(self):
        raise Exception('DiscoverMoves() is not implemented for the abstract BaseNeighborhood class.')

    def EvaluateMoves(self, evaluationStrategy):
        if evaluationStrategy == 'BestImprovement':
            self.EvaluateMovesBestImprovement()
        elif evaluationStrategy == 'FirstImprovement':
            self.EvaluateMovesFirstImprovement()
        else:
            raise Exception(f'Evaluation strategy {evaluationStrategy} not implemented.')

    """ Evaluate all moves. """
    def EvaluateMovesBestImprovement(self):
        for move in self.Moves:
            moveSolution = Solution(self.InputData.InputJobs, move.Permutation)

            self.EvaluationLogic.DefineStartEnd(moveSolution)
            
            self.MoveSolutions.append(moveSolution)

    """ Evaluate all moves until the first one is found that improves the best solution found so far. """
    def EvaluateMovesFirstImprovement(self):
        bestObjective = self.SolutionPool.GetLowestMakespanSolution().Makespan

        for move in self.Moves:
            moveSolution = Solution(self.InputData.InputJobs, move.Permutation)

            if self.Type == 'BestInsertion':
                self.EvaluationLogic.DetermineBestInsertionAccelerated(moveSolution, move.removedJob)
            else:
                self.EvaluationLogic.DefineStartEnd(moveSolution)

            self.MoveSolutions.append(moveSolution)

            if moveSolution.Makespan < bestObjective:
                # abort neighborhood evaluation because an improvement has been found
                return

    def MakeBestMove(self):
        self.MoveSolutions.sort(key = lambda solution: solution.Makespan) # sort solutions according to makespan

        bestNeighborhoodSolution = self.MoveSolutions[0]

        return bestNeighborhoodSolution

    def Update(self, permutation):
        self.Permutation = permutation

        self.Moves.clear()
        self.MoveSolutions.clear()

    def LocalSearch(self, neighborhoodEvaluationStrategy, solution):        
        hasSolutionImproved = True

        while hasSolutionImproved:
            self.Update(solution.Permutation)
            self.DiscoverMoves()
            self.EvaluateMoves(neighborhoodEvaluationStrategy)

            bestNeighborhoodSolution = self.MakeBestMove()

            if bestNeighborhoodSolution.Makespan < solution.Makespan:
                # print("New best solution has been found!")
                print(bestNeighborhoodSolution)

                self.SolutionPool.AddSolution(bestNeighborhoodSolution)

                solution.Permutation = bestNeighborhoodSolution.Permutation
                solution.Makespan = bestNeighborhoodSolution.Makespan
            else:
                print(f"Reached local optimum of {self.Type} neighborhood. Stop local search.")
                hasSolutionImproved = False        

""" Represents the swap of the element at IndexA with the element at IndexB for a given permutation (= solution). """
class SwapMove:
    def __init__(self, initialPermutation, indexA, indexB):
        self.Permutation = list(initialPermutation) # create a copy of the permutation
        self.IndexA = indexA
        self.IndexB = indexB

        self.Permutation[indexA] = initialPermutation[indexB]
        self.Permutation[indexB] = initialPermutation[indexA]
        
""" Contains all $n choose 2$ swap moves for a given permutation (= solution). """
class SwapNeighborhood(BaseNeighborhood):
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        super().__init__(inputData, initialPermutation, evaluationLogic, solutionPool)

        self.Type = 'Swap'

    """ Generate all $n choose 2$ moves. """
    def DiscoverMoves(self):
        for i in range(len(self.Permutation)):
            for j in range(len(self.Permutation)):
                if i < j:
                    swapMove = SwapMove(self.Permutation, i, j)
                    self.Moves.append(swapMove)

""" Represents the insertion of the element at IndexA at the new position IndexB for a given permutation (= solution). """
class InsertionMove:
    def __init__(self, initialPermutation, indexA, indexB):
        self.Permutation = [] # create a copy of the permutation
        self.IndexA = indexA
        self.IndexB = indexB

        for k in range(len(initialPermutation)):
            if k == indexA:
                continue

            self.Permutation.append(initialPermutation[k])

        self.Permutation.insert(indexB, initialPermutation[indexA])

""" Contains all $(n - 1)^2$ insertion moves for a given permutation (= solution). """
class InsertionNeighborhood(BaseNeighborhood):
    def __init__(self, inputData, initialPermutation, evaluationLogic, solutionPool):
        super().__init__(inputData, initialPermutation, evaluationLogic, solutionPool)

        self.Type = 'Insertion'

    def DiscoverMoves(self):
        for i in range(len(self.Permutation)):
            for j in range(len(self.Permutation)):
                if i == j or i == j + 1:
                    continue

                insertionMove = InsertionMove(self.Permutation, i, j)
                self.Moves.append(insertionMove)
                
BaseNeighborhood類圖

6 優(yōu)化程序的流程邏輯

ImprovementAlgorithm類圖

我們的優(yōu)化程序是基于鄰域的祟昭,目前我只介紹了SwapNeighborhood這一中鄰域類型。而在ImprovementAlgorithmus類中,我們可以使用多種鄰域類型來創(chuàng)建鄰域Neighborhood怖侦,并把這些鄰域存入字典篡悟,鍵是鄰域的類型,值是鄰域的對象匾寝。最后在ImprovementAlgorithmus的子類IterativeImprovement中搬葬,用鍵把字典遍歷即可得到solution。

"""這是多種不同優(yōu)化算法的基本類艳悔,特定的優(yōu)化算法可以繼承這個基本類"""
class ImprovementAlgorithmus:
    def __init__(self, inputData, neighborhoodEvaluationStrategy = 'BestImprovement', neighborhoodTypes = ['Swap']):
        self.InputData = inputData  # neighborhoodTypes實際上有多種急凰,也可以是Insertion,TwoEdgeExchange猜年。抡锈。。
        self.EvaluationLogic = None
        self.SolutionPool = None
        self.RNG = None # 表示隨機數(shù)

        self.NeighborhoodEvaluationStrategy = neighborhoodEvaluationStrategy
        self.NeighborhoodTypes = neighborhoodTypes
        self.Neighborhoods = {}

    def Initialize(self, evaluationLogic, solutionPool, rng = None):
        self.EvaluationLogic = evaluationLogic
        self.SolutionPool = solutionPool
        self.RNG = rng

    def InitializeNeighborhoods(self, solution): # 字典Neighborhood中码倦,可能有兩個鍵Insertion和Swap企孩,添加新數(shù)據(jù)時,需要用到相應的構(gòu)造器
        self.Neighborhoods['Swap'] = SwapNeighborhood(self.InputData, solution.Permutation, self.EvaluationLogic, self.SolutionPool)
        # add further neighborhoods 詳見class ImprovementAlgorithm文件

現(xiàn)在袁稽,優(yōu)化算法 ImprovementAlgorithm類包含了基于鄰域的優(yōu)化程序的所有基本組件勿璃。這些可以在不同的算法中使用。在這里,我們創(chuàng)建了一個 "迭代優(yōu)化 "類IterativeImprovement补疑,通過連續(xù)的局部搜索來迭代改進解決方案歧沪。

""" Iterative improvement algorithm through sequential variable neighborhood descent. """
class IterativeImprovement(ImprovementAlgorithm): # einzelne Nachbaren
    def __init__(self, inputData, neighborhoodEvaluationStrategy = 'BestImprovement', neighborhoodTypes = ['Swap']):
        super().__init__(inputData, neighborhoodEvaluationStrategy, neighborhoodTypes)

    def Run(self, solution): #startSolution ?
        self.InitializeNeighborhoods(solution)  # 初始化即構(gòu)造對象  

        # According to "Hansen et al. (2017): Variable neighorhood search", this is equivalent to the 
        # sequential variable neighborhood descent with a pipe neighborhood change step.
        for neighborhoodType in self.NeighborhoodTypes:
            neighborhood = self.Neighborhoods[neighborhoodType]

            neighborhood.LocalSearch(self.NeighborhoodEvaluationStrategy, solution)
        
        return solution

這里Run函數(shù)的用法可以自己思考莲组,前面講過诊胞。InitializeNeighborhoods實際上會用當前參數(shù)中的solution往Neighborhoods字典中填充鄰域?qū)ο蟆W值渲杏辛藘?nèi)容我們才能針對不同類型的鄰域執(zhí)行l(wèi)ocalsearch

現(xiàn)在對Solver類進行擴展锹杈,增加ImprovementPhase()RunLocalSearch()兩個函數(shù)

class Solver:
    def __init__(self, inputData, seed):
        
        self.InputData = inputData
        self.Seed = seed
        self.RNG = numpy.random.default_rng(seed)  #隨機數(shù)生成器

        self.EvaluationLogic = EvaluationLogic(inputData) # 定語是El對象撵孤,可以調(diào)用類中的方法
        self.SolutionPool = SolutionPool()
        
        self.ConstructiveHeuristic = ConstructiveHeuristics(self.EvaluationLogic, self.SolutionPool)

    def ConstructionPhase(self, constructiveSolutionMethod):
        self.ConstructiveHeuristic.Run(self.InputData, constructiveSolutionMethod)

        bestInitalSolution = self.SolutionPool.GetLowestMakespanSolution()

        print("Constructive solution found.")
        print(bestInitalSolution)

        return bestInitalSolution


    def ImprovementPhase(self, startSolution, algorithm):
        algorithm.Initialize(self.EvaluationLogic, self.SolutionPool, self.RNG)
        bestSolution = algorithm.Run(startSolution)

        print("Best found Solution.")
        print(bestSolution)

    def RunLocalSearch(self, constructiveSolutionMethod, algorithm):
        startSolution = self.ConstructionPhase(constructiveSolutionMethod)

        self.ImprovementPhase(startSolution, algorithm)

*注意:

  • 在ImprovementPhase中,有個Initialize函數(shù)竭望,這里對應的是IterativeImprovement類或其他子類的對象邪码,可以調(diào)用他們父類的Initialize函數(shù)和子類中的Run函數(shù)。子類Run函數(shù)實質(zhì)上就是把鄰域進行局部搜索咬清,并返回最優(yōu)解闭专。--> 其實這里已經(jīng)是在RunLocalSearch了,那么為什么還要定義一個叫做RunLocalSearch的函數(shù)旧烧,并在這個函數(shù)中調(diào)用ImprovementPhase影钉?
  • 原因在于,這樣做我們在調(diào)用函數(shù)的時候能少寫參數(shù)掘剪。例如平委,在RunLocalSearch中我們會首先構(gòu)建startSolution,也就是啟發(fā)式算法的方案杖小,然后把這個方案作為參數(shù)輸入到優(yōu)化步驟--ImprovementPhase肆汹。編程要理解這種思想愚墓。

*總結(jié):到目前為止予权,solver類的作用就是通過ConstructionPhase創(chuàng)建初始解決方案,然后把這個方案進行評估浪册。評估時要用到兩個函數(shù)扫腺,但是我們只需要調(diào)用RunLocalSearch這一個函數(shù)就夠了

7 運行實例

from Solver import *
from ImprovementAlgorithm import *

data = InputData("InputFlowshopSIST.JSON")

solver = Solver(data, 2048)

improvementAlgorithm = IterativeImprovement(data)

solver.RunLocalSearch('FCFS', improvementAlgorithm)

*注意:操作流程就是先構(gòu)建一個solver對象,然后用這個solver對象調(diào)用RunLocalSearch村象,第一個參數(shù)是啟發(fā)式算法的名字笆环,之前我們也是這樣定義的;第二個參數(shù)是優(yōu)化算法的對象或?qū)嵗裾摺T跇?gòu)造IterativeImprovement對象時躁劣,我們只有一個必選參數(shù),也就是data(inputData)库菲,可選參數(shù)的值取決于我們想用哪種優(yōu)化方式(best還是first)账忘,以及哪種鄰域類型


*上圖就是用FCFS產(chǎn)生初始方案,再用SwapNeighborhood和BestImprovement得出最優(yōu)解的過程。

問題

到目前為止鳖擒,我們已經(jīng)了解了兩個鄰域的評估策略溉浙。

  • Best Improvement(廣泛搜索):對鄰域的所有交換進行評估。有最大改進的交換被執(zhí)行蒋荚。
  • First Improvement(選擇性搜索):對一個鄰域的交換進行評估戳稽,直到發(fā)現(xiàn)優(yōu)化。一旦發(fā)現(xiàn)有改進期升,就會中斷評估惊奇,并立即執(zhí)行發(fā)現(xiàn)的交換。因此播赁,更好的交換方案被忽略

你認為赊时,采用哪種評價策略能取得更好的效果?兩種評價策略的優(yōu)勢和劣勢是什么行拢?

from Solver import *

data = InputData("InputFlowshopSIST.json")

constructiveSolutionMethod = 'FCFS'

solverBestImprovement = Solver(data, 2048)
bestImprovementAlgorithm = IterativeImprovement(data, 'BestImprovement')
solverBestImprovement.RunLocalSearch(constructiveSolutionMethod, bestImprovementAlgorithm)

solverFirstImprovement = Solver(data, 2048)
bestImprovementAlgorithm = IterativeImprovement(data, 'FirstImprovement')
solverFirstImprovement.RunLocalSearch(constructiveSolutionMethod, bestImprovementAlgorithm)

localOptimumBestImprovement = solverBestImprovement.SolutionPool.GetLowestMakespanSolution()
localOptimumFirstImprovement = solverFirstImprovement.SolutionPool.GetLowestMakespanSolution()

print(f"Best improvement makespan: {localOptimumBestImprovement.Makespan}")
print(f"First improvement makespan: {localOptimumFirstImprovement.Makespan}")

isBestImprovementMakespanLower = localOptimumBestImprovement.Makespan < localOptimumFirstImprovement.Makespan

print(f"The best improvement strategy achieves a lower makespan: {isBestImprovementMakespanLower}")
對比兩種評估方案
from Solver import *

data = InputData("InputFlowshopSIST.json")

improvementAlgorithm = IterativeImprovement(data, 'BestImprovement', ['Swap', 'Insertion'])

solver = Solver(data, 1024)

solver.RunLocalSearch('LPT', improvementAlgorithm)
對比Insertion和Swap
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末祖秒,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子舟奠,更是在濱河造成了極大的恐慌竭缝,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沼瘫,死亡現(xiàn)場離奇詭異抬纸,居然都是意外死亡,警方通過查閱死者的電腦和手機耿戚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門湿故,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人膜蛔,你說我怎么就攤上這事坛猪。” “怎么了皂股?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵墅茉,是天一觀的道長。 經(jīng)常有香客問我呜呐,道長就斤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任蘑辑,我火速辦了婚禮洋机,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘洋魂。我一直安慰自己绷旗,他們只是感情好啄踊,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著刁标,像睡著了一般颠通。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上膀懈,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天顿锰,我揣著相機與錄音,去河邊找鬼启搂。 笑死硼控,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的胳赌。 我是一名探鬼主播牢撼,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼疑苫!你這毒婦竟也來了熏版?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤捍掺,失蹤者是張志新(化名)和其女友劉穎撼短,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體挺勿,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡曲横,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了不瓶。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片禾嫉。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蚊丐,靈堂內(nèi)的尸體忽然破棺而出熙参,到底是詐尸還是另有隱情,我是刑警寧澤吠撮,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布尊惰,位于F島的核電站讲竿,受9級特大地震影響泥兰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜题禀,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一鞋诗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧迈嘹,春花似錦削彬、人聲如沸全庸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壶笼。三九已至,卻和暖如春雁刷,著一層夾襖步出監(jiān)牢的瞬間覆劈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工沛励, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留责语,地道東北人。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓目派,卻偏偏與公主長得像坤候,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子企蹭,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容