<返回更多

每个程序员都应该知晓的核心搜索算法

2021-04-08  今日头条  搜索算法
加入收藏

“搜索”是一个宏大的主题,本书通篇都可被称为“用Python解决经典的搜索问题”。本章将介绍每个程序员都应该知晓的核心搜索算法。别看标题很响亮,但本章内容其实称不上全面。

2.1 DNA搜索

在计算机软件中,基因通常会表示为字符A、C、G和T的序列。每个字母代表一种核苷酸(nucleotide),3个核苷酸的组合称作密码子(codon)。如图2-1所示,密码子的编码决定了氨基酸的种类,多个氨基酸一起形成了蛋白质(protein)。生物信息学软件的一个经典任务就是在基因中找到某个特定的密码子。

每个程序员都应该知晓的核心搜索算法

 

图2-1 核苷酸由A、C、G和T之一表示。密码子由3个核苷酸组成,基因由多个密码子组成

2.1.1 DNA的存储方案

核苷酸可以表示为包含4种状态的简单类型IntEnum,如代码清单2-1所示。

代码清单2-1 dna_search.py

from enum import IntEnum
from typing import Tuple, List

Nucleotide: IntEnum = IntEnum('Nucleotide', ('A', 'C', 'G', 'T'))

Nucleotide的类型是IntEnum,而不仅仅是Enum,因为IntEnum“免费”提供了比较运算符(<、>=等)。为了使要实现的搜索算法能完成这些操作,需要数据类型支持这些运算符。从typing包中导入Tuple和List,是为了获得类型提示提供的支持。

如代码清单2-2所示,Codon可以定义为包含3个Nucleotide的元组,Gene可以定义为Codon的列表。

代码清单2-2 dna_search.py(续)

Codon = Tuple[Nucleotide, Nucleotide, Nucleotide]  # type alias for codons
Gene = List[Codon]  # type alias for genes

注意 尽管稍后需要对Codon进行相互比较,但是此处并不需要为Codon定义显式地实现了“<”操作符的自定义类,这是因为只要组成元组的元素类型是可比较的,Python就内置支持元组的比较操作。

互联网上的基因数据通常都是以文件格式提供的,其中包含了代表基因序列中所有核苷酸的超长字符串。下面将为某个虚构的基因定义这样一个字符串,并将其命名为gene_str,具体代码如代码清单2-3所示。

代码清单2-3 dna_search.py(续)

gene_str: str = "ACGTGGCTCTCTAACGTACGTACGTACGGGGTTTATATATACCCTAGGACTCCCTTT"

这里还需要一个实用函数把str转换为Gene。具体代码如代码清单2-4所示。

代码清单2-4 dna_search.py(续)

def string_to_gene(s: str) -> Gene:
    gene: Gene = []
    for i in range(0, len(s), 3):
        if (i + 2) >= len(s):  # don't run off end!
           return gene
        #  initialize codon out of three nucleotides
        codon: Codon = (Nucleotide[s[i]], Nucleotide[s[i + 1]], Nucleotide[s[i + 2]])
        gene.Append(codon)  # add codon to gene
    return gene

string_to_gene()遍历给定的str,把每3个字符转换为Codon,并将其追加到新建的Gene的末尾。如果该函数发现从正在读取的s的当前位置开始不够放下两个Nucleotide的位置(参见循环中的if语句),就说明已经到达不完整基因的末尾了,于是最后一个或两个核苷酸就会被跳过。

string_to_gene()可用于把str类型的gene_str转换为Gene。具体代码如代码清单2-5所示。

代码清单2-5 dna_search.py(续)

my_gene: Gene = string_to_gene(gene_str)

2.1.2 线性搜索

基因需要执行的一项基本操作就是搜索指定的密码子,目标就是简单地查找该密码子是否存在于基因中。

线性搜索(linear search)算法将按照原始数据结构的顺序遍历搜索空间(search space)中的每个元素,直到找到搜索内容或到达数据结构的末尾。其实对搜索而言,线性搜索是最简单、最自然、最显而易见的方式。在最坏的情况下,线性搜索将需要遍历数据结构中的每个元素,因此它的复杂度为O(n),其中n是数据结构中元素的数量,如图2-2所示。

每个程序员都应该知晓的核心搜索算法

 

图2-2 在最坏情况下,线性搜索将需要遍历数组的每个元素

线性搜索函数的定义非常简单。它只需要遍历数据结构中的每个元素,并检查每个元素是否与所查找的数据相等。代码清单2-6所示的代码就对Gene和Codon定义了这样一个函数,然后尝试对my_gene和名为acg和gat的Codon对象调用这个函数。

代码清单2-6 dna_search.py(续)

def linear_contains(gene: Gene, key_codon: Codon) -> bool:
    for codon in gene:
        if codon == key_codon:
            return True
    return False

acg: Codon = (Nucleotide.A, Nucleotide.C, Nucleotide.G)
gat: Codon = (Nucleotide.G, Nucleotide.A, Nucleotide.T)
print(linear_contains(my_gene, acg))  # True
print(linear_contains(my_gene, gat))  # False

