diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..497f565 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,632 @@ +[MAIN] + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint +# in a server-like mode. +clear-cache-post-run=no + +# Load and enable all available extensions. Use --list-extensions to see a list +# of all available extensions. +#enable-all-extensions= + +# In error mode, messages with a category besides ERROR or FATAL are +# suppressed, and no reports are done by default. Error mode is compatible with +# disabling specific errors. +#errors-only= + +# Always return a 0 (non-error) status code, even if lint errors are found. +# This is primarily useful in continuous integration scripts. +#exit-zero= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loaded into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loaded into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +extension-pkg-whitelist= + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Specify a score threshold under which the program will exit with error. +fail-under=10 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +#from-stdin= + +# Files or directories to be skipped. They should be base names, not paths. +ignore=CVS + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, +# it can't be used as an escape character. +ignore-paths=.venv,tests + +# Files or directories matching the regular expression patterns are skipped. +# The regex matches against base names, not paths. The default value ignores +# Emacs file locks +ignore-patterns=^\.# + +# List of module names for which member attributes should not be checked +# (useful for modules/projects where namespaces are manipulated during runtime +# and thus existing member attributes cannot be deduced by static analysis). It +# supports qualified module names, as well as Unix pattern matching. +ignored-modules= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions.
+limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Minimum Python version to use for version dependent checks. Will default to +# the version used to run pylint. +py-version=3.10 + +# Discover python modules and packages in the file system subtree. +recursive=yes + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +source-roots=data,src + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# In verbose mode, extra non-checker-related info will be displayed. +#verbose= + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. If left empty, argument names will be checked with the set +# naming style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +#class-attribute-rgx= + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +#class-const-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. If left empty, class names will be checked with the set naming style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. If left empty, function names will be checked with the set +# naming style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. 
+good-names=i, + j, + k, + ex, + Run, + _ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Regular expression matching correct type alias names. If left empty, type +# alias names will be checked with the set naming style. +#typealias-rgx= + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +#typevar-rgx= + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. If left empty, variable names will be checked with the set +# naming style. +#variable-rgx= + + +[CLASSES] + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + asyncSetUp, + __post_init__ + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make,os._exit + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +exclude-too-few-public-methods= + +# List of qualified class names to ignore when counting class parents (see +# R0901) +ignored-parents= + +# Maximum number of arguments for function / method. +max-args=5 + +# Maximum number of attributes for a class (see R0902). +max-attributes=7 + +# Maximum number of boolean expressions in an if statement (see R0916). +max-bool-expr=5 + +# Maximum number of branch for function / method body. +max-branches=12 + +# Maximum number of locals for function / method body. +max-locals=15 + +# Maximum number of parents for a class (see R0901). 
+max-parents=7 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body. +max-returns=6 + +# Maximum number of statements in function / method body. +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when caught. +overgeneral-exceptions=builtins.BaseException,builtins.Exception + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )?<?https?://\S+>?$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. +max-line-length=100 + +# Maximum number of lines in a module. +max-module-lines=1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow explicit reexports by alias from a package __init__. +allow-reexport-from-package=no + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules= + +# Output a graph (.gv or any supported image format) of external dependencies +# to the given file (report RP0402 must not be disabled). +ext-import-graph= + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be +# disabled). +import-graph= + +# Output a graph (.gv or any supported image format) of internal dependencies +# to the given file (report RP0402 must not be disabled). +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[LOGGING] + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, +# UNDEFINED. +confidence=HIGH, + CONTROL_FLOW, + INFERENCE, + INFERENCE_FAILURE, + UNDEFINED + +# Disable the message, report, category or checker with the given id(s). You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then re-enable specific checks.
For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=raw-checker-failed, + bad-inline-option, + locally-disabled, + file-ignored, + suppressed-message, + useless-suppression, + deprecated-pragma, + use-symbolic-message-instead, + fixme + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifiers separated by comma (,) or put this option +# multiple times (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable=c-extension-no-member + + +[METHOD_ARGS] + +# List of qualified names (i.e., library.method) which require a timeout +# parameter, e.g. 'requests.api.get,requests.api.post' +timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + +# Regular expression of note tags to take in consideration. +notes-rgx= + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never return. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each +# category, as well as 'statement' which is the total number of statements +# analyzed. This score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +msg-template= + +# Set the output format. Available formats are text, parseable, colorized, json +# and msvs (visual studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +#output-format= + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. +score=yes + + +[SIMILARITIES] + +# Comments are removed from the similarity computation +ignore-comments=yes + +# Docstrings are removed from the similarity computation +ignore-docstrings=yes + +# Imports are removed from the similarity computation +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=no + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. No available dictionaries: you need to install +# both the python package and the system dependency for enchant to work.
+spelling-dict= + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins=no-member, + not-async-context-manager, + not-context-manager, + attribute-defined-outside-init + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. +missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx=.*[Mm]ixin + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. 
+allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io diff --git a/data/benchmark/__init__.py b/data/benchmark/__init__.py index 5871c8a..13e4734 100644 --- a/data/benchmark/__init__.py +++ b/data/benchmark/__init__.py @@ -1,10 +1,3 @@ """Benchmarking tools for the MILP model.""" -from .generate_baseline_schedules import generate_baseline_schedule -from .generate_milp_schedules import ( - calculate_makespan, - generate_extended_schedule, - generate_simple_schedule, - set_up_base_lp, -) from .benchmark import run_experiments from .processing import analyze_benchmarks diff --git a/data/benchmark/benchmark.py b/data/benchmark/benchmark.py index 24f011f..d90809e 100644 --- a/data/benchmark/benchmark.py +++ b/data/benchmark/benchmark.py @@ -1,19 +1,18 @@ """Generates the benchmark data.""" -from copy import deepcopy - from mqt.bench import get_benchmark from qiskit import QuantumCircuit import numpy as np -from utils.helpers import Timer - -from .generate_baseline_schedules import generate_baseline_schedule -from .generate_milp_schedules import ( - generate_extended_schedule, - generate_simple_schedule, - set_up_base_lp, +from src.scheduling import ( + Benchmark, + InfoProblem, + PTimes, + Result, + SchedulerType, + STimes, + generate_schedule, ) -from .types import Benchmark, PTimes, Result, STimes +from utils.helpers import Timer def _generate_batch(max_qubits: int, circuits_per_batch: int) -> list[QuantumCircuit]: @@ -64,35 +63,37 @@ def run_experiments( ] benchmark_results: list[dict[str, PTimes | STimes | dict[str, Result]]] = [] for benchmark in benchmarks: - lp_instance = set_up_base_lp( - benchmark, setting, big_m=1000, timesteps=list(range(t_max)) - ) - p_times = _get_processing_times(benchmark, setting, get_integers) - s_times = _get_setup_times( + # lp_instance = set_up_base_lp( + # benchmark, setting, big_m=1000, timesteps=list(range(t_max)) + # ) + p_times = _get_benchmark_processing_times(benchmark, setting, get_integers) + s_times = _get_benchmark_setup_times( benchmark, setting, default_value=2**5, get_integers=get_integers ) + problem = InfoProblem( + base_jobs=benchmark, + accelerators=setting, + big_m=1000, + timesteps=t_max, + process_times=p_times, + setup_times=s_times, + ) result: dict[str, Result] = {} # Run the baseline model with Timer() as t0: - makespan, jobs = generate_baseline_schedule( - benchmark, setting, p_times, s_times - ) + makespan, jobs, _ = generate_schedule(problem, SchedulerType.BASELINE) result["baseline"] = Result(makespan, jobs, t0.elapsed) # Run the simple model - lp_instance_copy = deepcopy(lp_instance) + with Timer() as t1: - makespan, jobs = generate_simple_schedule( - lp_instance_copy, p_times, s_times - ) + makespan, jobs, _ = generate_schedule(problem, SchedulerType.SIMPLE) result["simple"] = 
Result(makespan, jobs, t1.elapsed) # Run the extended model with Timer() as t2: - makespan, jobs = generate_extended_schedule( - lp_instance, p_times, s_times - ) + makespan, jobs, _ = generate_schedule(problem, SchedulerType.EXTENDED) result["extended"] = Result(makespan, jobs, t2.elapsed) # Store results @@ -104,7 +105,7 @@ def run_experiments( return results -def _get_processing_times( +def _get_benchmark_processing_times( base_jobs: list[QuantumCircuit], accelerators: dict[str, int], get_integers: bool = False, @@ -117,7 +118,7 @@ def _get_processing_times( ] -def _get_setup_times( +def _get_benchmark_setup_times( base_jobs: list[QuantumCircuit], accelerators: dict[str, int], default_value: float, diff --git a/data/benchmark/generate_baseline_schedules.py b/data/benchmark/generate_baseline_schedules.py deleted file mode 100644 index 147f554..0000000 --- a/data/benchmark/generate_baseline_schedules.py +++ /dev/null @@ -1,164 +0,0 @@ -"""Generate baseline schedules.""" -from collections import defaultdict - -import pulp -from qiskit import QuantumCircuit - -from .types import Bin, JobHelper, JobResultInfo, PTimes, STimes - - -def generate_baseline_schedule( - jobs: list[QuantumCircuit], - accelerators: dict[str, int], - process_times: PTimes, - setup_times: STimes, -) -> tuple[float, list[JobResultInfo]]: - """Generates a baseline schedule for the given jobs and accelerators using binpacking. - - First generates the schedule using binpacking and then calculates the makespan - by executing the schedule with the correct p_ij and s_ij values. - - Args: - jobs (list[QuantumCircuit]): The list of circuits (jobs) to schedule. - accelerators (dict[str, int]): The list of accelerators to schedule on (bins). - process_times (PTimes): The process times for each job on each machine. - setup_times (STimes): The setup times for each job on each machine. - - Returns: - tuple[float, list[JobResultInfo]]: List of jobs with their assigned machine and - start and completion times. 
- """ - - def find_fitting_bin(job: JobHelper, bins: list[Bin]) -> int | None: - if job.circuit is None: - return None - for idx, b in enumerate(bins): - if b.capacity >= job.circuit.num_qubits: - return idx - return None - - new_jobs = [JobHelper(str(idx + 1), job) for idx, job in enumerate(jobs)] - open_bins = [ - Bin(index=0, capacity=qpu, qpu=idx) - for idx, qpu in enumerate(accelerators.values()) - ] - closed_bins = [] - index = 1 - for job in new_jobs: - if job is None or job.circuit is None: - continue - # Find the index of a fitting bin - bin_idx = find_fitting_bin(job, open_bins) - - if bin_idx is None: - # Open new bins - new_bins = [ - Bin(index=index, capacity=qpu, qpu=idx) - for idx, qpu in enumerate(accelerators.values()) - ] - index += 1 - - # Search for a fitting bin among the new ones - bin_idx = find_fitting_bin(job, new_bins) - assert bin_idx is not None, "Job doesn't fit onto any qpu" - bin_idx += len(open_bins) - open_bins += new_bins - - # Add job to selected bin - selected_bin = open_bins[bin_idx] - selected_bin.jobs.append(job) - selected_bin.capacity -= job.circuit.num_qubits - - # Close bin if full - if selected_bin.capacity == 0: - selected_bin.full = True - closed_bins.append(selected_bin) - del open_bins[bin_idx] - - # Close all open bins - for obin in open_bins: - if len(obin.jobs) > 0: - closed_bins.append(obin) - - # Build combined jobs from bins - combined_jobs: list[JobResultInfo] = [] - for _bin in sorted(closed_bins, key=lambda x: x.index): - # combined_jobs.append(ScheduledJob(job=assemble_job(_bin.jobs), qpu=_bin.qpu)) - for job in _bin.jobs: - if job is None or job.circuit is None: - continue - combined_jobs.append( - JobResultInfo( - name=job.name, - machine=list(accelerators.keys())[_bin.qpu], - start_time=_bin.index, - completion_time=-1.0, - capacity=job.circuit.num_qubits, - ) - ) - - return _calculate_result_from_baseline( - combined_jobs, process_times, setup_times, jobs, accelerators - ) - - -def _calculate_result_from_baseline( - jobs: list[JobResultInfo], - process_times: PTimes, - setup_times: STimes, - base_jobs: list[QuantumCircuit], - accelerators: dict[str, int], -) -> tuple[float, list[JobResultInfo]]: - """Converst the setup and process times to a format that can be used by the - _calculate_makespan function. - """ - lp_jobs = ["0"] + [str(idx + 1) for idx, _ in enumerate(base_jobs)] - machines = list(accelerators.keys()) - p_times = pulp.makeDict( - [lp_jobs[1:], machines], - process_times, - 0, - ) - s_times = pulp.makeDict( - [lp_jobs, lp_jobs, machines], - setup_times, - 0, - ) - - return _calculate_makespan(jobs, p_times, s_times), jobs - - -def _calculate_makespan( - jobs: list[JobResultInfo], - p_times: defaultdict[str, defaultdict[str, float]], - s_times: defaultdict[str, defaultdict[str, defaultdict[str, float]]], -) -> float: - """Calculates the actual makespan from the list of jobs. - By executing the schedule with the corret p_ij and s_ij values. 
- """ - assigned_machines: defaultdict[str, list[JobResultInfo]] = defaultdict(list) - for job in jobs: - assigned_machines[job.machine].append(job) - makespans = [] - for machine, assigned_jobs in assigned_machines.items(): - for job in sorted(assigned_jobs, key=lambda x: x.start_time): - # Find the last predecessor that is completed before the job starts - # this can technically change the correct predecessor to a wrong one - # because completion times are updated in the loop - # I'm not sure if copying before the loop corrects this - last_completed = max( - (job for job in assigned_jobs), key=lambda x: x.completion_time - ) - if job.start_time == 0.0: - last_completed = JobResultInfo("0", machine, 0.0, 0.0) - job.start_time = last_completed.completion_time - # calculate p_j + s_ij - completion_time = ( # check if this order is correct - last_completed.completion_time - + p_times[job.name][machine] - + s_times[last_completed.name][job.name][machine] - ) - job.completion_time = completion_time - makespans.append(max(job.completion_time for job in assigned_jobs)) - - return max(makespans) diff --git a/data/benchmark/processing.py b/data/benchmark/processing.py index 6ba837f..87a3d52 100644 --- a/data/benchmark/processing.py +++ b/data/benchmark/processing.py @@ -66,18 +66,19 @@ def analyze_benchmarks(in_file: str) -> dict[str, ImprovementResult]: # Loop through each benchmark for benchmark in benchmarks: # Extract the makespan values + results = benchmark["results"] makespans.append( MakespanResult( - baseline=benchmark["baseline"]["makespan"], - simple=benchmark["simple"]["makespan"], - extended=benchmark["extended"]["makespan"], + baseline=results["baseline"]["makespan"], + simple=results["simple"]["makespan"], + extended=results["extended"]["makespan"], ) ) times.append( TimingResult( - baseline=benchmark["baseline"]["time"], - simple=benchmark["simple"]["time"], - extended=benchmark["extended"]["time"], + baseline=results["baseline"]["time"], + simple=results["simple"]["time"], + extended=results["extended"]["time"], ) ) diff --git a/data/benchmark/types.py b/data/benchmark/types.py deleted file mode 100644 index 2b2c52e..0000000 --- a/data/benchmark/types.py +++ /dev/null @@ -1,65 +0,0 @@ -from dataclasses import dataclass, field - -from qiskit import QuantumCircuit -import pulp - - -@dataclass -class Bin: - """Helper to keep track of binning problem.""" - - capacity: int = 0 - full: bool = False - index: int = -1 - jobs: list[QuantumCircuit] = field(default_factory=list) - qpu: int = -1 - - -@dataclass -class JobHelper: - """Helper to keep track of job names.""" - - name: str - circuit: QuantumCircuit | None - - -@dataclass -class LPInstance: - """Helper to keep track of LP problem.""" - - problem: pulp.LpProblem - jobs: list[str] - machines: list[str] - x_ik: dict[str, dict[str, pulp.LpVariable]] - z_ikt: dict[str, dict[str, dict[int, pulp.LpVariable]]] - c_j: dict[str, pulp.LpVariable] - s_j: dict[str, pulp.LpVariable] - named_circuits: list[JobHelper] - - -@dataclass -class JobResultInfo: - """Helper to keep track of job results.""" - - name: str - machine: str = "" - start_time: float = -1.0 - completion_time: float = -1.0 - capacity: int = 0 - - -@dataclass -class Result: - """Benchmark result for one instance of setting+jobs.""" - - makespan: float - jobs: list[JobResultInfo] - time: float - - -# Typedef -PTimes = list[list[float]] -STimes = list[list[list[float]]] -Benchmark = list[ - dict[str, dict[str, int] | list[dict[str, PTimes | STimes | dict[str, Result]]]] -] diff 
--git a/data/example/__init__.py b/data/example/__init__.py new file mode 100644 index 0000000..6cbcee8 --- /dev/null +++ b/data/example/__init__.py @@ -0,0 +1,2 @@ +"""Example MILP problem.""" +from .example_problem import example_problem diff --git a/data/example/example_problem.py b/data/example/example_problem.py new file mode 100644 index 0000000..7c8e103 --- /dev/null +++ b/data/example/example_problem.py @@ -0,0 +1,114 @@ +"""Module for the example problem.""" +import json + +from qiskit import QuantumCircuit +import numpy as np + +from src.scheduling import InfoProblem, generate_schedule, SchedulerType + +np.random.seed(42) + + +def _calculate_example_process_times(job_i, machine_k) -> float: + if job_i == 0: + return 0 + return job_i + np.random.randint(-2, 3) + machine_k + + +def _calculate_example_setup_times(job_i, job_j_, machine_k) -> float: + if job_j_ == 0: + return 0 + return (job_i + job_j_) // 2 + np.random.randint(-2, 3) + machine_k + + +def _generate_problem(big_m: int, timesteps: int) -> tuple[InfoProblem, dict[str, int]]: + # Inputs + jobs = ["0", "A", "B", "C", "D", "E", "F", "G", "H", "I"] + job_capacities = { + "0": 0, # dummy job + "A": 5, + "B": 5, + "C": 5, + "D": 5, + "E": 3, + "F": 2, + "G": 2, + "H": 2, + "I": 2, + } + machines = ["QUITO", "BELEM"] + machine_capacities = {"QUITO": 5, "BELEM": 5} + + processing_times = [ + [ + _calculate_example_process_times( + job_capacities[job], machine_capacities[machine] + ) + for machine in machines + ] + for job in jobs + ] + setup_times = [ + [ + [ + 50 # BIG! + if job_i in [job_j, "0"] + else _calculate_example_setup_times( + job_capacities[job_i], + job_capacities[job_j], + machine_capacities[machine], + ) + for machine in machines + ] + for job_i in jobs + ] + for job_j in jobs + ] + del job_capacities["0"] + return ( + InfoProblem( + base_jobs=[QuantumCircuit(cap) for cap in job_capacities.values()], + accelerators=machine_capacities, + big_m=big_m, + timesteps=timesteps, + process_times=processing_times, + setup_times=setup_times, + ), + job_capacities, + ) + + +def example_problem(big_m: int, timesteps: int, filename: str = "scheduling"): + """Runs the example problem and saves the LP file and JSON file. + TODO should also run the solution explorer and produce the output pdf. + + Args: + big_m (int): LP metavariable. + timesteps (int): LP metavariable. + filename (str, optional): Filename for .lp, .json and .pdf. Defaults to "scheduling".
+ """ + _problem, job_capacities = _generate_problem(big_m, timesteps) + _, _, lp_instance = generate_schedule(_problem, SchedulerType.SIMPLE) + lp_instance.problem.writeLP(f"{filename}.lp") + + with open(f"{filename}.json", "w+", encoding="utf-8") as f: + json.dump( + { + "params": { + "jobs": list(job_capacities.keys()), + "machines": list(_problem.accelerators.keys()), + "job_capcities": job_capacities, + "machine_capacities": _problem.accelerators, + "timesteps": timesteps, + "processing_times": _problem.process_times, + "setup_times": _problem.setup_times, + }, + "variables": { + var.name: var.varValue + for var in lp_instance.problem.variables() + if var.name.startswith(("c_j", "s_j", "x_ik_", "z_ikt_")) + }, + }, + f, + indent=4, + ) diff --git a/data/example/solution_explorer.py b/data/example/solution_explorer.py new file mode 100644 index 0000000..89a8b7c --- /dev/null +++ b/data/example/solution_explorer.py @@ -0,0 +1,155 @@ +"""A utility script to visualize a solution to the scheduling problem.""" + +# TODO make usable with example_problem.py +# - Globals of milp are not available anymore +# - Need to read the json solution file +# - Move argparse to main +import argparse +import json + +import numpy as np +import matplotlib.pyplot as plt +import pandas as pd +from matplotlib import ticker +from matplotlib.patches import Patch + + +def _read_solution_file(solution_file: str) -> pd.DataFrame: + """Reads a solution file and returns a dataframe the information of each job. + + Args: + solution_file (str): The solution file to read + + Returns: + pd.DataFrame: A dataframe with the columns job, qubits, machine, capacity, + start, end, duration + """ + with open(solution_file, encoding="utf-8") as f: + data = json.load(f) + + scenario = data[0] + + machine_capacities = scenario["setting"] + benchmarks = scenario["benchmarks"][0]["results"] + benchmark = benchmarks["baseline"] + + rows_list = [] + for job in benchmark["jobs"]: + qubits = job["capacity"] + start = job["start_time"] + end = job["completion_time"] + duration = end - start + name = job["name"][:5] + machine = job["machine"] + capacity = machine_capacities[machine] + rows_list.append( + { + "job": name, + "qubits": qubits, + "machine": machine, + "capacity": capacity, + "start": start, + "end": end, + "duration": duration, + } + ) + + df = pd.DataFrame(rows_list) + return df + + +def generate_schedule_plot(solution_file: str, pdf_name: str | None = None): + """Generates a plot of the schedule in the solution file. + + Args: + solution_file (str): The schedule to visualize. + pdf_name (str | None, optional): The name of the output PDF to write. If not + provided, the plot is instead opened with `plt.show()`. Defaults to None. + """ + # General comment: The completion time of a job is the last time step in which it is processed + # Similarily, the start time of a job is the first time step in which it is processed + # The duration is the number of time steps in which it is processed + + # Read the solution + df = _read_solution_file(solution_file) + print(df) + + # Create a color mapping for the machines + machine_colors = ["#154060", "#98c6ea", "#527a9c"] + color_mapping = dict(zip(df["machine"].unique(), machine_colors)) + + # Plot the jobs + # The grid lines are at the start of a time step. + # Hence, if a job ends in time step 11, the bar ends at 12. 
+ _, ax = plt.subplots() + + for i, row in df.iterrows(): + padding = 0.1 + height = 1 - 2 * padding + ax.barh( + i, + row["duration"], + left=row["start"], + height=height, + edgecolor="black", + linewidth=2, + color=color_mapping[row["machine"]], + ) + + # Create patches for the legend + patches = [] + for color in color_mapping.values(): + p = Patch(color=color) + p.set_edgecolor("black") + p.set_linewidth(1) + patches.append(p) + + # Set the xticks + ax.xaxis.set_minor_locator(ticker.MultipleLocator(1)) + + # Set the yticks + yticks = np.arange(len(df)) + ytick_labels = [f"{job} ({qubits})" for job, qubits in zip(df["job"], df["qubits"])] + ax.set_yticks(yticks) + ax.set_yticklabels(ytick_labels) + ax.invert_yaxis() + + # Set the axis labels + plt.xlabel("Time") + plt.grid(axis="x", which="major") + plt.grid(axis="x", which="minor", alpha=0.4) + legend_labels = [ + f"{machine} ({capacity})" + for machine, capacity in dict(zip(df["machine"], df["capacity"])).items() + ] + plt.legend(handles=patches, labels=legend_labels) + + if pdf_name: + plt.tight_layout() + plt.savefig(pdf_name, format="pdf", bbox_inches="tight") + else: + plt.show() + + +if __name__ == "__main__": + # Parse the command line arguments + parser = argparse.ArgumentParser( + description="Visualize a solution to the scheduling problem" + ) + parser.add_argument( + "solution", + type=str, + help="The solution file to visualize", + nargs="?", + default="scheduling.sol", + ) + parser.add_argument( + "--pdf", + type=str, + help="Write the plot to a PDF file", + nargs="?", + metavar="FILE", + ) + args = parser.parse_args() + + generate_schedule_plot(args.solution, args.pdf) diff --git a/data/milp.py b/data/milp.py deleted file mode 100644 index 4a8fe51..0000000 --- a/data/milp.py +++ /dev/null @@ -1,293 +0,0 @@ -import json - -import pulp -import numpy as np - -np.random.seed(42) - - -def get_process_time(job_i, machine_k) -> int: - if job_i == 0: - return 0 - return job_i + np.random.randint(-2, 3) + machine_k - - -def get_setup_time(job_i, job_j_, machine_k) -> int: # change to float - if job_j_ == 0: - return 0 - return (job_i + job_j_) // 2 + np.random.randint(-2, 3) + machine_k - - -# Meta Variables -BIG_M = 1000 -TIMESTEPS = 2**6 - -# Inputs -jobs = ["0", "A", "B", "C", "D", "E", "F", "G", "H", "I"] -job_capacities = { - "0": 0, # dummy job - "A": 5, - "B": 5, - "C": 5, - "D": 5, - "E": 3, - "F": 2, - "G": 2, - "H": 2, - "I": 2, -} -machines = ["QUITO", "BELEM"] -machine_capacities = {"QUITO": 5, "BELEM": 5} -timesteps = list(range(TIMESTEPS + 1)) # Big enough to cover all possible timesteps - -processing_times = [ - [ - get_process_time(job_capacities[job], machine_capacities[machine]) - for machine in machines - ] - for job in jobs -] -setup_times = [ - [ - [ - 50 # BIG!
- if job_i in [job_j, "0"] - else get_setup_time( - job_capacities[job_i], - job_capacities[job_j], - machine_capacities[machine], - ) - for machine in machines - ] - for job_i in jobs - ] - for job_j in jobs -] - - -def generate_lp() -> pulp.LpProblem: - # params - p_times = pulp.makeDict([jobs, machines], processing_times, 0) - s_times = pulp.makeDict([jobs, jobs, machines], setup_times, 0) - - # decision variables - x_ik = pulp.LpVariable.dicts( - "x_ik", (jobs, machines), cat=pulp.const.LpBinary - ) # x: Job i is assigned to machine k - y_ijk = pulp.LpVariable.dicts( - "y_ijk", (jobs, jobs, machines), cat=pulp.const.LpBinary - ) # y: Job i is assigned before job j - z_ikt = pulp.LpVariable.dicts( - "z_ikt", (jobs, machines, timesteps), cat=pulp.const.LpBinary - ) # z: Job i is assigned to machine k at timestep t - - a_ij = pulp.LpVariable.dicts( - "a_ij", (jobs, jobs), cat=pulp.const.LpBinary - ) # a: Job i ends before job j starts - b_ij = pulp.LpVariable.dicts( - "b_ij", (jobs, jobs), cat=pulp.const.LpBinary - ) # b: Job i ends before job j ends - d_ijk = pulp.LpVariable.dicts( - "d_ijk", (jobs, jobs, machines), cat=pulp.const.LpBinary - ) # d: Job i and j run on the same machine - - e_ijlk = pulp.LpVariable.dicts( - "e_ijlk", (jobs, jobs, jobs, machines), cat=pulp.const.LpBinary - ) - - c_max = pulp.LpVariable("makespan", 0, cat="Continuous") # c: makespan - c_j = pulp.LpVariable.dicts( - "c_j", (jobs), 0, cat="Continuous" - ) # c: completion time - s_j = pulp.LpVariable.dicts("s_j", (jobs), 0, cat="Continuous") # s: start time - - # Problem - problem = pulp.LpProblem("Scheduling", pulp.LpMinimize) - # Objective function - problem += pulp.lpSum(c_max) - - # Constraints - - # makespan constraint (1) - for job in jobs[1:]: - problem += c_j[job] <= c_max - - # job assignment constraint (3) - for job in jobs[1:]: - problem += pulp.lpSum(x_ik[job][machine] for machine in machines) == 1 - - # replaced (4) - (6): jobs can have multiple predecessors and successors - for job in jobs[1:]: - problem += ( - pulp.lpSum( - y_ijk[job_j][job][machine] for machine in machines for job_j in jobs - ) - >= 1 # each job has a predecessor - ) - # if the job has a predecessor or successor - # on a machine it also has to run on this machine - for job in jobs[1:]: - for machine in machines: - problem += ( # predecessor - x_ik[job][machine] - >= pulp.lpSum(y_ijk[job_j][job][machine] for job_j in jobs) / BIG_M - ) - problem += ( # successor - x_ik[job][machine] - >= pulp.lpSum(y_ijk[job][job_j][machine] for job_j in jobs) / BIG_M - ) - - # only if job runs at t=0 it can have predecessor 0 - for job in jobs[1:]: - for machine in machines: - problem += z_ikt[job][machine][0] == y_ijk["0"][job][machine] - - # completion time for each job (7) - for job in jobs[1:]: - problem += c_j[job] >= s_j[job] + pulp.lpSum( - x_ik[job][machine] * p_times[job][machine] for machine in machines - ) + pulp.lpSum( - y_ijk[job_j][job][machine] * s_times[job_j][job][machine] - for machine in machines - for job_j in jobs - ) - - # completion time for dummy job (8) - problem += c_j["0"] == 0 - - # order constraint (9) - for job in jobs[1:]: - for job_j in jobs: - problem += ( - c_j[job_j] - + (pulp.lpSum(y_ijk[job_j][job][machine] for machine in machines) - 1) - * BIG_M - <= s_j[job] - ) - - # (10) we don't need this constraint - # job is combleted (11) - for job in jobs[1:]: - problem += c_j[job] - s_j[job] + 1 == pulp.lpSum( - z_ikt[job][machine][timestep] - for timestep in timesteps - for machine in machines - ) - - # z fits 
machine assignment (12) - for job in jobs[1:]: - for machine in machines: - problem += ( - pulp.lpSum(z_ikt[job][machine][timestep] for timestep in timesteps) - <= x_ik[job][machine] * BIG_M - ) - - # z fits time assignment (13) - (14) - for job in jobs[1:]: - for timestep in timesteps: - problem += ( - pulp.lpSum(z_ikt[job][machine][timestep] for machine in machines) - * timestep - <= c_j[job] - ) - - for job in jobs[1:]: - for timestep in timesteps: - problem += s_j[job] <= pulp.lpSum( - z_ikt[job][machine][timestep] for machine in machines - ) * timestep + BIG_M * ( - 1 - pulp.lpSum(z_ikt[job][machine][timestep] for machine in machines) - ) - - # capacity constraint (15) - for timestep in timesteps: - for machine in machines: - problem += ( - pulp.lpSum( - z_ikt[job][machine][timestep] * job_capacities[job] - for job in jobs[1:] - ) - <= machine_capacities[machine] - ) - # These constraints encode the specific behavior we want to achieve with y_ijk: - # y_ijk == 1 <=> - # c_i < c_j and not exist l in J: c_i < c_l < s_j and i,j,l run on the same machine - for job in jobs[1:]: - for job_j in jobs[1:]: - if job == job_j: - problem += a_ij[job][job_j] == 0 - problem += b_ij[job][job_j] == 0 - continue - problem += a_ij[job][job_j] >= (s_j[job_j] - c_j[job]) / BIG_M - problem += b_ij[job][job_j] >= (c_j[job_j] - c_j[job]) / BIG_M - for machine in machines: - problem += ( - d_ijk[job][job_j][machine] - >= x_ik[job][machine] + x_ik[job_j][machine] - 1 - ) - for job_l in jobs[1:]: - problem += ( - e_ijlk[job][job_j][job_l][machine] - >= b_ij[job][job_l] - + a_ij[job_l][job_j] - + d_ijk[job][job_j][machine] - + d_ijk[job][job_l][machine] - - 3 - ) - - for job in jobs[1:]: - for job_j in jobs[1:]: - for machine in machines: - problem += ( - y_ijk[job][job_j][machine] - >= a_ij[job][job_j] - + (pulp.lpSum( - e_ijlk[job][job_j][job_l][machine] for job_l in jobs[1:] - ) / BIG_M - ) - + d_ijk[job][job_j][machine] - - 2 - ) - - # (16) - (20) already encoded in vars - problem.writeLP("scheduling.lp") - return problem - - -def solve_and_print_lp(filename: str, problem: pulp.LpProblem) -> None: - solver_list = pulp.listSolvers(onlyAvailable=True) - if len(solver_list) == 2: - solver = pulp.getSolver("GUROBI_CMD") - problem.solve(solver) - else: - problem.solve() - print("Status:", pulp.LpStatus[problem.status]) - - with open(filename, "w+", encoding="utf-8") as f: - json.dump( - { - "params": { - "jobs": jobs, - "machines": machines, - "job_capcities": job_capacities, - "machine_capacities": machine_capacities, - "timesteps": timesteps, - "processing_times": processing_times, - "setup_times": setup_times, - }, - "status": pulp.LpStatus[problem.status], - "objective": pulp.value(problem.objective), - "variables": { - var.name: var.varValue - for var in problem.variables() - if var.varValue > 0 - }, - }, - f, - indent=4, - ) - - -if __name__ == "__main__": - problem = generate_lp() - #solve_and_print_lp("scheduling.json", problem) diff --git a/data/solution_explorer.py b/data/solution_explorer.py deleted file mode 100644 index 645e8a7..0000000 --- a/data/solution_explorer.py +++ /dev/null @@ -1,169 +0,0 @@ -"""A utility script to visualize a solution to the scheduling problem.""" - -import argparse - -import matplotlib as mpl -import matplotlib.pyplot as plt -import milp -import pandas as pd -from matplotlib import ticker -from matplotlib.patches import Patch - -# Parse the command line arguments -parser = argparse.ArgumentParser( - description="Visualize a solution to the scheduling problem" -) 
-parser.add_argument( - "solution", - type=str, - help="The solution file to visualize", - nargs="?", - default="scheduling.sol", -) -parser.add_argument( - "--no-z", - help="Do not consider the z variables, just use start and completion times", - action="store_true", -) -parser.add_argument( - "--pdf", - type=str, - help="Write the plot to a PDF file", - nargs="?", - metavar="FILE", -) -args = parser.parse_args() - -# Read the solution -values: dict[str, float] = {} -with open(args.solution, encoding="utf-8") as f: - for line in f: - if line.startswith("#"): - continue - [name, value] = line.split(" ") - values[name] = float(value) - -# General comment: The completion time of a job is the last time step in which it is processed -# Similarily, the start time of a job is the first time step in which it is processed -# The duration is the number of time steps in which it is processed - - -def list2binstr(l: list[int]) -> str: - """Converts a list of 0 or 1 to a binary string. - - Args: - l (list[int]): The list of integers - - Returns: - str: A binary string - """ - return "".join(map(str, l)) - - -# Create a dataframe with the job schedule -df = pd.DataFrame( - columns=["job", "capacity", "machine", "start", "end", "duration", "zmask"] -) -for job in filter(lambda j: j != "0", milp.jobs): - start = round(values[f"s_j_{job}"]) - end = round(values[f"c_j_{job}"]) - [assigned_machine] = [ - machine for machine in milp.machines if values[f"x_ik_{job}_{machine}"] >= 0.5 - ] - capacity = milp.job_capacities[job] - duration = end - start + 1 - all_zs = [ - [round(values[f"z_ikt_{job}_{machine}_{t}"]) for t in milp.timesteps] - for machine in milp.machines - ] - [zs] = [z for z in all_zs if sum(z) > 0] - zs = list2binstr(zs) - df.loc[len(df)] = [job, capacity, assigned_machine, start, end, duration, zs] - -print(df) - -# Create patches for the legend -cmap = mpl.colormaps.get_cmap("tab10") -color_mapping = { - m: cmap(i / (len(milp.machines) - 1)) for i, m in enumerate(milp.machines) -} -patches = [] -for color in color_mapping.values(): - p = Patch(color=color) - p.set_edgecolor("black") - p.set_linewidth(1) - patches.append(p) - -# Create tick points (where a job starts or ends) -tick_points = df["start"].values.tolist() + (df["end"] + 1).values.tolist() -tick_points = list(set(tick_points)) -tick_points.sort() - -# Plot the jobs -# The grid lines are at the start of a time step. -# Hence, if a job ends in time step 11, the bar ends at 12. -fig, ax = plt.subplots() - - -def collect_binary_one_runs(s: str) -> list[tuple[int, int]]: - """Given a binary string, returns the start and length of all runs of - consecutive ones. 
- - Example: "001110001" -> [(2, 3), (8, 1)] - - Args: - s (str): A binary string - - Returns: - list[tuple[int, int]]: A list of tuples (start, length) of all runs - """ - runs = [] - start = None - for i, c in enumerate(s): - if c == "1": - # Start the run if not already started - if start is None: - start = i - - # Check if run is at end - if i == len(s) - 1 or s[i + 1] == "0": - runs.append((start, i - start + 1)) - start = None - return runs - - -for i, row in df.iterrows(): - color = color_mapping[row["machine"]] - bar_color = color if args.no_z else "none" - PADDING = 0.1 - HEIGHT = 1 - 2 * PADDING - if not args.no_z: - zruns = collect_binary_one_runs(row["zmask"]) - ax.broken_barh(zruns, (i - 0.5 + PADDING, HEIGHT), color=color) - - ax.barh( - i, - row["duration"], - left=row["start"], - height=HEIGHT, - edgecolor="black", - linewidth=2, - color=bar_color, - ) - -yticks = list(range(len(df))) -ax.set_yticks(yticks) -ax.set_yticklabels(df["job"]) -ax.invert_yaxis() -ax.xaxis.set_minor_locator(ticker.MultipleLocator(base=1.0)) -# plt.rc("font", family="serif") -plt.xlabel("Time") -plt.grid(axis="x", which="major") -plt.grid(axis="x", which="minor", alpha=0.4) -plt.legend(handles=patches, labels=color_mapping.keys()) - -if args.pdf: - plt.tight_layout() - plt.savefig(args.pdf, format="pdf", bbox_inches="tight") -else: - plt.show() diff --git a/data/table.py b/data/table.py deleted file mode 100644 index 6db1b1b..0000000 --- a/data/table.py +++ /dev/null @@ -1,50 +0,0 @@ -import argparse - -import milp as milp - -# Parse the command line arguments -parser = argparse.ArgumentParser( - description="Print setup and process times as LaTeX table" -) -parser.add_argument( - "machine", - type=int, - help="The machine to print the times for", -) -parser.add_argument( - "mode", - help="What times to print", - choices=["setup", "process"], -) -args = parser.parse_args() - - -def print_row(row: list[str]): - print(" & ".join(row) + " \\\\") - - -mode = args.mode -m = args.machine -machine = milp.machines[m] - -# Print the header -header = [machine] + milp.jobs -print_row(header) - -# Print the table -for i, job_i in enumerate(milp.jobs): - row = [job_i] - if mode == "setup": - for j, job_j in enumerate(milp.jobs): - if job_i == job_j: - item = "-" - else: - s_ijk = milp.get_setup_time(i, j, m) - item = str(s_ijk) - row.append(f"{item:4}") - elif mode == "process": - p_ik = milp.get_process_time(i, m) - row.append(f"{p_ik:4}") - else: - raise ValueError(f"Unknown mode: {mode}") - print_row(row) diff --git a/run_example_problem.py b/run_example_problem.py new file mode 100644 index 0000000..37063f1 --- /dev/null +++ b/run_example_problem.py @@ -0,0 +1,9 @@ +"""Runs the example problem.""" +from data.example import example_problem + +# Meta Variables +BIG_M = 1000 +TIMESTEPS = 2**6 + +if __name__ == "__main__": + example_problem(BIG_M, TIMESTEPS,"./data/results/scheduling") diff --git a/src/common/experiment.py b/src/common/experiment.py index 6f29fe3..0f7e08e 100644 --- a/src/common/experiment.py +++ b/src/common/experiment.py @@ -33,10 +33,10 @@ class CircuitJob: coefficient: tuple[float, WeightType] | None cregs: int index: int - instance: QuantumCircuit | None + circuit: QuantumCircuit | None n_shots: int observable: PauliList # Should be single pauli - partition_lable: str + partition_label: str result_counts: dict[str, int] | None uuid: UUID @@ -50,7 +50,7 @@ class CombinedJob: coefficients: list[tuple[float, WeightType]] = field(default_factory=list) cregs: list[int] = 
field(default_factory=list) indices: list[int] = field(default_factory=list) - instance: QuantumCircuit | None = None + circuit: QuantumCircuit | None = None mapping: list[slice] = field(default_factory=list) n_shots: int = 0 observable: PauliList | None = None @@ -82,10 +82,10 @@ def job_from_circuit(circuit: QuantumCircuit) -> CircuitJob: coefficient=None, cregs=len(circuit.cregs), index=0, - instance=circuit, + circuit=circuit, n_shots=1024, observable=PauliList(""), - partition_lable="1", + partition_label="1", result_counts=None, uuid=uuid4(), ) @@ -105,11 +105,11 @@ def jobs_from_experiment(experiment: Experiment) -> list[CircuitJob]: coefficient=experiment.coefficients[idx], cregs=len(circuit.cregs), index=idx, - instance=circuit, + circuit=circuit, n_shots=experiment.n_shots, # TODO this might need to change for proper observables observable=experiment.observables, - partition_lable=experiment.partition_label, + partition_label=experiment.partition_label, result_counts=None, uuid=experiment.uuid, ) diff --git a/src/provider/__init__.py b/src/provider/__init__.py index 5592cca..e1a1b1f 100644 --- a/src/provider/__init__.py +++ b/src/provider/__init__.py @@ -1,4 +1,3 @@ """Module for runtime components.""" from .accelerator import Accelerator, IBMQBackend from .accelerator_group import AcceleratorGroup -from .scheduler import Scheduler diff --git a/src/provider/accelerator_group.py b/src/provider/accelerator_group.py index 1f3b446..a40f1e0 100644 --- a/src/provider/accelerator_group.py +++ b/src/provider/accelerator_group.py @@ -171,9 +171,7 @@ def _run_job( return None run_job = job.job try: - run_job.result_counts = accs[pool_id].run_and_get_counts( - run_job.instance, run_job.n_shots - ) + run_job.result_counts = accs[pool_id].run_and_get_counts(run_job.circuit, run_job.n_shots) except Exception as exc: print(exc) return run_job diff --git a/src/provider/generate_schedule.py b/src/provider/generate_schedule.py deleted file mode 100644 index 476028a..0000000 --- a/src/provider/generate_schedule.py +++ /dev/null @@ -1,572 +0,0 @@ -"""Methods for generating a schedule for a given provider.""" -from bisect import insort -from collections import defaultdict -from dataclasses import dataclass, field - -import pulp - -from src.common import CircuitJob, ScheduledJob -from src.tools import assemble_job -from .accelerator import Accelerator - - -@dataclass -class Bin: - """Helper to keep track of binning problem.""" - - capacity: int - index: int - qpu: int - full: bool = False - jobs: list[CircuitJob] = field(default_factory=list) - - -@dataclass -class LPInstance: - """Helper to keep track of LP problem.""" - - problem: pulp.LpProblem - jobs: list[str] - machines: list[str] - x_ik: dict[str, dict[str, pulp.LpVariable]] - z_ikt: dict[str, dict[str, dict[int, pulp.LpVariable]]] - c_j: dict[str, pulp.LpVariable] - s_j: dict[str, pulp.LpVariable] - - -@dataclass -class JobResultInfo: - """Keep track of job results after scheduling.""" - - name: str - machine: str - start_time: float - completion_time: float - - -def generate_baseline_schedule( - jobs: list[CircuitJob], accelerators: list[Accelerator], **kwargs -) -> list[ScheduledJob]: - """Schedule jobs onto qpus. - - Each qpu represents a bin. - Since all jobs are asumed to take the same amount of time, they are associated - with a timestep (index). - k-first fit bin means we keep track of all bins that still have space left. - Once a qpu is full, we add a new bin for all qpus at the next timestep. 
- We can't run circuits with one qubit, scheduling doesn't take this into account. - Args: - jobs (list[CircuitJob]): The list of jobs to run. - accelerators (list[Accelerator]): The list of available accelerators. - Returns: - list[ScheduledJob]: A list of Jobs scheduled to accelerators. - """ - # Use binpacking to combine circuits into qpu sized jobs - # placeholder for propper scheduling - # TODO set a flag when an experiment is done - # TODO consider number of shots - # Assumption: bins should be equally loaded and take same amount of time - - def find_fitting_bin(job: CircuitJob, bins: list[Bin]) -> int | None: - for idx, b in enumerate(bins): - if b.capacity >= job.instance.num_qubits: - return idx - return None - - open_bins = [ - Bin(index=0, capacity=qpu.qubits, qpu=idx) - for idx, qpu in enumerate(accelerators) - ] - closed_bins = [] - index = 1 - for job in jobs: - if job.instance is None: - continue - # Find the index of a fitting bin - bin_idx = find_fitting_bin(job, open_bins) - - if bin_idx is None: - # Open new bins - new_bins = [ - Bin(index=index, capacity=qpu.qubits, qpu=idx) - for idx, qpu in enumerate(accelerators) - ] - index += 1 - - # Search for a fitting bin among the new ones - bin_idx = find_fitting_bin(job, new_bins) - assert bin_idx is not None, "Job doesn't fit onto any qpu" - bin_idx += len(open_bins) - open_bins += new_bins - - # Add job to selected bin - selected_bin = open_bins[bin_idx] - selected_bin.jobs.append(job) - selected_bin.capacity -= job.instance.num_qubits - - # Close bin if full - if selected_bin.capacity == 0: - selected_bin.full = True - closed_bins.append(selected_bin) - del open_bins[bin_idx] - - # Close all open bins - for obin in open_bins: - if len(obin.jobs) > 0: - closed_bins.append(obin) - - # Build combined jobs from bins - combined_jobs = [] - for _bin in sorted(closed_bins, key=lambda x: x.index): - combined_jobs.append(ScheduledJob(job=assemble_job(_bin.jobs), qpu=_bin.qpu)) - return combined_jobs - - -def generate_simple_schedule( - jobs: list[CircuitJob], - accelerators: list[Accelerator], - big_m: int = 1000, - t_max: int = 2**7, - **kwargs, -) -> list[ScheduledJob]: - """Calclulate a schedule for the given jobs and accelerators based on the simple MILP. - - The simple MILP includes the machine dependent setup times. - Args: - jobs (list[CircuitJob]): The list of jobs to run. - accelerators (list[Accelerator]): The list of available accelerators. - big_m (int, optional): M hepler for LP. Defaults to 1000. - t_max (int, optional): Max number of Timesteps. Defaults to 2**7. - Returns: - list[ScheduledJob]: A list of Jobs scheduled to accelerators. 
- """ - lp_instance = _set_up_base_lp(jobs, accelerators, big_m, list(range(t_max))) - # (4) - (7), (9) - p_times = pulp.makeDict( - [lp_instance.jobs[1:], lp_instance.machines], - _get_processing_times(jobs, accelerators), - 0, - ) - s_times = pulp.makeDict( - [lp_instance.jobs[1:], lp_instance.machines], - _get_simple_setup_times(jobs, accelerators), - 0, - ) - - for job in lp_instance.jobs[1:]: - lp_instance.problem += lp_instance.c_j[job] >= lp_instance.s_j[ # (7) - job - ] + pulp.lpSum( - lp_instance.x_ik[job][machine] - * (p_times[job][machine] + s_times[job][machine]) - for machine in lp_instance.machines - ) - - return _solve_lp(lp_instance, jobs, accelerators) - - -def generate_extended_schedule( - jobs: list[CircuitJob], - accelerators: list[Accelerator], - big_m: int = 1000, - t_max: int = 2**7, - **kwargs, -) -> list[ScheduledJob]: - """Calclulate a schedule for the given jobs and accelerators based on the extended MILP. - - The extended MILP includes the sequence dependent setup time between jobs. - This depends on the unique pre- and successor condition described in the paper. - Args: - jobs (list[CircuitJob]): The list of jobs to run. - accelerators (list[Accelerator]): The list of available accelerators. - big_m (int, optional): M hepler for LP. Defaults to 1000. - t_max (int, optional): Max number of Timesteps. Defaults to 2**7. - Returns: - list[ScheduledJob]: A list of Jobs scheduled to accelerators. - """ - lp_instance = _set_up_base_lp(jobs, accelerators, big_m, list(range(t_max))) - - # additional parameters - p_times = pulp.makeDict( - [lp_instance.jobs, lp_instance.machines], - _get_processing_times(jobs, accelerators), - 0, - ) - # TODO check if this works correctly for job "0" - s_times = pulp.makeDict( - [lp_instance.jobs, lp_instance.jobs, lp_instance.machines], - _get_setup_times(jobs, accelerators, kwargs.get("default_value", 50)), - 0, - ) - - # decision variables - y_ijk = pulp.LpVariable.dicts( - "y_ijk", - (lp_instance.jobs, lp_instance.jobs, lp_instance.machines), - cat="Binary", - ) - a_ij = pulp.LpVariable.dicts( - "a_ij", (lp_instance.jobs, lp_instance.jobs), cat="Binary" - ) # a: Job i ends before job j starts - b_ij = pulp.LpVariable.dicts( - "b_ij", (lp_instance.jobs, lp_instance.jobs), cat="Binary" - ) # b: Job i ends before job j ends - d_ijk = pulp.LpVariable.dicts( - "d_ijk", - (lp_instance.jobs, lp_instance.jobs, lp_instance.machines), - cat="Binary", - ) # d: Job i and j run on the same machine - - e_ijlk = pulp.LpVariable.dicts( - "e_ijlk", - (lp_instance.jobs, lp_instance.jobs, lp_instance.jobs, lp_instance.machines), - cat="Binary", - ) - - for job in lp_instance.jobs[1:]: - lp_instance.problem += ( # (4) - pulp.lpSum( - y_ijk[job_j][job][machine] - for machine in lp_instance.machines - for job_j in lp_instance.jobs - ) - >= 1 # each job has a predecessor - ) - lp_instance.problem += lp_instance.c_j[job] >= lp_instance.s_j[ # (7) - job - ] + pulp.lpSum( - lp_instance.x_ik[job][machine] * p_times[job][machine] - for machine in lp_instance.machines - ) + pulp.lpSum( - y_ijk[job_j][job][machine] * s_times[job_j][job][machine] - for machine in lp_instance.machines - for job_j in lp_instance.jobs - ) - for machine in lp_instance.machines: - lp_instance.problem += ( # predecessor (6) - lp_instance.x_ik[job][machine] - >= pulp.lpSum(y_ijk[job_j][job][machine] for job_j in lp_instance.jobs) - / big_m - ) - lp_instance.problem += ( # successor - lp_instance.x_ik[job][machine] - >= pulp.lpSum(y_ijk[job][job_j][machine] for job_j in lp_instance.jobs) 
- / big_m - ) - lp_instance.problem += ( # (5) - lp_instance.z_ikt[job][machine][0] == y_ijk["0"][job][machine] - ) - for job_j in lp_instance.jobs: - lp_instance.problem += ( - lp_instance.c_j[job_j] - + ( - pulp.lpSum( - y_ijk[job_j][job][machine] for machine in lp_instance.machines - ) - - 1 - ) - * big_m - <= lp_instance.s_j[job] - ) - - # Extended constraints - for job in lp_instance.jobs[1:]: - for job_j in lp_instance.jobs[1:]: - if job == job_j: - lp_instance.problem += a_ij[job][job_j] == 0 - lp_instance.problem += b_ij[job][job_j] == 0 - continue - lp_instance.problem += ( - a_ij[job][job_j] - >= (lp_instance.s_j[job_j] - lp_instance.c_j[job]) / big_m - ) - lp_instance.problem += ( - b_ij[job][job_j] - >= (lp_instance.c_j[job_j] - lp_instance.c_j[job]) / big_m - ) - for machine in lp_instance.machines: - lp_instance.problem += ( - d_ijk[job][job_j][machine] - >= lp_instance.x_ik[job][machine] - + lp_instance.x_ik[job_j][machine] - - 1 - ) - for job_l in lp_instance.jobs[1:]: - lp_instance.problem += ( - e_ijlk[job][job_j][job_l][machine] - >= b_ij[job][job_l] - + a_ij[job_l][job_j] - + d_ijk[job][job_j][machine] - + d_ijk[job][job_l][machine] - - 3 - ) - - for job in lp_instance.jobs[1:]: - for job_j in lp_instance.jobs[1:]: - for machine in lp_instance.machines: - lp_instance.problem += ( - y_ijk[job][job_j][machine] - >= a_ij[job][job_j] - + ( - pulp.lpSum( - e_ijlk[job][job_j][job_l][machine] - for job_l in lp_instance.jobs[1:] - ) - / big_m - ) - + d_ijk[job][job_j][machine] - - 2 - ) - return _solve_lp(lp_instance, jobs, accelerators) - - -def _set_up_base_lp( - base_jobs: list[CircuitJob], - accelerators: list[Accelerator], - big_m: int, - timesteps: list[int], -) -> LPInstance: - # Set up input params - jobs = ["0"] + [str(job.uuid) for job in base_jobs] - job_capacities = { - str(job.uuid): job.instance.num_qubits - for job in base_jobs - if job.instance is not None - } - job_capacities["0"] = 0 - machines = [str(qpu.uuid) for qpu in accelerators] - machine_capacities = {str(qpu.uuid): qpu.qubits for qpu in accelerators} - - # set up problem variables - x_ik = pulp.LpVariable.dicts("x_ik", (jobs, machines), cat="Binary") - z_ikt = pulp.LpVariable.dicts("z_ikt", (jobs, machines, timesteps), cat="Binary") - - c_j = pulp.LpVariable.dicts("c_j", (jobs), 0, cat="Continuous") - s_j = pulp.LpVariable.dicts("s_j", (jobs), 0, cat="Continuous") - c_max = pulp.LpVariable("makespan", 0, cat="Continuous") - - problem = pulp.LpProblem("Scheduling", pulp.LpMinimize) - # set up problem constraints - problem += pulp.lpSum(c_max) # (obj) - problem += c_j["0"] == 0 # (8) - for job in jobs[1:]: - problem += c_j[job] <= c_max # (1) - problem += pulp.lpSum(x_ik[job][machine] for machine in machines) == 1 # (3) - problem += c_j[job] - s_j[job] + 1 == pulp.lpSum( # (11) - z_ikt[job][machine][timestep] - for timestep in timesteps - for machine in machines - ) - for machine in machines: - problem += ( # (12) - pulp.lpSum(z_ikt[job][machine][timestep] for timestep in timesteps) - <= x_ik[job][machine] * big_m - ) - - for timestep in timesteps: - problem += ( # (13) - pulp.lpSum(z_ikt[job][machine][timestep] for machine in machines) - * timestep - <= c_j[job] - ) - problem += s_j[job] <= pulp.lpSum( # (14) - z_ikt[job][machine][timestep] for machine in machines - ) * timestep + big_m * ( - 1 - pulp.lpSum(z_ikt[job][machine][timestep] for machine in machines) - ) - for timestep in timesteps: - for machine in machines: - problem += ( # (15) - pulp.lpSum( - z_ikt[job][machine][timestep] * 
job_capacities[job] - for job in jobs[1:] - ) - <= machine_capacities[machine] - ) - return LPInstance( - problem=problem, - jobs=jobs, - machines=machines, - x_ik=x_ik, - z_ikt=z_ikt, - c_j=c_j, - s_j=s_j, - ) - - -def _get_processing_times( - base_jobs: list[CircuitJob], - accelerators: list[Accelerator], -) -> list[list[float]]: - return [ - [qpu.compute_processing_time(job.instance) for qpu in accelerators] - for job in base_jobs - if job.instance is not None - ] - - -def _get_setup_times( - base_jobs: list[CircuitJob], accelerators: list[Accelerator], default_value: int -) -> list[list[list[float]]]: - return [ - [ - [ - qpu.compute_setup_time(job_i.instance, job_j.instance) - for qpu in accelerators - ] - for job_i in base_jobs - if job_i.instance is not None - ] - for job_j in base_jobs - if job_j.instance is not None - ] - - -def _get_simple_setup_times( - base_jobs: list[CircuitJob], accelerators: list[Accelerator] -) -> list[list[float]]: - return [ - [ - qpu.compute_setup_time(job_i.instance, circuit_to=None) - for qpu in accelerators - ] - for job_i in base_jobs - if job_i.instance is not None - ] - - -def _solve_lp( - lp_instance: LPInstance, jobs: list[CircuitJob], accelerators: list[Accelerator] -) -> list[ScheduledJob]: - solver_list = pulp.listSolvers(onlyAvailable=True) - gurobi = "GUROBI_CMD" - if gurobi in solver_list: - solver = pulp.getSolver(gurobi) - lp_instance.problem.solve(solver) - else: - lp_instance.problem.solve() - return _generate_schedule_from_lp(lp_instance, jobs, accelerators) - - -def _generate_schedule_from_lp( - lp_instance: LPInstance, jobs: list[CircuitJob], accelerators: list[Accelerator] -) -> list[ScheduledJob]: - assigned_jobs = { - job: JobResultInfo(name=job, machine="", start_time=-1.0, completion_time=-1.0) - for job in lp_instance.jobs - } - for var in lp_instance.problem.variables(): - if var.name.startswith("x_") and var.varValue > 0.0: - name = var.name.split("_")[2:] - assigned_jobs["-".join(name[:5])].machine = "-".join(name[-5:]) - elif var.name.startswith("s_"): - name = "-".join(var.name.split("_")[2:]) - assigned_jobs[name].start_time = float(var.varValue) - elif var.name.startswith("c_"): - name = "-".join(var.name.split("_")[2:]) - assigned_jobs[name].completion_time = float(var.varValue) - del assigned_jobs["0"] - machine_assignments: dict[str, list[JobResultInfo]] = defaultdict(list) - for job in assigned_jobs.values(): - if job.machine != "": - machine_assignments[job.machine].append(job) - - closed_bins = [] - accelerator_uuids = [str(qpu.uuid) for qpu in accelerators] - for machine, machine_jobs in machine_assignments.items(): - try: - machine_idx = accelerator_uuids.index(machine) - except ValueError: - continue - machine_capacity = accelerators[machine_idx].qubits - closed_bins += _form_bins(machine_capacity, machine_idx, machine_jobs, jobs) - combined_jobs = [] - - for _bin in sorted(closed_bins, key=lambda x: x.index): - if len(_bin.jobs) > 0: - combined_jobs.append( - ScheduledJob(job=assemble_job(_bin.jobs), qpu=_bin.qpu) - ) - return combined_jobs - - -def _form_bins( - machine_capacity: int, - machine_id: int, - assigned_jobs: list[JobResultInfo], - jobs: list[CircuitJob], -) -> list[Bin]: - # TODO: adapat number of shots - bins: list[Bin] = [] - current_time = -1.0 - open_jobs: list[JobResultInfo] = [] - counter = -1 - current_bin = Bin(capacity=machine_capacity, index=counter, qpu=machine_id) - - for job in sorted(assigned_jobs, key=lambda x: x.start_time): - if job.start_time == current_time: - # s_i = s_j -> add 
to same bin - _append_if_exists(job, current_bin, jobs, open_jobs=open_jobs) - continue - - # s_i > s_j -> add to new bin - counter += 1 - _bin = Bin(capacity=machine_capacity, index=counter, qpu=machine_id) - if len(open_jobs) == 0: - # no open jobs -> add simply add to new bin - _append_if_exists(job, _bin, jobs, open_jobs=open_jobs) - - elif open_jobs[0].completion_time > job.start_time: - # noone finishes before job starts -> add to new bin which includes all open jobs - _append_if_exists( - job, _bin, jobs, current_bin=current_bin, open_jobs=open_jobs - ) - else: - # someone finishes before job starts - # -> add bin for each job that finishes before job starts - open_jobs_copy = open_jobs.copy() - for open_job in open_jobs_copy: - if open_job.completion_time > job.start_time: - # found the first that is still running, can stop - _append_if_exists( - job, _bin, jobs, current_bin=current_bin, open_jobs=open_jobs - ) - break - if open_job not in open_jobs: - # has been removed in the meantime - continue - # remove the last job and all that end at the same time - _bin.jobs = current_bin.jobs - for second_job in open_jobs_copy: - if second_job.completion_time == open_job.completion_time: - _append_if_exists( - second_job, _bin, jobs, open_jobs=open_jobs, do_remove=True - ) - current_bin = _bin - counter += 1 - _bin = Bin(capacity=machine_capacity, index=counter, qpu=machine_id) - - bins.append(_bin) - current_time = job.start_time - current_bin = _bin - - return bins - - -def _append_if_exists( - job: JobResultInfo, - _bin: Bin, - jobs: list[CircuitJob], - current_bin: Bin | None = None, - open_jobs: list[JobResultInfo] | None = None, - do_remove: bool = False, -) -> None: - if cjob := next((j for j in jobs if str(j.uuid) == job.name), None): - if current_bin is not None: - _bin.jobs = current_bin.jobs - _bin.jobs.append(cjob) - if open_jobs is not None: - if do_remove: - open_jobs.remove(job) - else: - insort(open_jobs, job, key=lambda x: x.completion_time) diff --git a/src/scheduling/__init__.py b/src/scheduling/__init__.py new file mode 100644 index 0000000..b14bdb6 --- /dev/null +++ b/src/scheduling/__init__.py @@ -0,0 +1,5 @@ +"""Scheduling related types and functions.""" +from .types import * + +from .generate_schedule import generate_schedule +from .scheduler import Scheduler diff --git a/src/scheduling/bin_schedule.py b/src/scheduling/bin_schedule.py new file mode 100644 index 0000000..8cbf861 --- /dev/null +++ b/src/scheduling/bin_schedule.py @@ -0,0 +1,148 @@ +"""Generate baseline schedules.""" +from uuid import uuid4 + +from qiskit import QuantumCircuit + +from src.common import CircuitJob, ScheduledJob +from src.provider import Accelerator +from src.tools import assemble_job +from .types import Bin, JobResultInfo + + +def generate_bin_info_schedule( + circuits: list[QuantumCircuit], + accelerators: dict[str, int], +) -> list[JobResultInfo]: + """Generates a baseline schedule for the given jobs and accelerators using bin packing. + + Generates the schedule using bin packing; the makespan is computed separately + by executing the schedule with the correct p_ij and s_ij values. + + Args: + circuits (list[QuantumCircuit]): The list of circuits (jobs) to schedule. + accelerators (dict[str, int]): The list of accelerators to schedule on (bins). + + Returns: + list[JobResultInfo]: List of jobs with their assigned machine and + start times.
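+ + Example (an illustrative sketch; machine names and qubit counts are made up): + circuits = [QuantumCircuit(2), QuantumCircuit(3)] + schedule = generate_bin_info_schedule(circuits, {"qpu_a": 5, "qpu_b": 5}) + # each entry carries the job name, machine, start time (bin index) and capacity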
+ """ + jobs = [ + CircuitJob( + uuid=uuid4(), + circuit=job, + coefficient=None, + cregs=1, + index=0, + n_shots=1024, + observable="", + partition_label="", + result_counts={}, + ) + for job in circuits + ] + # Build combined jobs from bins + closed_bins = _do_bin_pack(jobs, list(accelerators.values())) + combined_jobs: list[JobResultInfo] = [] + for _bin in sorted(closed_bins, key=lambda x: x.index): + for job in _bin.jobs: + if job is None or job.circuit is None: + continue + combined_jobs.append( + JobResultInfo( + name=str(job.uuid), + machine=list(accelerators.keys())[_bin.qpu], + start_time=_bin.index, + completion_time=-1.0, + capacity=job.circuit.num_qubits, + ) + ) + + return combined_jobs + + +def generate_bin_executable_schedule( + jobs: list[CircuitJob], accelerators: list[Accelerator] +) -> list[ScheduledJob]: + """Schedule jobs onto qpus. + + Each qpu represents a bin. + Since all jobs are asumed to take the same amount of time, they are associated + with a timestep (index). + k-first fit bin means we keep track of all bins that still have space left. + Once a qpu is full, we add a new bin for all qpus at the next timestep. + We can't run circuits with one qubit, scheduling doesn't take this into account. + + Args: + jobs (list[CircuitJob]): The list of jobs to run. + accelerators (list[Accelerator]): The list of available accelerators. + + Returns: + list[ScheduledJob]: A list of Jobs scheduled to accelerators. + """ + # Use binpacking to combine circuits into qpu sized jobs + # placeholder for propper scheduling + # TODO set a flag when an experiment is done + # TODO consider number of shots + # Assumption: bins should be equally loaded and take same amount of time + closed_bins = _do_bin_pack(jobs, [qpu.qubits for qpu in accelerators]) + # Build combined jobs from bins + combined_jobs = [] + for _bin in sorted(closed_bins, key=lambda x: x.index): + combined_jobs.append(ScheduledJob(job=assemble_job(_bin.jobs), qpu=_bin.qpu)) + return combined_jobs + + +def _do_bin_pack( + jobs: list[CircuitJob], accelerator_capacities: list[int] +) -> list[Bin]: + open_bins = [ + Bin(index=0, capacity=qpu, qpu=idx) + for idx, qpu in enumerate(accelerator_capacities) + ] + closed_bins = [] + index = 1 + for job in jobs: + if job.circuit is None: + continue + # Find the index of a fitting bin + bin_idx = _find_fitting_bin(job, open_bins) + + if bin_idx is None: + # Open new bins + new_bins = [ + Bin(index=index, capacity=qpu, qpu=idx) + for idx, qpu in enumerate(accelerator_capacities) + ] + index += 1 + + # Search for a fitting bin among the new ones + bin_idx = _find_fitting_bin(job, new_bins) + assert bin_idx is not None, "Job doesn't fit onto any qpu" + bin_idx += len(open_bins) + open_bins += new_bins + + # Add job to selected bin + selected_bin = open_bins[bin_idx] + selected_bin.jobs.append(job) + selected_bin.capacity -= job.circuit.num_qubits + + # Close bin if full + if selected_bin.capacity == 0: + selected_bin.full = True + closed_bins.append(selected_bin) + del open_bins[bin_idx] + + # Close all open bins + for obin in open_bins: + if len(obin.jobs) > 0: + closed_bins.append(obin) + return closed_bins + + +def _find_fitting_bin(job: CircuitJob, bins: list[Bin]) -> int | None: + if job.circuit is None: + raise ValueError("Job has no circuit") + for idx, b in enumerate(bins): + if b.capacity >= job.circuit.num_qubits: + return idx + return None diff --git a/src/scheduling/calculate_makespan.py b/src/scheduling/calculate_makespan.py new file mode 100644 index 0000000..fc30699 --- 
/dev/null +++ b/src/scheduling/calculate_makespan.py @@ -0,0 +1,137 @@ +"""Calculates the makespan of a schedule by replaying it with the correct times.""" +from collections import defaultdict +from copy import deepcopy + +import pulp + +from .types import JobResultInfo, LPInstance, PTimes, STimes + + +def calculate_makespan( + lp_instance: LPInstance, + jobs: list[JobResultInfo], + process_times: PTimes, + setup_times: STimes, +) -> float: + """Calculates the actual makespan from the list of results. + + Executes the schedule with the correct p_ij and s_ij values. + + Args: + lp_instance (LPInstance): The base LP instance. + jobs (list[JobResultInfo]): The list of job results. + process_times (PTimes): The correct p_ij. + setup_times (STimes): The correct s_ij. + + Returns: + float: The makespan of the schedule. + """ + return _calc_makespan( + jobs, + process_times, + setup_times, + lp_instance.jobs, + lp_instance.machines, + ) + + +def _find_last_completed( + job_name: str, jobs: list[JobResultInfo], machine: str +) -> JobResultInfo: + """Finds the last completed job before the given job from the original schedule.""" + for job in jobs: + if job.name == job_name: + original_starttime = job.start_time + break + else: + raise ValueError(f"Job {job_name} not found in {jobs}") + completed_before = [j for j in jobs if j.completion_time <= original_starttime] + if len(completed_before) == 0: + return JobResultInfo("0", machine, 0.0, 0.0, 0) + + return max(completed_before, key=lambda x: x.completion_time) + + +def calculate_bin_makespan( + jobs: list[JobResultInfo], + process_times: PTimes, + setup_times: STimes, + accelerators: dict[str, int], +) -> float: + """Calculates the actual makespan from the list of jobs by executing + the schedule with the correct p_im and s_ijm values. + + Args: + jobs (list[JobResultInfo]): The scheduled jobs. + process_times (PTimes): The processing times. + setup_times (STimes): The setup times. + accelerators (dict[str, int]): The list of available accelerators. + + Returns: + float: Makespan according to the given schedule.
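+ + Example (a sketch of the expected index layout; values are made up): + # process_times[i][m]: processing time of job i on machine m + # setup_times[i][j][m]: setup time when job j directly follows job i on machine m + makespan = calculate_bin_makespan(jobs, process_times, setup_times, {"qpu_a": 5})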
+ """ + lp_jobs = ["0"] + [job.name for job in jobs] + machines = list(accelerators.keys()) + return _calc_makespan(jobs, process_times, setup_times, lp_jobs, machines, True) + + +def _calc_makespan( + jobs: list[JobResultInfo], + process_times: PTimes, + setup_times: STimes, + job_names: list[str], + machines: list[str], + for_bin: bool = False, +) -> float: + s_times = pulp.makeDict( + [job_names, job_names, machines], + setup_times, + 0, + ) + p_times = pulp.makeDict( + [job_names[1:], machines], + process_times, + 0, + ) + + assigned_machines: defaultdict[str, list[JobResultInfo]] = defaultdict(list) + for job in jobs: + assigned_machines[job.machine].append(job) + makespans = [] + for machine, assigned_jobs in assigned_machines.items(): + assigned_jobs_copy = deepcopy(assigned_jobs) + for job in sorted(assigned_jobs, key=lambda x: x.start_time): + # Find the last predecessor that is completed before the job starts + # this can technically change the correct predecessor to a wrong one + # because completion times are updated in the loop + # I'm not sure if copying before the loop corrects this + if for_bin: + last_completed = max( + (job for job in assigned_jobs), key=lambda x: x.completion_time + ) + if job.start_time == 0.0: + last_completed = JobResultInfo("0", machine, 0.0, 0.0, 0) + job.start_time = last_completed.completion_time + else: + last_completed = _find_last_completed( + job.name, assigned_jobs_copy, machine + ) + if job.start_time == 0.0: + last_completed = JobResultInfo("0", machine, 0.0, 0.0, 0) + job.start_time = next( + ( + j.completion_time + for j in assigned_jobs + if last_completed.name == j.name + ), + 0.0, + ) + # calculate p_j + s_ij + job.completion_time = ( # check if this order is correct + last_completed.completion_time + + p_times[job.name][machine] + + s_times[last_completed.name][job.name][machine] + ) + makespans.append(max(job.completion_time for job in assigned_jobs)) + + return max(makespans) diff --git a/src/scheduling/extract_schedule.py b/src/scheduling/extract_schedule.py new file mode 100644 index 0000000..2fb3d6a --- /dev/null +++ b/src/scheduling/extract_schedule.py @@ -0,0 +1,244 @@ +"""Process the results of the LP solver to generate a schedule.""" +from collections import defaultdict +from bisect import insort +from typing import Callable + +import pulp + +from src.common import CircuitJob, ScheduledJob +from src.provider import Accelerator +from src.tools import assemble_job + +from .calculate_makespan import calculate_bin_makespan +from .types import Bin, LPInstance, JobResultInfo, PTimes, STimes + + +def generate_bin_info_schedule( + jobs: list[JobResultInfo], + process_times: PTimes, + setup_times: STimes, + accelerators: dict[str, int], +) -> tuple[float, list[JobResultInfo]]: + """Generates a schedule for evaluation purposes from bin packing. + + TODO make in to a overloaded function, summarize input params + Args: + jobs (list[JobResultInfo]): The list of the scheduled jobs. + process_times (PTimes): The original processing times. + setup_times (STimes): The original setup times. + accelerators (dict[str, int]): The list of used accelerators. + + Returns: + tuple[float, list[JobResultInfo]]: The objective value and the schedule. 
+ """ + lp_jobs = ["0"] + [job.name for job in jobs] # TODO + machines = list(accelerators.keys()) + p_times = pulp.makeDict( + [lp_jobs[1:], machines], + process_times, + 0, + ) + s_times = pulp.makeDict( + [lp_jobs, lp_jobs, machines], + setup_times, + 0, + ) + return calculate_bin_makespan(jobs, p_times, s_times, accelerators), jobs + + +def extract_info_schedule( + lp_instance: LPInstance, +) -> tuple[float, list[JobResultInfo]]: + """Extracts a schedule for evaluation purposes. + + Args: + lp_instance (LPInstance): A solved LP instance. + + Returns: + tuple[float, list[JobResultInfo]]: The objective value and the list of jobs with their + with their assigned machine and start and completion times. + """ + # TODO check if _first_name_func is needed once we change to uuids + assigned_jobs = _extract_gurobi_results(lp_instance, _first_name_func) + return lp_instance.problem.objective.value(), list(assigned_jobs.values()) + + +def extract_executable_schedule( + lp_instance: LPInstance, jobs: list[CircuitJob], accelerators: list[Accelerator] +) -> list[ScheduledJob]: + """Extracts a schedule for execution purposes. + + Solves the problem and generates bins to execute simultaneous jobs. + TODO still assumes that bins take same time. This is only done for acceleratorgroup. + Args: + lp_instance (LPInstance): A solved LP instance. + jobs (list[CircuitJob]): The list of original jobs. + accelerators (list[Accelerator]): The list of available accelerators. + + Returns: + list[ScheduledJob]: _description_ + """ + assigned_jobs = _extract_gurobi_results(lp_instance, _second_name_func) + return _generate_schedule_from_lp(assigned_jobs, jobs, accelerators) + + +def _extract_gurobi_results( + lp_instance: LPInstance, name_function: Callable[[str], tuple[str, str]] +) -> dict[str, JobResultInfo]: + assigned_jobs = { + job.name: JobResultInfo( + name=job.name, + machine="", + start_time=-1.0, + completion_time=-1.0, + capacity=job.circuit.num_qubits, + ) + if job.circuit is not None + else JobResultInfo( + name=job.name, + machine="", + start_time=-1.0, + completion_time=-1.0, + capacity=0, + ) + for job in lp_instance.named_circuits + } + for var in lp_instance.problem.variables(): + if var.name.startswith("x_") and var.varValue > 0.0: + names = name_function(var.name) + assigned_jobs[names[0]].machine = names[1] + elif var.name.startswith("s_"): + name = "-".join(var.name.split("_")[2:]) + assigned_jobs[name].start_time = float(var.varValue) + elif var.name.startswith("c_"): + name = "-".join(var.name.split("_")[2:]) + # TODO for some reason this was name[0] before + assigned_jobs[name].completion_time = float(var.varValue) + del assigned_jobs["0"] + return assigned_jobs + + +def _generate_schedule_from_lp( + assigned_jobs: dict[str, JobResultInfo], + jobs: list[CircuitJob], + accelerators: list[Accelerator], +) -> list[ScheduledJob]: + machine_assignments: dict[str, list[JobResultInfo]] = defaultdict(list) + for job in assigned_jobs.values(): + if job.machine != "": + machine_assignments[job.machine].append(job) + + closed_bins = [] + accelerator_uuids = [str(qpu.uuid) for qpu in accelerators] + for machine, machine_jobs in machine_assignments.items(): + try: + machine_idx = accelerator_uuids.index(machine) + except ValueError: + continue + machine_capacity = accelerators[machine_idx].qubits + closed_bins += _form_bins(machine_capacity, machine_idx, machine_jobs, jobs) + combined_jobs = [] + + for _bin in sorted(closed_bins, key=lambda x: x.index): + 
combined_jobs.append(ScheduledJob(job=assemble_job(_bin.jobs), qpu=_bin.qpu)) + return combined_jobs + + +def _form_bins( + machine_capacity: int, + machine_id: int, + assigned_jobs: list[JobResultInfo], + jobs: list[CircuitJob], +) -> list[Bin]: + # TODO: adapt number of shots + bins: list[Bin] = [] + current_time = -1.0 + open_jobs: list[JobResultInfo] = [] + counter = -1 + current_bin = Bin(capacity=machine_capacity, index=counter, qpu=machine_id) + + for job in sorted(assigned_jobs, key=lambda x: x.start_time): + if job.start_time == current_time: + # s_i = s_j -> add to same bin + _append_if_exists(job, current_bin, jobs, open_jobs=open_jobs) + continue + + # s_i > s_j -> add to new bin + counter += 1 + _bin = Bin(capacity=machine_capacity, index=counter, qpu=machine_id) + if len(open_jobs) == 0: + # no open jobs -> simply add to new bin + _append_if_exists(job, _bin, jobs, open_jobs=open_jobs) + current_bin = _bin + current_time = job.start_time + elif open_jobs[0].completion_time > job.start_time: + # no one finishes before job starts -> add to new bin which includes all open jobs + _append_if_exists( + job, _bin, jobs, current_bin=current_bin, open_jobs=open_jobs + ) + bins.append(current_bin) + current_bin = _bin + current_time = job.start_time + else: + # someone finishes before job starts + # -> add bin for each job that finishes before job starts + open_jobs_copy = open_jobs.copy() + for open_job in open_jobs_copy: + if open_job.completion_time > job.start_time: + # found the first that is still running, can stop + _append_if_exists( + job, _bin, jobs, current_bin=current_bin, open_jobs=open_jobs + ) + break + if open_job not in open_jobs: + # has been removed in the meantime + continue + # remove the last job and all that end at the same time + _bin.jobs = current_bin.jobs + for second_job in open_jobs_copy: + if second_job == open_job: + continue + if second_job.completion_time == open_job.completion_time: + _append_if_exists( + second_job, _bin, jobs, open_jobs=open_jobs, do_remove=True + ) + current_bin = _bin + counter += 1 + _bin = Bin(capacity=machine_capacity, index=counter, qpu=machine_id) + + bins.append(current_bin) + current_bin = _bin + current_time = job.start_time + + return bins + + +def _append_if_exists( + job: JobResultInfo, + _bin: Bin, + jobs: list[CircuitJob], + current_bin: Bin | None = None, + open_jobs: list[JobResultInfo] | None = None, + do_remove: bool = False, +) -> None: + if cjob := next((j for j in jobs if str(j.uuid) == job.name), None): + if current_bin is not None: + _bin.jobs = current_bin.jobs + _bin.jobs.append(cjob) + if open_jobs is not None: + if do_remove: + open_jobs.remove(job) + else: + insort(open_jobs, job, key=lambda x: x.completion_time) + + +def _first_name_func(name: str) -> tuple[str, str]: + # For single character jobs + names = name.split("_")[2:] + return names[0], names[1] + + +def _second_name_func(name: str) -> tuple[str, str]: + # For UUIDS + names = name.split("_")[2:] + return "-".join(names[:5]), "-".join(names[-5:]) diff --git a/src/scheduling/generate_schedule.py b/src/scheduling/generate_schedule.py new file mode 100644 index 0000000..ccfa8a4 --- /dev/null +++ b/src/scheduling/generate_schedule.py @@ -0,0 +1,180 @@ +"""Wrapper for schedule generation.""" +from uuid import uuid4 + +from src.common import CircuitJob, ScheduledJob +from src.provider import Accelerator + +from .bin_schedule import ( + generate_bin_info_schedule, + generate_bin_executable_schedule, +) +from .calculate_makespan import 
calculate_makespan, calculate_bin_makespan +from .extract_schedule import extract_info_schedule, extract_executable_schedule +from .setup_lp import set_up_base_lp, set_up_extended_lp, set_up_simple_lp +from .solve_lp import solve_lp +from .types import ( + ExecutableProblem, + InfoProblem, + JobResultInfo, + LPInstance, + PTimes, + SchedulerType, + STimes, +) + + +def generate_schedule( + problem: InfoProblem | ExecutableProblem, + schedule_type: SchedulerType, +) -> tuple[float, list[JobResultInfo], LPInstance | None] | list[ScheduledJob]: + """Generates the schedule for the given problem and schedule type. + + Baseline: Generates a schedule using bin packing. + Otherwise generates the schedule using the MILP and then calculates the makespan + by executing the schedule with the correct p_ij and s_ij values. + Args: + problem (InfoProblem | ExecutableProblem): The full problem definition. + schedule_type (SchedulerType): The type of schedule to use. + + Returns: + list[ScheduledJob]: List of ScheduledJobs. | + tuple[float, list[JobResultInfo], LPInstance | None]: The makespan and the list of jobs + with their assigned machine and start and completion times. + + Raises: + NotImplementedError: Unsupported types. + """ + if isinstance(problem, InfoProblem): + return _generate_schedule_info(problem, schedule_type) + if isinstance(problem, ExecutableProblem): + return _generate_schedule_exec(problem, schedule_type) + raise NotImplementedError("Unsupported type") + + +def _generate_schedule_info( + problem: InfoProblem, + schedule_type: SchedulerType, +) -> tuple[float, list[JobResultInfo], LPInstance | None]: + """Generates the schedule for the given problem and schedule type. + + Calculates the true makespan by 'executing' the schedule. + Args: + problem (InfoProblem): The full problem definition. + schedule_type (SchedulerType): The type of schedule to use. + + Returns: + tuple[float, list[JobResultInfo], LPInstance | None]: The makespan, the list of jobs + with their assigned machine and start and completion times, and the solved + LP instance (None for the baseline). + """ + if schedule_type == SchedulerType.BASELINE: + jobs = generate_bin_info_schedule(problem.base_jobs, problem.accelerators) + makespan = calculate_bin_makespan( + jobs, problem.process_times, problem.setup_times, problem.accelerators + ) + return makespan, jobs, None + + lp_instance = set_up_base_lp( + problem.base_jobs, problem.accelerators, problem.big_m, problem.timesteps + ) + + if schedule_type == SchedulerType.EXTENDED: + lp_instance = set_up_extended_lp( + lp_instance=lp_instance, + process_times=problem.process_times, + setup_times=problem.setup_times, + ) + else: + lp_instance = set_up_simple_lp( + lp_instance=lp_instance, + process_times=problem.process_times, + setup_times=problem.setup_times, + ) + + lp_instance = solve_lp(lp_instance) + _, jobs = extract_info_schedule(lp_instance) + makespan = calculate_makespan( + lp_instance, jobs, problem.process_times, problem.setup_times + ) + + return makespan, jobs, lp_instance + + +def _generate_schedule_exec( + problem: ExecutableProblem, + schedule_type: SchedulerType, +) -> list[ScheduledJob]: + """Generates the schedule for the given problem and schedule type. + + Process and setup times are calculated on the fly. + The jobs are returned in a format that can be executed by AcceleratorGroup. + Args: + problem (ExecutableProblem): The full problem definition. + schedule_type (SchedulerType): The type of schedule to use. + + Returns: + list[ScheduledJob]: List of ScheduledJobs.
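+ + Example (sketch, mirroring the test setup; jobs and accelerators are assumed to exist): + problem = ExecutableProblem(base_jobs=jobs, accelerators=accelerators, big_m=1000, timesteps=2**7) + schedule = generate_schedule(problem, SchedulerType.SIMPLE)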
+ """ + if schedule_type == SchedulerType.BASELINE: + return generate_bin_executable_schedule(problem.base_jobs, problem.accelerators) + + lp_instance = set_up_base_lp( + problem.base_jobs, problem.accelerators, problem.big_m, problem.timesteps + ) + process_times = _get_processing_times(problem.base_jobs, problem.accelerators) + setup_times = _get_setup_times(problem.base_jobs, problem.accelerators) + if schedule_type == SchedulerType.EXTENDED: + lp_instance = set_up_extended_lp( + lp_instance=lp_instance, + process_times=process_times, + setup_times=setup_times, + ) + else: + lp_instance = set_up_simple_lp( + lp_instance=lp_instance, + process_times=process_times, + setup_times=setup_times, + ) + + lp_instance = solve_lp(lp_instance) + return extract_executable_schedule( + lp_instance, problem.base_jobs, problem.accelerators + ) + + +def _get_setup_times( + base_jobs: list[CircuitJob], accelerators: list[Accelerator] +) -> STimes: + job_0 = CircuitJob( + coefficient=None, + cregs=0, + index=0, + circuit=None, + n_shots=0, + observable="", + partition_label="", + result_counts=None, + uuid=uuid4(), + ) + return [ + [ + [ + 50.0 + if job_i.circuit is None or job_j.circuit is None + else qpu.compute_setup_time(job_i.circuit, job_j.circuit) + for qpu in accelerators + ] + for job_i in [job_0] + base_jobs + ] + for job_j in [job_0] + base_jobs + ] + + +def _get_processing_times( + base_jobs: list[CircuitJob], + accelerators: list[Accelerator], +) -> PTimes: + return [[0.0 for _ in accelerators]] + [ + [qpu.compute_processing_time(job.circuit) for qpu in accelerators] + for job in base_jobs + if job.circuit is not None + ] diff --git a/src/provider/scheduler.py b/src/scheduling/scheduler.py similarity index 86% rename from src/provider/scheduler.py rename to src/scheduling/scheduler.py index e5aa32a..5bbedf6 100644 --- a/src/provider/scheduler.py +++ b/src/scheduling/scheduler.py @@ -1,5 +1,4 @@ """A scheduler for quantum circuits.""" -from enum import auto, Enum from uuid import UUID from qiskit import QuantumCircuit @@ -12,22 +11,9 @@ ScheduledJob, ) from src.tools import cut_circuit - -from .accelerator import Accelerator -from .accelerator_group import AcceleratorGroup -from .generate_schedule import ( - generate_baseline_schedule, - generate_extended_schedule, - generate_simple_schedule, -) - - -class SchedulerType(Enum): - """The type of scheduler to use.""" - - BASELINE = auto() - SIMPLE = auto() - EXTENDED = auto() +from src.provider import Accelerator, AcceleratorGroup +from .generate_schedule import generate_schedule +from .types import ExecutableProblem, SchedulerType class Scheduler: @@ -39,6 +25,7 @@ class Scheduler: - Consider 1 free qubit remaining when scheduling - Make a continuous run function / sumbit new circuits - Keep track of current schedule and update it + - Find out the maximum number timesteps needed """ def __init__( @@ -81,20 +68,17 @@ def generate_schedule(self, circuits: list[QuantumCircuit]) -> list[ScheduledJob """ jobs = sorted( self._convert_to_jobs(circuits), - key=lambda x: x.instance.num_qubits if x.instance is not None else 0, + key=lambda x: x.circuit.num_qubits if x.circuit is not None else 0, reverse=True, ) - schedule_function = generate_baseline_schedule - match self.stype: - case SchedulerType.SIMPLE: - schedule_function = generate_simple_schedule - case SchedulerType.EXTENDED: - schedule_function = generate_extended_schedule - case SchedulerType.BASELINE: - schedule_function = generate_baseline_schedule - case _: - raise ValueError("Unknown 
scheduler type") - return schedule_function(accelerators=self.accelerator.accelerators, jobs=jobs) + problem = ExecutableProblem( + base_jobs=jobs, + accelerators=self.accelerator.accelerators, + big_m=1000, + timesteps=2**7, + ) + + return generate_schedule(problem, self.stype) def _convert_to_jobs(self, circuits: list[QuantumCircuit]) -> list[CircuitJob]: """Generates jobs from circuits. diff --git a/data/benchmark/generate_milp_schedules.py b/src/scheduling/setup_lp.py similarity index 59% rename from data/benchmark/generate_milp_schedules.py rename to src/scheduling/setup_lp.py index 6564398..07a3fd5 100644 --- a/data/benchmark/generate_milp_schedules.py +++ b/src/scheduling/setup_lp.py @@ -1,21 +1,94 @@ -"""Helpers to generate MILP based schedules.""" -from collections import defaultdict -from copy import deepcopy - +"""Module for setting up the base LP instance.""" +from qiskit import QuantumCircuit import numpy as np import pulp -from qiskit import QuantumCircuit -from .types import JobHelper, JobResultInfo, LPInstance, PTimes, STimes +from src.common import CircuitJob +from src.provider import Accelerator +from .types import LPInstance, JobHelper, PTimes, STimes def set_up_base_lp( + base_jobs: list[CircuitJob] | list[QuantumCircuit], + accelerators: list[Accelerator] | dict[str, int], + big_m: int, + timesteps: int, +) -> LPInstance: + """Wrapper to set up the base LP instance through one function. + + Generates a base LP instance with the given jobs and accelerators. + It contains all the default constraints and variables. + Does not contain the constraints regarding the successor relationship. + + Args: + base_jobs (list[CircuitJob] | list[QuantumCircuit]): The list of quantum cirucits (jobs). + accelerators (list[Accelerator] | dict[str, int]): + The list of available accelerators (machines). + big_m (int): Metavariable for the LP. + timesteps (int): Meta variable for the LP, big enough to cover largest makespan. + + Returns: + LPInstance: The LP instance object. + + Raises: + NotImplementedError: If the input types are not supported. + """ + if isinstance(accelerators, list): + return _set_up_base_lp_exec(base_jobs, accelerators, big_m, timesteps) + if isinstance(accelerators, dict): + return _set_up_base_lp_info(base_jobs, accelerators, big_m, timesteps) + + raise NotImplementedError + + +def _set_up_base_lp_exec( + base_jobs: list[CircuitJob], + accelerators: list[Accelerator], + big_m: int, + timesteps: int, +) -> LPInstance: + """Sets up the base LP instance for use in the provider. + + Generates a base LP instance with the given jobs and accelerators. + It contains all the default constraints and variables. + Does not contain the constraints regarding the successor relationship. + + Args: + base_jobs (list[CircuitJob]): The list of quantum cirucits (jobs). + accelerators (list[Accelerator]): The list of available accelerators (machines). + big_m (int): Metavariable for the LP. + timesteps (int): Meta variable for the LP, big enough to cover largest makespan. + + Returns: + LPInstance: The LP instance object. 
+ """ + # Set up input params + job_capacities = { + str(job.uuid): job.circuit.num_qubits + for job in base_jobs + if job.circuit is not None + } + job_capacities = {"0": 0} | job_capacities + machine_capacities = {str(qpu.uuid): qpu.qubits for qpu in accelerators} + + lp_instance = _define_lp( + job_capacities, machine_capacities, list(range(timesteps)), big_m + ) + lp_instance.named_circuits = [JobHelper("0", None)] + [ + JobHelper(str(job.uuid), job.circuit) + for job in base_jobs + if job.circuit is not None + ] + return lp_instance + + +def _set_up_base_lp_info( base_jobs: list[QuantumCircuit], accelerators: dict[str, int], big_m: int, - timesteps: list[int], + timesteps: int, ) -> LPInstance: - """Sets up the base LP instance. + """Sets up the base LP instance for use outside of provider. Generates a base LP instance with the given jobs and accelerators. It contains all the default constraints and variables. @@ -25,19 +98,34 @@ def set_up_base_lp( base_jobs (list[QuantumCircuit]): The list of quantum cirucits (jobs). accelerators (dict[str, int]): The list of available accelerators (machines). big_m (int): Metavariable for the LP. - timesteps (list[int]): Meta variable for the LP, big enough to cover largest makespan. + timesteps (int): Meta variable for the LP, big enough to cover largest makespan. Returns: LPInstance: The LP instance object. """ # Set up input params - jobs = ["0"] + [str(idx + 1) for idx, _ in enumerate(base_jobs)] job_capacities = {str(idx + 1): job.num_qubits for idx, job in enumerate(base_jobs)} - job_capacities["0"] = 0 - machines = list(accelerators.keys()) + job_capacities = {"0": 0} | job_capacities + machine_capacities = accelerators - # set up problem variables + lp_instance = _define_lp( + job_capacities, machine_capacities, list(range(timesteps)), big_m + ) + lp_instance.named_circuits = [JobHelper("0", None)] + [ + JobHelper(str(idx + 1), job) for idx, job in enumerate(base_jobs) + ] + return lp_instance + + +def _define_lp( + job_capacities: dict[str, int], + machine_capacities: dict[str, int], + timesteps: list[int], + big_m: int, +) -> LPInstance: + jobs = list(job_capacities.keys()) + machines = list(machine_capacities.keys()) x_ik = pulp.LpVariable.dicts("x_ik", (jobs, machines), cat="Binary") z_ikt = pulp.LpVariable.dicts("z_ikt", (jobs, machines, timesteps), cat="Binary") @@ -91,30 +179,26 @@ def set_up_base_lp( z_ikt=z_ikt, c_j=c_j, s_j=s_j, - named_circuits=[JobHelper("0", None)] - + [JobHelper(str(idx + 1), job) for idx, job in enumerate(base_jobs)], + named_circuits=[], ) -def generate_simple_schedule( +def set_up_simple_lp( lp_instance: LPInstance, process_times: PTimes, setup_times: STimes, -) -> tuple[float, list[JobResultInfo]]: - """Generates a simple schedule for the given jobs and accelerators using a simple MILP. +) -> LPInstance: + """Sets up the LP for the simple scheduling problem. - First generates the schedule using MILP and then calculates the makespan - by executing the schedule with the correct p_ij and s_ij values. - The MILP uses setup times depending on the maximum over all possible values. + Setup times are overestimated, and not depending on the sequence. Args: - lp_instance (LPInstance): The base LP instance. - process_times (PTimes): The process times for each job on each machine. - setup_times (STimes): The setup times for each job on each machine. + lp_instance (LPInstance): The base LP. + process_times (PTimes): Original process times. + setup_times (STimes): Original setup times. 
Returns: - tuple[float, list[JobResultInfo]]: List of jobs with their assigned machine and - start and completion times. + LPInstance: The updated LP instance. """ p_times = pulp.makeDict( [lp_instance.jobs[1:], lp_instance.machines], @@ -135,35 +219,47 @@ def generate_simple_schedule( * (p_times[job][machine] + s_times[job][machine]) for machine in lp_instance.machines ) - _, jobs = _solve_lp(lp_instance) - s_times = pulp.makeDict( - [lp_instance.jobs, lp_instance.jobs, lp_instance.machines], - setup_times, - 0, - ) - return calculate_makespan(jobs, p_times, s_times), jobs + return lp_instance -def generate_extended_schedule( +def _get_simple_setup_times( + setup_times: STimes, +) -> list[list[float]]: + """Overestimates the actual setup times for the simple LP.""" + new_times = [ + list( + np.max( + times[[t not in [0, idx] for t, _ in enumerate(times)]].transpose(), + axis=1, + ) + ) + for idx, times in enumerate(np.array(setup_times)) + ] + # remove job 0 + del new_times[0] + for times in new_times: + del times[0] + return new_times + + +def set_up_extended_lp( lp_instance: LPInstance, process_times: PTimes, setup_times: STimes, big_m: int = 1000, -) -> tuple[float, list[JobResultInfo]]: - """Generates the extended schedule for the given jobs and accelerators using a complex MILP. +) -> LPInstance: + """Sets up the LP for the extended scheduling problem. - First generates the schedule using MILP and then calculates the makespan - by executing the schedule with the correct p_ij and s_ij values. + This uses the complex successor relationship. Args: - lp_instance (LPInstance): The base LP instance. - process_times (PTimes): The process times for each job on each machine. - setup_times (STimes): The setup times for each job on each machine. + lp_instance (LPInstance): The base LP. + process_times (PTimes): Original process times. + setup_times (STimes): Original setup times. big_m (int, optional): Metavariable for the LP. Defaults to 1000. Returns: - tuple[float, list[JobResultInfo]]: List of jobs with their assigned machine and - start and completion times. + LPInstance: The updated LP instance. 
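+ + Example (sketch; the base instance comes from set_up_base_lp): + lp_instance = set_up_base_lp(circuits, {"qpu_a": 5}, big_m=1000, timesteps=2**6) + lp_instance = set_up_extended_lp(lp_instance, process_times, setup_times)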
""" p_times = pulp.makeDict( [lp_instance.jobs[1:], lp_instance.machines], @@ -292,130 +388,4 @@ def generate_extended_schedule( + d_ijk[job][job_j][machine] - 2 ) - _, jobs = _solve_lp(lp_instance) - return calculate_makespan(jobs, p_times, s_times), jobs - - -def _solve_lp(lp_instance: LPInstance) -> tuple[float, list[JobResultInfo]]: - """Solves a LP using gurobi and generates the results.""" - solver_list = pulp.listSolvers(onlyAvailable=True) - gurobi = "GUROBI_CMD" - if gurobi in solver_list: - solver = pulp.getSolver(gurobi) - lp_instance.problem.solve(solver) - else: - lp_instance.problem.solve() - return _generate_results(lp_instance) - - -def _generate_results(lp_instance: LPInstance) -> tuple[float, list[JobResultInfo]]: - assigned_jobs = { - job.name: JobResultInfo(name=job.name, capacity=job.circuit.num_qubits) - if job.circuit is not None - else JobResultInfo(name=job.name) - for job in lp_instance.named_circuits - } - for var in lp_instance.problem.variables(): - if var.name.startswith("x_") and var.varValue > 0.0: - name = var.name.split("_")[2:] - assigned_jobs[name[0]].machine = name[1] - elif var.name.startswith("s_"): - name = var.name.split("_")[2] - assigned_jobs[name].start_time = float(var.varValue) - elif var.name.startswith("c_"): - name = var.name.split("_")[2] - assigned_jobs[name[0]].completion_time = float(var.varValue) - del assigned_jobs["0"] - return lp_instance.problem.objective.value(), list(assigned_jobs.values()) - - -def calculate_makespan( - jobs: list[JobResultInfo], - p_times: defaultdict[str, defaultdict[str, float]], - s_times: defaultdict[str, defaultdict[str, defaultdict[str, float]]], -) -> float: - """Calculates the actual makespan from the list of results. - - Executes the schedule with the corret p_ij and s_ij values. - - Args: - jobs (list[JobResultInfo]): The list of job results. - p_times (defaultdict[str, defaultdict[str, float]]): The correct p_ij. - s_times (defaultdict[str, defaultdict[str, defaultdict[str, float]]]): The correct s_ij. - - Returns: - float: The makespan of the schedule. 
- """ - assigned_machines: defaultdict[str, list[JobResultInfo]] = defaultdict(list) - for job in jobs: - assigned_machines[job.machine].append(job) - makespans = [] - for machine, assigned_jobs in assigned_machines.items(): - assigned_jobs_copy = deepcopy(assigned_jobs) - for job in sorted(assigned_jobs, key=lambda x: x.start_time): - # Find the last predecessor that is completed before the job starts - # this can technically change the correct predecessor to a wrong one - # because completion times are updated in the loop - # I'm not sure if copying before the loop corrects this - last_completed = _find_last_completed(job.name, assigned_jobs_copy, machine) - - if job.start_time == 0.0: - last_completed = JobResultInfo("0", machine, 0.0, 0.0) - job.start_time = next( - ( - j.completion_time - for j in assigned_jobs - if last_completed.name == j.name - ), - 0.0, - ) - # calculate p_j + s_ij - completion_time = ( # check if this order is correct - job.start_time - + p_times[job.name][machine] - + s_times[last_completed.name][job.name][machine] - ) - job.completion_time = completion_time - makespans.append(max(job.completion_time for job in assigned_jobs)) - - return max(makespans) - - -def _get_simple_setup_times( - setup_times: STimes, -) -> list[list[float]]: - """Overestimates the actual setup times for the simple LP.""" - new_times = [ - list( - np.max( - times[[t not in [0, idx] for t, _ in enumerate(times)]].transpose(), - axis=1, - ) - ) - for idx, times in enumerate(np.array(setup_times)) - ] - # remove job 0 - del new_times[0] - for times in new_times: - del times[0] - return new_times - - -def _find_last_completed( - job_name: str, jobs: list[JobResultInfo], machine: str -) -> JobResultInfo: - """Finds the last completed job before the given job from the original schedule.""" - original_starttime = next( - (j.start_time for j in jobs if job_name == j.name), - 0, - ) - completed_before = [j for j in jobs if j.completion_time <= original_starttime] - if len(completed_before) == 0: - return JobResultInfo("0", machine, 0.0, 0.0) - - completed_before = sorted( - completed_before, - key=lambda x: x.completion_time, - reverse=True, - ) - return completed_before[0] + return lp_instance diff --git a/src/scheduling/solve_lp.py b/src/scheduling/solve_lp.py new file mode 100644 index 0000000..9476c83 --- /dev/null +++ b/src/scheduling/solve_lp.py @@ -0,0 +1,23 @@ +"""Solves a LP using gurobi if available.""" +import pulp + +from .types import LPInstance + + +def solve_lp(lp_instance: LPInstance) -> LPInstance: + """Solves a LP using gurobi. + + Args: + lp_instance (LPInstance): The input LP instance. + + Returns: + lp_instance (LPInstance): The LP instance with the solved problem object.. 
+ """ + solver_list = pulp.listSolvers(onlyAvailable=True) + gurobi = "GUROBI_CMD" + if gurobi in solver_list: + solver = pulp.getSolver(gurobi) + lp_instance.problem.solve(solver) + else: + lp_instance.problem.solve() + return lp_instance diff --git a/src/scheduling/types.py b/src/scheduling/types.py new file mode 100644 index 0000000..e5c7c69 --- /dev/null +++ b/src/scheduling/types.py @@ -0,0 +1,107 @@ +"""Helper Classes for Scheduling Tasks.""" +from dataclasses import dataclass, field +from enum import auto, Enum + +from qiskit import QuantumCircuit +import pulp + +from src.common import CircuitJob +from src.provider import Accelerator + + +class SchedulerType(Enum): + """The type of scheduler to use.""" + + BASELINE = auto() + SIMPLE = auto() + EXTENDED = auto() + + +@dataclass +class Bin: + """Helper to keep track of binning problem.""" + + capacity: int + index: int + qpu: int + jobs: list[QuantumCircuit] = field(default_factory=list) + full: bool = False + + +@dataclass +class JobHelper: + """Helper to keep track of job names.""" + + name: str + circuit: QuantumCircuit | None # TODO optional necessary? + + +@dataclass +class LPInstance: + """Helper to keep track of LP problem.""" + + problem: pulp.LpProblem + jobs: list[str] + machines: list[str] + x_ik: dict[str, dict[str, pulp.LpVariable]] + z_ikt: dict[str, dict[str, dict[int, pulp.LpVariable]]] + c_j: dict[str, pulp.LpVariable] + s_j: dict[str, pulp.LpVariable] + named_circuits: list[JobHelper] + + +@dataclass +class JobResultInfo: + """Helper to keep track of job results.""" + + name: str + machine: str + start_time: float + completion_time: float + capacity: int + + +@dataclass +class Result: + """Benchmark result for one instance of setting+jobs.""" + + makespan: float + jobs: list[JobResultInfo] + time: float + + +# Typedef +PTimes = list[list[float]] +STimes = list[list[list[float]]] +Benchmark = list[ # TODO should we move this? + dict[str, dict[str, int] | list[dict[str, PTimes | STimes | dict[str, Result]]]] +] + + +@dataclass +class ExecutableProblem: + """Defines an executable problem. + + This calculates setup and process times based on the accelerators. + """ + + base_jobs: list[CircuitJob] + accelerators: list[Accelerator] + big_m: int + timesteps: int + + +@dataclass +class InfoProblem: + """Defines an "InfoProblem" whis is used for evaluation purposes. + + This requires setup and process times to be defined as they are + not calculated from the accelerators. 
+ """ + + base_jobs: list[QuantumCircuit] + accelerators: dict[str, int] + big_m: int + timesteps: int + process_times: PTimes + setup_times: STimes diff --git a/src/tools/assembling.py b/src/tools/assembling.py index e8b2c42..efc6421 100644 --- a/src/tools/assembling.py +++ b/src/tools/assembling.py @@ -48,22 +48,24 @@ def assemble_job(circuit_jobs: list[CircuitJob]) -> CombinedJob: Returns: CombinedJob: The combined job """ + if len(circuit_jobs) == 0: + return CombinedJob() combined_job = CombinedJob(n_shots=circuit_jobs[0].n_shots) circuits = [] qubit_count = 0 observable = PauliList("") for job in circuit_jobs: combined_job.indices.append(job.index) - circuits.append(job.instance) + circuits.append(job.circuit) combined_job.coefficients.append(job.coefficient) combined_job.mapping.append( - slice(qubit_count, qubit_count + job.instance.num_qubits) + slice(qubit_count, qubit_count + job.circuit.num_qubits) ) - qubit_count += job.instance.num_qubits + qubit_count += job.circuit.num_qubits observable = observable.expand(job.observable) - combined_job.partition_lables.append(job.partition_lable) + combined_job.partition_lables.append(job.partition_label) combined_job.uuids.append(job.uuid) combined_job.cregs.append(job.cregs) - combined_job.instance = assemble_circuit(circuits) + combined_job.circuit = assemble_circuit(circuits) combined_job.observable = observable return combined_job diff --git a/src/tools/cutting.py b/src/tools/cutting.py index 2cddfe9..08358a6 100644 --- a/src/tools/cutting.py +++ b/src/tools/cutting.py @@ -22,15 +22,16 @@ def cut_circuit( Args: circuit (QuantumCircuit): The circuit to cut partitions (list[int]): The partitions to cut the circuit into (given as a list of qubits) - observables (PauliList | None, optional): The observables for each qubit. Defaults to None (= Z measurements). + observables (PauliList | None, optional): The observables for each qubit. + Defaults to None (= Z measurements). 
Returns: tuple[list[Experiment], UUID]: _description_ """ if observables is None: observables = PauliList("Z" * circuit.num_qubits) - partitions = _generate_partition_labels(partitions) - partitioned_problem = partition_problem(circuit, partitions, observables) + gen_partitions = _generate_partition_labels(partitions) + partitioned_problem = partition_problem(circuit, gen_partitions, observables) experiments, coefficients = generate_cutting_experiments( partitioned_problem.subcircuits, partitioned_problem.subobservables, diff --git a/src/tools/reconstructing.py b/src/tools/reconstructing.py index ee048eb..1642e68 100644 --- a/src/tools/reconstructing.py +++ b/src/tools/reconstructing.py @@ -21,11 +21,11 @@ def reconstruct_experiments_from_circuits(jobs: list[CircuitJob]) -> list[Experi experiments = [] for uuid in uuids: uuid_jobs = list(filter(partial(lambda j, u: j.uuid == u, u=uuid), jobs)) - partitions = set(job.partition_lable for job in uuid_jobs) + partitions = set(job.partition_label for job in uuid_jobs) for partition in partitions: partition_jobs = sorted( filter( - partial(lambda j, p: j.partition_lable == p, p=partition), uuid_jobs + partial(lambda j, p: j.partition_label == p, p=partition), uuid_jobs ), key=lambda x: x.index, ) @@ -69,10 +69,10 @@ def reconstruct_counts_from_job(job: CombinedJob) -> list[CircuitJob]: coefficient=job.coefficients[idx], cregs=job.cregs[idx], index=job.indices[idx], - instance=None, + circuit=None, n_shots=job.n_shots, observable=observable, - partition_lable=job.partition_lables[idx], + partition_label=job.partition_lables[idx], result_counts=counts, uuid=job.uuids[idx], ) diff --git a/tests/scheduling/__init__.py b/tests/scheduling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/provider/test_generate_schedule.py b/tests/scheduling/test_generate_schedule.py similarity index 55% rename from tests/provider/test_generate_schedule.py rename to tests/scheduling/test_generate_schedule.py index 92b914d..53324b2 100644 --- a/tests/provider/test_generate_schedule.py +++ b/tests/scheduling/test_generate_schedule.py @@ -3,15 +3,18 @@ from src.circuits import create_ghz from src.common import IBMQBackend, job_from_circuit from src.provider import Accelerator -from src.provider.generate_schedule import generate_simple_schedule +from src.scheduling import generate_schedule, SchedulerType, ExecutableProblem -def test_generate_simple_schedule() -> None: +def test_generate_schedule() -> None: """_summary_""" accelerators = [ Accelerator(IBMQBackend.BELEM, shot_time=1, reconfiguration_time=1), Accelerator(IBMQBackend.QUITO, shot_time=1, reconfiguration_time=1), ] jobs = [job_from_circuit(create_ghz(i)) for i in range(2, 6)] - schedule = generate_simple_schedule(jobs, accelerators, t_max=20) - assert len(schedule) <= 3 + problem = ExecutableProblem(jobs, accelerators, big_m=100, timesteps=20) + + schedule = generate_schedule(problem, SchedulerType.SIMPLE) + assert isinstance(schedule, list) + assert len(schedule) <= 4 diff --git a/tests/provider/test_scheduling.py b/tests/scheduling/test_scheduling.py similarity index 91% rename from tests/provider/test_scheduling.py rename to tests/scheduling/test_scheduling.py index ccc50c5..885004a 100644 --- a/tests/provider/test_scheduling.py +++ b/tests/scheduling/test_scheduling.py @@ -3,7 +3,8 @@ from src.circuits import create_ghz, create_quantum_only_ghz from src.common import CombinedJob, IBMQBackend -from src.provider import Accelerator, Scheduler +from src.provider import Accelerator +from 
src.scheduling import Scheduler from src.tools import optimize_circuit_offline @@ -41,8 +42,8 @@ def test_generate_schedule() -> None: [slice(0, 3), slice(3, 5)], [slice(0, 2), slice(2, 4)], ] - for job, qubits in zip(jobs_0, qubits_0): - assert job.mapping == qubits + for _job, qubits in zip(jobs_0, qubits_0): + assert _job.mapping == qubits qubits_1 = [ [slice(0, 5)], [slice(0, 5)], @@ -50,8 +51,8 @@ def test_generate_schedule() -> None: [slice(0, 2), slice(2, 4)], [slice(0, 2)], ] - for job, qubits in zip(jobs_1, qubits_1): - assert job.mapping == qubits + for _job, qubits in zip(jobs_1, qubits_1): + assert _job.mapping == qubits @patch("qiskit_aer.AerSimulator.run") diff --git a/tests/tools/test_assembling.py b/tests/tools/test_assembling.py index 181b476..19c3a94 100644 --- a/tests/tools/test_assembling.py +++ b/tests/tools/test_assembling.py @@ -29,5 +29,5 @@ def test_assemble_and_reconstruct_job() -> None: jobs += jobs_from_experiment(experiment) combined_job = assemble_job([jobs[0], jobs[6]]) - assert combined_job.instance.num_qubits == 5 + assert combined_job.circuit.num_qubits == 5 assert len(combined_job.observable[0]) == 5 diff --git a/tests/tools/test_reconstructing.py b/tests/tools/test_reconstructing.py index 4317186..80e3150 100644 --- a/tests/tools/test_reconstructing.py +++ b/tests/tools/test_reconstructing.py @@ -42,7 +42,7 @@ def test_reconstruct_counts_from_job() -> None: jobs += jobs_from_experiment(experiment) combined_job = assemble_job([jobs[2], jobs[10]]) - combined_job.result_counts = accelerator.run_and_get_counts(combined_job.instance) + combined_job.result_counts = accelerator.run_and_get_counts(combined_job.circuit) jobs = reconstruct_counts_from_job(combined_job) assert len(jobs[0].result_counts) == 8 assert len(jobs[1].result_counts) == 16 @@ -62,7 +62,7 @@ def test_reconstruct_experiments_from_circuits() -> None: for job_1, job_2 in zip(jobs[:6], jobs[6:]): combined_job = assemble_job([job_1, job_2]) combined_job.result_counts = accelerator.run_and_get_counts( - combined_job.instance + combined_job.circuit ) job_results += reconstruct_counts_from_job(combined_job) # TODO how do I know that all circuits from an experiment have run?