Lo gate cuts optimizer #470

Closed
wants to merge 9 commits
@@ -0,0 +1 @@

@@ -0,0 +1,356 @@
"""File containing the classes required to implement Dijkstra's (best-first) search algorithm."""
import heapq
import numpy as np
from itertools import count


class BestFirstPriorityQueue:
"""Class that implements priority queues for best-first search.

The tuples that are pushed onto the priority queues have the form:

(<cost>, <neg_search_depth>, <rand_num>, <seq_count>, <search_state>)

    <cost> (numeric or tuple) is a numeric cost, or a tuple of numeric costs
    that are compared lexicographically, to be minimized.

<neg_search_depth> (int) is the negative of the search depth of the
search state represented by the tuple. Thus, if several search states
have identical costs, priority is given to the deepest states to
encourage depth-first behavior.

    <rand_num> is a pseudo-random number that randomly breaks ties in a
    stable manner when several search states have identical costs at
    identical search depths.

<seq_count> is a sequential count of push operations that is used to
break further ties just in case two states with the same costs and
depths are somehow assigned the same pseudo-random numbers.

<search_state> is a state object generated by the optimization process.
Because of the design of the tuple entries that precede it, state objects
    never get evaluated in the heap-management comparisons that are performed
internally by the priority-queue implementation.

Member Variables:

rand_gen is a Numpy random number generator.

unique is a Python sequence counter.

pqueue is a Python priority queue (currently heapq, with plans to move to
queue.PriorityQueue if parallelization is ultimately required).
"""

def __init__(self, rand_seed):
"""A BestFirstPriorityQueue object must be initialized with a specification
of a random seed (int) for the pseudo-random number generator.
If None is used as the random seed, then a seed is
obtained using an operating-system call to achieve a randomized
initialization.
"""
self.rand_gen = np.random.default_rng(rand_seed)
self.unique = count()
self.pqueue = list() # queue.PriorityQueue()

def put(self, state, depth, cost):
"""Push state onto the priority queue. The search depth and cost
of the state must also be provided as input.
"""

heapq.heappush(
self.pqueue,
(cost, (-depth), self.rand_gen.random(), next(self.unique), state),
)

def get(self):
"""Pop and return the lowest cost state currently on the
queue, along with the search depth of that state and its cost.
None, None, None is returned if the priority queue is empty.
"""
if self.qsize() == 0:
return None, None, None

best = heapq.heappop(self.pqueue) # self.pqueue.get()

return best[-1], (-best[1]), best[0]

def qsize(self):
"""Return the size of the priority queue."""
return len(self.pqueue) # self.pqueue.qsize()

def clear(self):
"""Clear all entries in the priority queue."""
self.pqueue.clear()
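
# Illustrative sketch (not part of the class above) of how entries are
# ordered. The states, depths, and costs shown here are hypothetical.
#
#     pq = BestFirstPriorityQueue(rand_seed=0)
#     pq.put(state="shallow", depth=1, cost=(2, 0))
#     pq.put(state="deep", depth=3, cost=(2, 0))
#     pq.put(state="cheap", depth=2, cost=(1, 5))
#
#     pq.get()  # -> ("cheap", 2, (1, 5)): lowest cost is served first
#     pq.get()  # -> ("deep", 3, (2, 0)): cost ties go to the deepest state
#     pq.get()  # -> ("shallow", 1, (2, 0))
#     pq.get()  # -> (None, None, None): the queue is now empty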


class BestFirstSearch:
    """Class that implements best-first search. The search proceeds by
    choosing the lowest-cost state in the search frontier, breaking ties in
    favor of the deepest states, and generating its next states. Successive
    calls to the optimizationPass() method resume the search at the
    lowest-cost state remaining in the search frontier. The costs of goal
    states that are returned are used to constrain subsequent searches.
    None is returned if no (additional) feasible solutions can be found, or
    if no (additional) solutions can be found without exceeding the lowest
    upper-bound cost across the goal states previously returned.

Member Variables:

    rand_seed (int) is the seed to use when initializing the Numpy random
    number generator in the best-first priority-queue object.

cost_func (lambda state, *args) is a function that computes cost values
from search states. Input arguments to the optimizationPass() method are
    also passed to the cost_func. The returned cost can be numeric or a
    tuple of numerics. In the latter case, lexicographic comparisons are
    performed per Python semantics.

next_state_func (lambda state, *args) is a function that returns a list
of next states generated from the input state. Input arguments to the
optimizationPass() method are also passed to the next_state_func.

goal_state_func (lambda state, *args) is a function that returns True if
the input state is a solution state of the search. Input arguments to the
optimizationPass() method are also passed to the goal_state_func.

upperbound_cost_func (lambda goal_state, *args) can either be None or a
function that returns an upper bound to the optimal cost given a goal_state
as input. The upper bound is used to prune next-states from the search in
subsequent calls to the optimizationPass() method. If upperbound_cost_func
is None, the cost of the goal_state as determined by cost_func is used as
an upper bound to the optimal cost. Input arguments to the
optimizationPass() method are also passed to the upperbound_cost_func.

mincost_bound_func (lambda *args) can either be None or a function that
returns a cost bound that is compared to the minimum cost across all
vertices in a search frontier. If the minimum cost exceeds the min-cost
bound, the search is terminated even if a goal state has not yet been found.
Returning None is equivalent to returning an infinite min-cost bound. A
mincost_bound_func that is None is likewise equivalent to an infinite
min-cost bound.

stop_at_first_min (Boolean) is a flag that indicates whether or not to
stop the search after the first minimum-cost goal state has been reached.

max_backjumps (int or None) is the maximum number of backjump operations that
can be performed before the search is forced to terminate. None indicates
    that no restriction is placed on the number of backjump operations.

pqueue (BestFirstPriorityQueue) is a best-first priority-queue object.

upperbound_cost (numeric or tuple) is the cost bound obtained by applying
the upperbound_cost_func to the goal states that are encountered.

mincost_bound (numeric or tuple) is the cost bound imposed on the minimum
cost across all vertices in the search frontier. The search is forced to
terminate when the minimum cost exceeds this cost bound.

minimum_reached (Boolean) is a flag that indicates whether or not the
first minimum-cost goal state has been reached.

num_states_visited (int) is the number of states that have been dequeued
and processed in the search.

num_next_states (int) is the number of next-states generated from the
states visited.

num_enqueues (int) is the number of next-states pushed onto the search
priority queue after cost pruning.

num_backjumps (int) is the number of times a backjump operation is
performed. In the case of best-first search, a backjump occurs when the
depth of the lowest-cost state in the search frontier is less than or
equal to the depth of the previous lowest-cost state.
"""

def __init__(
self, optimization_settings, search_functions, stop_at_first_min=False
):
"""A BestFirstSearch object must be initialized with a list of
initial states, a random seed for the numpy pseudo-random number
generators that are used to break ties, together with an object
that holds the various functions that are used by the search
engine to generate and explore the search space. A Boolean flag
can optionally be provided to indicate whether to stop the search
after the first minimum-cost goal state has been reached (True),
or whether subsequent calls to the optimizationPass() method should
return any additional minimum-cost goal states that might exist
(False). The default is not to stop at the first minimum. A limit
on the maximum number of backjumps can also be optionally provided
to terminate the search if the number of backjumps exceeds the
specified limit without finding the (next) optimal goal state.
"""

self.rand_seed = optimization_settings.getRandSeed()
self.cost_func = search_functions.cost_func
self.next_state_func = search_functions.next_state_func
self.goal_state_func = search_functions.goal_state_func
self.upperbound_cost_func = search_functions.upperbound_cost_func
self.mincost_bound_func = search_functions.mincost_bound_func

self.stop_at_first_min = stop_at_first_min
self.max_backjumps = optimization_settings.getMaxBackJumps()

self.pqueue = BestFirstPriorityQueue(self.rand_seed)

self.upperbound_cost = None
self.mincost_bound = None
self.minimum_reached = False
self.num_states_visited = 0
self.num_next_states = 0
self.num_enqueues = 0
self.num_backjumps = 0
self.penultimate_stats = None

    def initialize(self, initial_state_list, *args):
        """Clear the search frontier and all search statistics, and seed the
        frontier with the given list of initial states at depth zero. Any
        additional input arguments are passed along to the cost function
        when the initial states are enqueued.
        """
self.pqueue.clear()

self.upperbound_cost = None
self.mincost_bound = None
self.minimum_reached = False
self.num_states_visited = 0
self.num_next_states = 0
self.num_enqueues = 0
self.num_backjumps = 0
self.penultimate_stats = self.getStats()

self.put(initial_state_list, 0, args)

def optimizationPass(self, *args):
"""Perform best-first search until either a goal state is found and
returned, or cost-bounds are reached or no further goal states can be
found, in which case None is returned. The cost of the returned state
is also returned. Any input arguments to optimizationPass() are passed
along to the search-space functions employed.
"""

if self.mincost_bound_func is not None:
self.mincost_bound = self.mincost_bound_func(*args)

prev_depth = None

while (
self.pqueue.qsize() > 0
and (not self.stop_at_first_min or not self.minimum_reached)
and (self.max_backjumps is None or self.num_backjumps < self.max_backjumps)
):
state, depth, cost = self.pqueue.get()

self.updateMinimumReached(cost)

if cost is None or self.costBoundsExceeded(cost, args):
return None, None

self.num_states_visited += 1

if prev_depth is not None and depth <= prev_depth:
self.num_backjumps += 1

prev_depth = depth

if self.goal_state_func(state, *args):
self.penultimate_stats = self.getStats()
self.updateUpperBoundGoalState(state, *args)
self.updateMinimumReached(cost)

return state, cost

next_state_list = self.next_state_func(state, *args)
self.put(next_state_list, depth + 1, args)

# If all states have been explored, then the minimum has been reached
if self.pqueue.qsize() == 0:
self.minimum_reached = True

return None, None

def minimumReached(self):
"""Return True if the optimization reached a global minimum."""

return self.minimum_reached

def getStats(self, penultimate=False):
"""Return a Numpy array containing the number of states visited
(dequeued), the number of next-states generated, the number of
next-states that are enqueued after cost pruning, and the number
        of backjumps performed. Numpy arrays are employed to facilitate
        the aggregation of search statistics. If penultimate is True, the
        statistics that were captured when the most recent goal state was
        reached are returned instead.
"""

if penultimate:
return self.penultimate_stats

return np.array(
(
self.num_states_visited,
self.num_next_states,
self.num_enqueues,
self.num_backjumps,
),
dtype=int,
)

def getUpperBoundCost(self):
"""Return the current upperbound cost"""

return self.upperbound_cost

def updateUpperBoundCost(self, cost_bound):
"""Update the cost upper bound based on an
input cost bound.
"""

if cost_bound is not None and (
self.upperbound_cost is None or cost_bound < self.upperbound_cost
):
self.upperbound_cost = cost_bound

def updateUpperBoundGoalState(self, goal_state, *args):
"""Update the cost upper bound based on a
goal state reached in the search.
"""

if self.upperbound_cost_func is not None:
bound = self.upperbound_cost_func(goal_state, *args)
else:
bound = self.cost_func(goal_state, *args)

if self.upperbound_cost is None or bound < self.upperbound_cost:
self.upperbound_cost = bound

def put(self, state_list, depth, args):
"""Push a list of (next) states onto the
best-first priority queue.
"""

self.num_next_states += len(state_list)

for state in state_list:
cost = self.cost_func(state, *args)

if self.upperbound_cost is None or cost <= self.upperbound_cost:
self.pqueue.put(state, depth, cost)
self.num_enqueues += 1

def updateMinimumReached(self, min_cost):
"""Update the minimum_reached flag indicating
that a global optimum has been reached.
"""
if min_cost is None or (
self.upperbound_cost is not None and self.upperbound_cost <= min_cost
):
self.minimum_reached = True

return self.minimum_reached

def costBoundsExceeded(self, cost, args):
"""Return True if any cost bounds
have been exceeded.
"""

return cost is not None and (
(self.mincost_bound is not None and cost > self.mincost_bound)
or (self.upperbound_cost is not None and cost > self.upperbound_cost)
)
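

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the module above): a toy
# shortest-path search showing how the settings and search-function objects
# plug into BestFirstSearch. The graph and the SimpleNamespace stand-ins
# defined below are assumptions made only for this example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    # A small weighted digraph; the cheapest route from "A" to "D" costs 3.
    graph = {"A": {"B": 1, "C": 4}, "B": {"C": 1, "D": 5}, "C": {"D": 1}, "D": {}}

    # A search state is a (node, accumulated_cost) pair.
    search_functions = SimpleNamespace(
        cost_func=lambda state: state[1],
        next_state_func=lambda state: [
            (neighbor, state[1] + weight)
            for neighbor, weight in graph[state[0]].items()
        ],
        goal_state_func=lambda state: state[0] == "D",
        upperbound_cost_func=None,
        mincost_bound_func=None,
    )

    # Minimal stand-in for the optimization-settings object.
    settings = SimpleNamespace(getRandSeed=lambda: 0, getMaxBackJumps=lambda: None)

    search = BestFirstSearch(settings, search_functions, stop_at_first_min=False)
    search.initialize([("A", 0)])

    # Repeated calls return minimum-cost goal states until no further
    # solutions remain within the upper-bound cost, at which point
    # (None, None) is returned.
    while True:
        state, cost = search.optimizationPass()
        if state is None:
            break
        print("goal state:", state, "cost:", cost)

    print("search statistics:", search.getStats())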