注意 上述函数仅供演示。Python内置的序列类型(list、tuple、range)都已实现了__contains__()方法,这样就能简单地用in操作符在其中搜索某个指定的数据项。实际上,in运算符可以与任何已实现__contains__()方法的类型一起使用。例如,执行print(acg in my_gene)语句即可在my_gene中搜索acg并打印出结果。

2.1.3 二分搜索

有一种搜索方法比查看每个元素速度快,但需要提前了解数据结构的顺序。如果我们知道某数据结构已经排过序,并且可以通过数据项的索引直接访问其中的每一个数据项,就可以执行二分搜索(binary search)。根据这一标准,已排序的Python List是二分搜索的理想对象。

二分搜索的原理如下:查看一定范围内有序元素的中间位置的元素,将其与所查找的元素进行比较,根据比较结果将搜索范围缩小一半,然后重复上述过程。下面看一个具体的例子。

假定有一个按字母顺序排列的单词List,类似于["cat", "dog", "kangaroo", "llama", "rabbit", "rat", "zebra"],要查找的单词是“rat”。

(1)可以确定在这7个单词的列表中,中间位置的元素为“llama”。

(2)可以确定按字母顺序“rat”将排在“llama”之后,因此它一定位于“llama”之后的一半(近似)列表中。(如果在本步中已经找到“rat”,就应该返回它的位置;如果发现所查找的单词排在当前的中间单词之前,就可以确信它位于“llama”之前的一半列表中。)

(3)可以对“rat”有可能存在其中的半个列表再次执行第1步和第2步。这半个列表事实上就成了新的目标列表。这些步骤会持续执行下去,直至找到“rat”或者当前搜索范围中不再包含待查找的元素(意味着该单词列表中不存在“rat”)。

图2-3展示了二分搜索的过程。请注意,与线性搜索不同,它不需要搜索每个元素。

每个程序员都应该知晓的核心搜索算法

 

图2-3 最坏情况下,二分搜索仅会遍历lg(n)个列表元素

二分搜索将搜索空间不停地减半,因此它的最坏情况运行时间为O(lg n)。但是这里还有个排序问题。与线性搜索不同,二分搜索需要对有序的数据结构才能进行搜索,而排序是需要时间的。实际上,最好的排序算法也需要O(n lg n)的时间才能完成。如果我们只打算运行一次搜索,并且原数据结构未经排序,那么可能进行线性搜索就比较合理。但如果要进行多次搜索,那么用于排序所付出的时间成本就是值得的,获得的收益来自每次搜索大幅减少的时间成本。

为基因和密码子编写二分搜索函数,与为其他类型的数据编写搜索函数没什么区别,因为同为Codon类型的数据可以相互比较,而Gene类型只不过是一个List。具体代码如代码清单2-7所示。

代码清单2-7 dna_search.py(续)

def binary_contains(gene: Gene, key_codon: Codon) -> bool:
    low: int = 0
    high: int = len(gene) - 1
    while low <= high:  # while there is still a search space
        mid: int = (low + high) // 2
        if gene[mid] < key_codon:
            low = mid + 1
        elif gene[mid] > key_codon:
            high = mid - 1
        else:
            return True
    return False

下面就逐行过一遍这个函数。

low: int = 0
high: int = len(gene) - 1

首先看一下包含整个列表(gene)的范围。

while low <= high:

只要还有可供搜索的列表范围,搜索就会持续下去。当low大于high时,意味着列表中不再包含需要查看的槽位(slot)了。

mid: int = (low + high) // 2

我们用整除法和在小学就学过的简单均值公式计算中间位置mid。

if gene[mid] < key_codon:
    low = mid + 1

如果要查找的元素大于当前搜索范围的中间位置元素,我们就修改下一次循环迭代时要搜索的范围,方法是将low移到当前中间位置元素后面的位置。下面是把下一次迭代的搜索范围减半的代码。

elif gene[mid] > key_codon:
    high = mid - 1

类似地,如果要查找的元素小于中间位置元素之前,就将当前搜索范围反向减半。

else:
    return True

如果当前查找的元素既不小于也不大于中间位置元素,就表示它就是要找的元素!当然,如果循环迭代完毕,就会返回False,以表明没有找到,这里就不重现代码了。

下面可以尝试用同一份基因数据和密码子运行函数了,但必须记得先进行排序。具体代码如代码清单2-8所示。

代码清单2-8 dna_search.py(续)

my_sorted_gene: Gene = sorted(my_gene)
print(binary_contains(my_sorted_gene, acg))  # True
print(binary_contains(my_sorted_gene, gat))  # False

提示 可以用Python标准库中的bisect模块构建高性能的二分搜索方案。

2.1.4 通用示例

函数linear_contains()和binary_contains()可以广泛应用于几乎所有Python序列。以下通用版本与之前的版本几乎完全相同,只不过有一些名称和类型提示信息有一些变化。具体代码如代码清单2-9所示。

注意 代码清单2-9中有很多导入的类型。本章后续有很多通用搜索算法都将复用generic_search.py文件,这样就可以避免导入的麻烦。

注意 在往下继续之前,需要用pip install typing_extensions或pip3 install typing_extensions安装typing_extensions模块,具体命令取决于Python解释器的配置方式。这里需要通过该模块获得Protocol类型,Python后续版本的标准库中将会包含这个类型(PEP 544已有明确说明)。因此在Python的后续版本中,应该不需要再导入typing_extensions模块了,并且会用from typing import Protocol替换from typing_extensions import Protocol。

代码清单2-9 generic_search.py

from __future__ import annotations
from typing import TypeVar, Iterable, Sequence, Generic, List, Callable, Set, Deque, 
    Dict, Any, Optional
from typing_extensions import Protocol
from heapq import heappush, heappop

T = TypeVar('T')

def linear_contains(iterable: Iterable[T], key: T) -> bool:
    for item in iterable:
        if item == key:
            return True
    return False

C = TypeVar("C", bound="Comparable")

class Comparable(Protocol):
    def __eq__(self, other: Any) -> bool:
       ...
    def __lt__(self: C, other: C) -> bool:
       ...
    def __gt__(self: C, other: C) -> bool:
        return (not self < other) and self != other

    def __le__(self: C, other: C) -> bool:
        return self < other or self == other

    def __ge__(self: C, other: C) -> bool:
        return not self < other

def binary_contains(sequence: Sequence[C], key: C) -> bool:
    low: int = 0
    high: int = len(sequence) - 1
    while low <= high:  # while there is still a search space
        mid: int = (low + high) // 2
        if sequence[mid] < key:
            low = mid + 1
        elif sequence[mid] > key:
            high = mid - 1
        else:
            return True
    return False

if __name__ == "__main__":
    print(linear_contains([1, 5, 15, 15, 15, 15, 20], 5))  # True
    print(binary_contains(["a", "d", "e", "f", "z"], "f"))  # True
    print(binary_contains(["john", "mark", "ronald", "sarah"], "sheila"))  # False

现在可以尝试搜索其他类型的数据了。这些函数几乎可以复用于任何Python集合类型。这正是编写通用代码的威力。上述例子中唯一的遗憾之处就是为了满足Python类型提示的要求,必须以Comparable类的形式实现。Comparable是一种实现比较操作符(<、> =等)的类型。在后续的Python版本中,应该有一种更简洁的方式来为实现这些常见操作符的类型创建类型提示。

2.2 求解迷宫问题

寻找穿过迷宫的路径类似于计算机科学中的很多常见搜索问题,那么不妨直观地用查找迷宫路径来演示广度优先搜索、深度优先搜索和A*算法吧。

此处的迷宫将是由Cell组成的二维网格。Cell是一个包含str值的枚举,其中" "表示空白单元格,"X"表示路障单元格。还有其他一些在打印输出迷宫时供演示用的单元格。具体代码如代码清单2-10所示。

代码清单2-10 maze.py

from enum import Enum
from typing import List, NamedTuple, Callable, Optional
import random
from math import sqrt
from generic_search import dfs, bfs, node_to_path, astar, Node

class Cell(str, Enum):
    EMPTY = " "
    BLOCKED = "X"
    START = "S"
    GOAL = "G"
    PATH = "*"

这里再次用到了很多导入操作。注意,最后一个导入(from generic_search)有几个还未定义的标识符,此处是为了方便才包含进来的,但在用到之前可以先把它们注释掉。

还需要有一种表示迷宫中各个位置的方法,只要用NamedTuple即可实现,其属性表示当前位置的行和列。具体代码如代码清单2-11所示。

代码清单2-11 maze.py(续)

class MazeLocation(NamedTuple):
    row: int
    column: int

2.2.1 生成一个随机迷宫

Maze类将在内部记录一个表示其状态的网格(列表的列表)。还有表示行数、列数、起始位置和目标位置的实例变量,该网格将被随机填入一些路障单元格。

这里生成的迷宫应该相当地稀疏,以便从给定的起始位置到给定的目标位置的路径几乎总是存在(毕竟这里只是为了测试算法)。实际的稀疏度将由迷宫的调用者决定,但这里将给出默认的稀疏度为20%的路障。如果某个随机数超过了当前sparseness参数给出的阈值,就会简单地用路障单元格替换空单元格。如果对迷宫中的每个位置都执行上述操作,那么从统计学上说,整个迷宫的稀疏度将近似于给定的sparseness参数。具体代码如代码清单2-12所示。

代码清单2-12 maze.py(续)

class Maze:
    def __init__(self, rows: int = 10, columns: int = 10, sparseness: float = 0.2, start:
     MazeLocation = MazeLocation(0, 0), goal: MazeLocation = MazeLocation(9, 9)) -> None:
       # initialize basic instance variables
       self._rows: int = rows
       self._columns: int = columns
       self.start: MazeLocation = start
       self.goal: MazeLocation = goal
       # fill the grid with empty cells
       self._grid: List[List[Cell]] = [[Cell.EMPTY for c in range(columns)] 
    for r in range(rows)]
        # populate the grid with blocked cells
        self._randomly_fill(rows, columns, sparseness)
        # fill the start and goal locations in
        self._grid[start.row][start.column] = Cell.START
        self._grid[goal.row][goal.column] = Cell.GOAL

    def _randomly_fill(self, rows: int, columns: int, sparseness: float):
        for row in range(rows):
            for column in range(columns):
                if random.uniform(0, 1.0) < sparseness:
                    self._grid[row][column] = Cell.BLOCKED

现在我们有了迷宫,还需要一种把它简洁地打印到控制台的方法。输出的字符应该靠得很近,以便使该迷宫看起来像一个真实的迷宫。具体代码如代码清单2-13所示。

代码清单2-13 maze.py(续)

# return a nicely formatted version of the maze for printing
def __str__(self) -> str:
    output: str = ""
    for row in self._grid:
        output += "".join([c.value for c in row]) + "n"
    return output

然后测试一下上述这些迷宫函数。

maze: Maze = Maze()
print(maze)

2.2.2 迷宫的其他函数

若有一个函数可在搜索过程中检查我们是否已抵达目标,将会便利很多。换句话说,我们需要检查搜索已到达的某个MazeLocation是否就是目标位置。于是可以为Maze添加一个方法。具体代码如代码清单2-14所示。

代码清单2-14 maze.py(续)

def goal_test(self, ml: MazeLocation) -> bool:
    return ml == self.goal

怎样才能在迷宫内移动呢?假设我们每次可以从迷宫的给定单元格开始水平和垂直移动一格。根据此规则,successors()函数可以从给定的MazeLocation找到可能到达的下一个位置。但是,每个Maze的successors()函数都会有所差别,因为每个Maze都有不同的尺寸和路障集。因此,代码清单2-15中把successors()函数定义为Maze的方法。

代码清单2-15 maze.py(续)

def successors(self, ml: MazeLocation) -> List[MazeLocation]:
    locations: List[MazeLocation] = []
    if ml.row + 1 < self._rows and self._grid[ml.row + 1][ml.column] != Cell.BLOCKED:
        locations.append(MazeLocation(ml.row + 1, ml.column))
    if ml.row - 1 >= 0 and self._grid[ml.row - 1][ml.column] != Cell.BLOCKED:
        locations.append(MazeLocation(ml.row - 1, ml.column))
    if ml.column + 1 < self._columns and self._grid[ml.row][ml.column + 1] != Cell.BLOCKED:
        locations.append(MazeLocation(ml.row, ml.column + 1))
    if ml.column - 1 >= 0 and self._grid[ml.row][ml.column - 1] != Cell.BLOCKED:
        locations.append(MazeLocation(ml.row, ml.column - 1))
    return locations

successors()只是简单地检查Maze中MazeLocation的上方、下方、右侧和左侧,查看是否能找到从该位置过去的空白单元格。它还会避开检查Maze边缘之外的位置。每个找到的可达MazeLocation都会被放入一个列表,该列表将被最终返回给调用者。

2.2.3 深度优先搜索

深度优先搜索(depth-first search,DFS)正如其名,搜索会尽可能地深入,如果碰到障碍就会回溯到最后一次的决策位置。下面将实现一个通用的深度优先搜索算法,它可以求解上述迷宫问题,还可以给其他问题复用。图2-4演示了迷宫中正在进行的深度优先搜索。

每个程序员都应该知晓的核心搜索算法

 

图2-4 在深度优先搜索中,搜索沿着不断深入的路径前进,直至遇到障碍并必须回溯到最后的决策位置

1.栈

深度优先搜索算法依赖栈这种数据结构。(如果你已读过了第1章中有关栈的内容,完全可以跳过本小节的内容。)栈是一种按照后进先出(LIFO)原则操作的数据结构。不妨想象一叠纸,顶部的最后一张纸是从栈中取出的第一张纸。通常,栈可以基于更简单的数据结构(如列表)来实现。这里的栈将基于Python的list类型来实现。

栈一般至少应包含两种操作:

下面将实现这两个操作,以及用于检查栈中是否还存在数据项的empty属性,具体代码如代码清单2-16所示。这些关于栈的代码将会被添加到本章之前用过的generic_search.py文件中。现在我们已经完成了所有必要的导入。

代码清单2-16 generic_search.py(续)

class Stack(Generic[T]):
    def __init__(self) -> None:
        self._container: List[T] = []

    @property
    def empty(self) -> bool:
        return not self._container  # not is true for empty container

    def push(self, item: T) -> None:
        self._container.append(item)

    def pop(self) -> T:
        return self._container.pop()  # LIFO

    def __repr__(self) -> str:
        return repr(self._container)

用Python的List实现栈十分地简单,只需一直在其右端添加数据项,并且始终从其最右端移除数据项。如果List中不再包含任何数据项,则list的pop()方法将会调用失败。因此如果Stack为空,则Stack的pop()方法同样也会失败。

2.DFS算法

在开始实现DFS算法之前,还需要来点儿花絮。这里需要一个Node类,用于在搜索时记录从一种状态到另一种状态(或从一个位置到另一个位置)的过程。不妨把Node视为对状态的封装。在求解迷宫问题时,这些状态就是MazeLocation类型。Node将被称作来自其parent的状态。Node类还会包含cost和heuristic属性,并实现了__lt__()方法,因此稍后在A*算法中能够得以复用。具体代码如代码清单2-17所示。

代码清单2-17 generic_search.py(续)

class Node(Generic[T]):
    def __init__(self, state: T, parent: Optional[Node], cost: float = 0.0, heuristic: 
     float  = 0.0) -> None:
      self.state: T = state
      self.parent: Optional[Node] = parent
      self.cost: float = cost
      self.heuristic: float = heuristic

    def __lt__(self, other: Node) -> bool:
        return (self.cost + self.heuristic) < (other.cost + other.heuristic)

提示 Optional类型表示,参数化类型的值可以被变量引用,或变量可以引用None。

提示 在文件顶部,from __future__ import annotations允许Node在其方法的类型提示中引用自身。若没有这句话,就需要把类型提示放入引号作为字符串来使用(如'Node')。在以后的Python的版本中,将不必导入annotations了。要获得更多信息,请参阅PEP 563“注释的延迟评估”(Postponed Evaluation of Annotations)。

深度优先搜索过程中需要记录两种数据结构:当前要搜索的状态栈(或“位置”),这里名为frontier;已搜索的状态集,这里名为explored。只要在frontier内还有状态需要访问,DFS就将持续检查该状态是否达到目标(如果某个状态已达到目标,则DFS将停止运行并将其返回)并把将要访问的状态添加到frontier中。它还会把已搜索的每个状态都打上标记explored,使得搜索不会陷入原地循环,不会再回到先前已访问的状态。如果frontier为空,则意味着没有要搜索的地方了。具体代码如代码清单2-18所示。

代码清单2-18 generic_search.py(续)

def dfs(initial: T, goal_test: Callable[[T], bool], successors: Callable[[T], List[T]]) 
 -> Optional[Node[T]]:
    # frontier is where we've yet to go
    frontier: Stack[Node[T]] = Stack()
    frontier.push(Node(initial, None))
    # explored is where we've been
    explored: Set[T] = {initial}

    # keep going while there is more to explore
    while not frontier.empty:
        current_node: Node[T] = frontier.pop()
        current_state: T = current_node.state
        # if we found the goal, we're done
        if goal_test(current_state):
            return current_node
        # check where we can go next and haven't explored
        for child in successors(current_state):
            if child in explored:  # skip children we already explored
            continue
        explored.add(child)
        frontier.push(Node(child, current_node))
    return None  # went through everything and never found goal

如果dfs()执行成功,则返回封装了目标状态的Node。从该Node开始,利用parent属性向前遍历,即可重现由起点到目标点的路径。具体代码如代码清单2-19所示。

代码清单2-19 generic_search.py(续)

def node_to_path(node:Node[T]) -> List[T]:
    path: List[T] = [node.state]
    # work backwards from end to front
    while node.parent is not None:
        node = node.parent
        path.append(node.state)
    path.reverse()
    return path

为了便于显示,如果在迷宫上标上搜索成功的路径、起点状态和目标状态,就很有意义了。若能移除一条路径以便对同一个迷宫尝试不同的搜索算法,也是很有意义的事情。因此应该在maze.py的Maze类中添加代码清单2-20所示的两个方法。

代码清单2-20 maze.py(续)

def mark(self, path: List[MazeLocation]):
    for maze_location in path:
        self._grid[maze_location.row][maze_location.column] = Cell.PATH
    self._grid[self.start.row][self.start.column] = Cell.START
    self._grid[self.goal.row][self.goal.column] = Cell.GOAL

def clear(self, path: List[MazeLocation]):
    for maze_location in path:
        self._grid[maze_location.row][maze_location.column] = Cell.EMPTY
    self._grid[self.start.row][self.start.column] = Cell.START
    self._grid[self.goal.row][self.goal.column] = Cell.GOAL

本节内容有点多了,现在终于可以求解迷宫了。具体代码如代码清单2-21所示。

代码清单2-21 maze.py(续)

if __name__ == "__main__":
    # Test DFS
    m: Maze = Maze()
    print(m)
    solution1: Optional[Node[MazeLocation]] = dfs(m.start, m.goal_test, m.successors)
    if solution1 is None:
        print("No solution found using depth-first search!")
    else:
        path1: List[MazeLocation] = node_to_path(solution1)
        m.mark(path1)
        print(m)
        m.clear(path1)

成功的解将类似于如下形式:

S****X X
 X  *****
       X*
 XX******X
  X*
  X**X
 X  *****
         *
      X  *X
         *G

星号代表深度优先搜索函数找到的路径,从起点至目标点。请记住,因为每一个迷宫都是随机生成的,所以并非每个迷宫都有解。

2.2.4 广度优先搜索

或许大家会注意到,用深度优先遍历找到的迷宫路径似乎有点儿不尽如人意,通常它们不是最短路径。广度优先搜索(breadth-first search,BFS)则总是会查找最短路径,它从起始状态开始由近到远,在搜索时的每次迭代中依次查找每一层节点。针对某些问题,深度优先搜索可能会比广度优先搜索更快找到解,反之亦然。因此,要在两者之间进行选择,有时就是在可能快速求解与确定找到最短路径(如果存在)之间做出权衡。图2-5演示了迷宫中正在进行的广度优先搜索。

每个程序员都应该知晓的核心搜索算法

 

图2-5 在广度优先搜索过程中,离起点位置最近的元素最先被搜索

深度优先搜索有时会比广度优先搜索更快地返回结果,要想理解其中的原因,不妨想象一下在洋葱的一层皮上寻找标记。采用深度优先策略的搜索者可以把小刀插入洋葱的中心并随意检查切出的块。如果标记所在的层刚好邻近切出的块,那么就有可能比采用广度优先策略的搜索者更快地找到它,广度优先策略的搜索者会费劲地每次“剥一层洋葱皮”。

为了更好地理解为什么广度优先搜索始终都会找出最短路径(如果存在的话),可以考虑一下要找到波士顿和纽约之间停靠车站数最少的火车路径。如果不断朝同一个方向前进并在遇到死路时进行回溯(如同深度优先搜索),就有可能在回到纽约之前先找到一条通往西雅图的路线。但在广度优先搜索时,首先会检查距离波士顿1站路的所有车站,然后检查距离波士顿2站路的所有车站,再检查距离波士顿3站路的所有车站,一直持续至找到纽约为止。因此在找到纽约时,就能知道已找到了车站数最少的路线,因为离波士顿较近的所有车站都已经查看过了,且其中没有纽约。

1.队列

实现BFS需要用到名为队列(queue)的数据结构。栈是LIFO,而队列是FIFO(先进先出)。队列就像是排队使用洗手间的一队人。第一个进入队列的人可以先进入洗手间。队列至少具有与栈类似的push()方法和pop()方法。实际上,Queue的实现(底层由Python的deque支持)几乎与Stack的实现完全相同,只有一点儿变化,即从_container的左端而不是右端移除元素,并用deque替换了list。这里用“左”来代表底层存储结构的起始位置。左端的元素是在deque中停留时间最长的元素(依到达时间而定),所以它们是首先弹出的元素。具体代码如代码清单2-22所示。

代码清单2-22 generic_search.py(续)

class Queue(Generic[T]):
    def __init__(self) -> None:
         self._container: Deque[T] = Deque()

    @property
    def empty(self) -> bool:
        return not self._container  # not is true for empty container

    def push(self, item: T) -> None:
        self._container.append(item)

    def pop(self) -> T:
        return self._container.popleft()  # FIFO

    def __repr__(self) -> str:
        return repr(self._container)

提示 为什么Queue的实现要用deque作为其底层存储结构,而Stack的实现则使用list作为其底层存储结构呢?这与弹出数据的位置有关。在栈中,是右侧压入右侧弹出。在队列中,也是右侧压入,但是从左侧弹出。Python的list数据结构从右侧弹出的效率较高,但从左侧弹出则不然。deque则从两侧都能够高效地弹出数据。因此,在deque上有一个名为popleft()的内置方法,但在list上则没有与其等效的方法。当然可以找到其他方法来用list作为队列的底层存储结构,但效率会比较低。在deque上从左侧弹出数据的操作复杂度为O(1),而在list上则为O(n)。在用list的时候,从左侧弹出数据后,每个后续元素都必须向左移动一个位置,效率也就降低了。

2.BFS算法

广度优先搜索的算法与深度优先搜索的算法惊人地一致,只是frontier由栈变成了队列。把frontier由栈改为队列会改变对状态进行搜索的顺序,并确保离起点状态最近的状态最先被搜索到。具体代码如代码清单2-23所示。

代码清单2-23 generic_search.py(续)

def bfs(initial: T, goal_test: Callable[[T], bool], successors: Callable[[T], List[T]]) 
 -> Optional[Node[T]]:
    # frontier is where we've yet to go
    frontier: Queue[Node[T]] = Queue()
    frontier.push(Node(initial, None))
    # explored is where we've been
    explored: Set[T] = {initial}

    # keep going while there is more to explore
    while not frontier.empty:
        current_node: Node[T] = frontier.pop()
        current_state: T = current_node.state
        # if we found the goal, we're done
        if goal_test(current_state):
            return current_node
        # check where we can go next and haven't explored
        for child in successors(current_state):
            if child in explored:  # skip children we already explored
                continue
            explored.add(child)
            frontier.push(Node(child, current_node))
    return None  # went through everything and never found goal

运行一下bfs(),就会看到它总会找到当前迷宫的最短路径。在if __name__ == "__main__"部分中,在之前的测试代码后面加入代码清单2-24所示的语句,以便能对同一个迷宫对比两种算法的结果。

代码清单2-24 maze.py(续)

# Test BFS
solution2: Optional[Node[MazeLocation]] = bfs(m.start, m.goal_test, m.successors)
if solution2 is None:
    print("No solution found using breadth-first search!")
else:
    path2: List[MazeLocation] = node_to_path(solution2)
    m.mark(path2)
    print(m)
    m.clear(path2)

令人惊讶的是,算法可以保持不变,只需修改其访问的数据结构即可得到完全不同的结果。以下是在之前名为dfs()的同一个迷宫上调用bfs()的结果。请注意,用星号标记出来的从起点到目标点的路径比上一个示例中的路径更为直接。

S    X X
*X
*      X
*XX      X
* X
* X  X
*X
*
*     X   X
*********G

2.2.5 A*搜索

给洋葱层层剥皮可能会非常耗时,广度优先搜索正是如此。和BFS一样,A*搜索旨在找到从起点状态到目标状态的最短路径。与以上BFS的实现不同,A*搜索将结合运用代价函数和启发函数,把搜索集中到最有可能快速抵达目标的路径上。

代价函数g(n) 会检查抵达指定状态的成本。在求解迷宫的场景中,成本是指之前已经走过多少步才到达当前状态。启发式信息计算函数h(n) 则给出了从当前状态到目标状态的成本估算。可以证明,如果h(n)是一个可接受的启发式信息(admissible heuristic),那么找到的最终路径将是最优解。可接受的启发式信息永远不会高估抵达目标的成本。在二维平面上,直线距离启发式信息就是一个例子,因为直线总是最短的路径[1]。

到达任一状态所需的总成本为f(n),它只是合并了g(n)和h(n)而已。实际上,f(n) = g(n) + h(n)。当从frontier选取要探索的下一个状态时,A*搜索将选择f(n)最低的状态,这正是它与BFS和DFS的区别。

1.优先队列

为了在frontier上选出f(n)最低的状态,A*搜索用优先队列(priority queue)作为存储frontier的数据结构。优先队列能使其数据元素维持某种内部顺序,以便使首先弹出的元素始终是优先级最高的元素。在本例中,优先级最高的数据项是f(n)最低的那个。通常这意味着内部将会采用二叉堆,使得压入和弹出操作的复杂度均为O(lg n)。

Python的标准库中包含了heappush()函数和heappop()函数,这些函数将读取一个列表并将其维护为二叉堆。用这些标准库函数构建一个很薄的封装器,即可实现一个优先队列。PriorityQueue类将与Stack类和Queue类很相似,只是修改了push()方法和pop()方法,以便可以利用heappush()和heappop()。具体代码如代码清单2-25所示。

代码清单2-25 generic_search.py(续)

class PriorityQueue(Generic[T]):
    def __init__(self) -> None:
        self._container: List[T] = []

    @property
    def empty(self) -> bool:
        return not self._container  # not is true for empty container

    def push(self, item: T) -> None:
        heappush(self._container, item)  # in by priority

    def pop(self) -> T:
        return heappop(self._container)  # out by priority

    def __repr__(self) -> str:
        return repr(self._container)

为了确定某元素相对于其他同类元素的优先级,heappush()和heappop()用“<”操作符进行比较。这就是之前需要在Node上实现__lt__()的原因。通过查看相应的f(n)即可对Node进行相互比较,f(n)只是简单地把cost属性和heuristic属性相加而已。

2.启发式信息

启发式信息(heuristics)是对问题解决方式的一种直觉[2]。在求解迷宫问题时,启发式信息旨在选取下一次搜索的最佳迷宫位置,最终是为了抵达目标。换句话说,这是一种有根据的猜测,猜测frontier上的哪些节点最接近目标位置。如前所述,如果A*搜索采用的启发式信息能够生成相对准确的结果且为可接受的(永远不会高估距离),那么A*将会得出最短路径。追求更短距离的启发式信息最终会导致搜索更多的状态,而追求接近实际距离(但不会高估,以免不可接受)的启发式信息搜索的状态会比较少。因此,理想的启发式信息应该是尽可能接近真实距离,而不会过分高估。

3.欧氏距离

在几何学中,两点之间的最短路径就是直线。因此在求解迷宫问题时,直线启发式信息总是可接受的,这很有道理。由毕达哥拉斯定理推导出来的欧氏距离(Euclidean distance)表明:

每个程序员都应该知晓的核心搜索算法

 

。对本节的迷宫问题而言,x的差相当于两个迷宫位置的列的差, y的差相当于行的差。请注意,要回到maze.py中去实现本函数。具体代码如代码清单2-26所示。

代码清单2-26 maze.py(续)

def euclidean_distance(goal: MazeLocation) -> Callable[[MazeLocation], float]:
    def distance(ml: MazeLocation) -> float:
        xdist: int = ml.column - goal.column
        ydist: int = ml.row - goal.row
        return sqrt((xdist * xdist) + (ydist * ydist))
    return distance

euclidean_distance()函数将返回另一个函数。类似Python这种支持把函数视为“一等公民”的编程语言,能够支持这种有趣的做法。distance()将获取(capture)euclidean_distance()传入的goal MazeLocation,“ 获取”的意思是每次调用distance()时,distance()都可以引用此变量(持久性)。返回的函数用到了goal进行计算。这种做法可以创建参数较少的函数。返回的distance()函数只用迷宫起始位置作为参数,并持久地“看到”goal。

图2-6演示了网格环境(如同曼哈顿的街道)下的欧氏距离。

每个程序员都应该知晓的核心搜索算法

 

图2-6 欧氏距离是从起点到目标点的直线距离

4.曼哈顿距离

欧氏距离非常有用,但对于此处的问题(只能朝4个方向中的一个方向移动的迷宫),我们还可以处理得更好。曼哈顿距离(Manhattan distance)源自在曼哈顿的街道上行走,曼哈顿是纽约最著名的行政区,以网格模式分布。要从曼哈顿的任一地点到另一地点,需要走过一定数量的横向街区和纵向街区(曼哈顿几乎不存在对角线的街道)。所谓曼哈顿距离,其实就是获得两个迷宫位置之间的行数差,并将其与列数差相加而得到。图2-7演示了曼哈顿距离。

每个程序员都应该知晓的核心搜索算法

 

图2-7 曼哈顿距离不涉及对角线。路径必须沿着水平线或垂直线前进

具体代码如代码清单2-27所示。

代码清单2-27 maze.py(续)

def manhattan_distance(goal: MazeLocation) -> Callable[[MazeLocation], float]:
    def distance(ml: MazeLocation) -> float:
        xdist: int = abs(ml.column - goal.column)
        ydist: int = abs(ml.row - goal.row)
        return (xdist + ydist)
    return distance

因为这种启发式信息能够更准确地契合在迷宫中移动的实际情况(沿垂直和水平方向移动而不是沿对角线移动),所以它比欧氏距离更为接近从迷宫某位置到目标位置的实际距离。因此,如果A*搜索与曼哈顿距离组合使用,其遍历的迷宫状态就会比A*搜索与欧氏距离的组合要遍历的迷宫状态要少一些。结果路径仍会是最优解,因为对于在其中仅允许朝4种方向移动的迷宫而言,曼哈顿距离是可接受的(永远不会高估距离)。

5.A*算法

从BFS转为A*搜索,需要进行一些小的改动。第1处改动是把frontier从队列改为优先队列,这样frontier就会弹出f(n)最低的节点。第2处改动是把已探索的状态集改为字典类型。用了字典将能跟踪记录每一个可能被访问节点的最低成本(g(n))。用了启发函数后,如果启发计算结果不一致,则某些节点可能会被访问两次。如果在新的方向上找到节点的成本比按之前的路线访问的成本要低,我们将会采用新的路线。

为简单起见,函数astar()没有把代价函数用作参数,而只是把在迷宫中的每一跳的成本简单地视为1。每个新Node都被赋予了由此简单公式算出的成本值,以及由作为参数传给搜索函数heuristic()的新函数计算出来的启发分值。除这些改动之外,astar()与bfs()极其相似,不妨将它们放在一起做一个比较。具体代码如代码清单2-28所示。

代码清单2-28 generic_search.py(续)

def astar(initial: T, goal_test: Callable[[T], bool], successors: Callable[[T],
 List[T]], heuristic: Callable[[T], float]) -> Optional[Node[T]]:
    # frontier is where we've yet to go
    frontier: PriorityQueue[Node[T]] = PriorityQueue()
    frontier.push(Node(initial, None, 0.0, heuristic(initial)))
    # explored is where we've been
    explored: Dict[T, float] = {initial: 0.0}

    # keep going while there is more to explore
    while not frontier.empty:
        current_node: Node[T] = frontier.pop()
        current_state: T = current_node.state
        # if we found the goal, we're done
        if goal_test(current_state):
            return current_node
        # check where we can go next and haven't explored
        for child in successors(current_state):
            # 1 assumes a grid, need a cost function for more sophisticated apps
            new_cost: float = current_node.cost + 1  
            if child not in explored or explored[child] > new_cost:
                explored[child] = new_cost
                frontier.push(Node(child, current_node, new_cost, heuristic(child)))
    return None  # went through everything and never found goal

恭喜。到这里为止,不仅迷宫问题的解法介绍完毕,还介绍了一些可供多种不同搜索应用程序使用的通用搜索函数。DFS和BFS适用于小型的数据集和状态空间,这种情况下的性能并没那么重要。在某些情况下,DFS的性能会优于BFS,但BFS的优势是始终能提供最佳的路径。有趣的是,BFS和DFS的实现代码是一样的,差别仅仅是不用栈而用了队列。稍微复杂一点的A*搜索算法会与高质量、保证一致性、可接受的启发式信息组合,不仅可以提供最佳路径,而且性能也远远优于BFS。因为这3个函数都是以通用方式实现的,所以只需要一句import generic_search即可让几乎所有搜索空间都能使用它们。

下面用maze.py测试部分中的同一个迷宫对astar()进行测试,具体代码如代码清单2-29所示。

代码清单2-29 maze.py(续)

# Test A*
distance: Callable[[MazeLocation], float] = manhattan_distance(m.goal)
solution3: Optional[Node[MazeLocation]] = astar(m.start, m.goal_test, m.successors,
 distance)
if solution3 is None:
    print("No solution found using A*!")
else:
    path3: List[MazeLocation] = node_to_path(solution3)
    m.mark(path3)
    print(m)

有趣的是,即便bfs()和astar()都找到了最佳路径(长度相等),以下的astar()输出与bfs()也略有不同。因为有了启发式信息,astar()立即由对角线走向了目标位置。最终它搜索的状态将比bfs()更少,从而拥有更高的性能。如果想自行证明这一点,请为每个状态添加计数器。

S**  X X
 X**
   *   X  
 XX*     X
  X*      
  X**X    
 X  ****  
       *  
     X * X
       **G

本文摘自《算法精粹:经典计算机科学问题的Python实现》

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>