Skip to content

Commit

Permalink
tests: clean up some underlining tests function (_run_xxx)
Browse files Browse the repository at this point in the history
Merge the call for compile and run_schemes into `Test_Implementations.test` function,
thus remove the repeated `if compile ... if run ...` part in test functions

Signed-off-by: Thing-han, Lim <[email protected]>
  • Loading branch information
potsrevennil authored and hanno-becker committed Nov 14, 2024
1 parent 77581d2 commit abfaf2d
Showing 1 changed file with 104 additions and 95 deletions.
199 changes: 104 additions & 95 deletions scripts/lib/mlkem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,25 @@ def run_schemes(
else:
return results

def test(
self,
opt: bool,
compile: bool,
run: bool,
extra_make_envs={},
extra_make_args=[],
actual_proc: Callable[[bytes], str] = None,
expect_proc: Callable[[SCHEME, str], tuple[bool, str]] = None,
cmd_prefix: [str] = [],
extra_args: [str] = [],
) -> TypedDict:
if compile:
self.compile(opt, extra_make_envs, extra_make_args)
if run:
return self.run_schemes(
opt, actual_proc, expect_proc, cmd_prefix, extra_args
)


"""
Underlying functional tests
Expand All @@ -272,23 +291,16 @@ def __init__(self, opts: Options):
copts = CompileOptions(
opts.cross_prefix, opts.cflags, opts.arch_flags, opts.auto, opts.verbose
)

self._f = lambda t: Test_Implementations(t, copts)
self.opt = opts.opt

self.verbose = opts.verbose
self._func = Test_Implementations(TEST_TYPES.MLKEM, copts)
self._nistkat = Test_Implementations(TEST_TYPES.NISTKAT, copts)
self._kat = Test_Implementations(TEST_TYPES.KAT, copts)
self._acvp = Test_Implementations(TEST_TYPES.ACVP, copts)
self._bench = Test_Implementations(TEST_TYPES.BENCH, copts)
self._bench_components = Test_Implementations(
TEST_TYPES.BENCH_COMPONENTS, copts
)

self.compile_mode = copts.compile_mode()
self.compile = opts.compile
self.run = opts.run

def _run_func(self, opt: bool):
def _func(self, opt: bool, compile: bool, run: bool) -> TypedDict:
"""Underlying function for functional test"""

def expect(scheme: SCHEME, actual: str) -> tuple[bool, str]:
Expand All @@ -307,32 +319,27 @@ def expect(scheme: SCHEME, actual: str) -> tuple[bool, str]:
f"Failed, expecting {expect}, but getting {actual}" if fail else "",
)

return self._func.run_schemes(
return self._f(TEST_TYPES.MLKEM).test(
opt,
compile,
run,
actual_proc=lambda result: str(result, encoding="utf-8"),
expect_proc=expect,
)

def func(self):
config_logger(self.verbose)

def _func(opt: bool):

if self.compile:
self._func.compile(opt)
if self.run:
return self._run_func(opt)

fail = False
if self.opt.lower() == "all" or self.opt.lower() == "no_opt":
fail = fail or _func(False)
fail = fail or self._func(False, self.compile, self.run)
if self.opt.lower() == "all" or self.opt.lower() == "opt":
fail = fail or _func(True)
fail = fail or self._func(True, self.compile, self.run)

if fail:
exit(1)

def _run_nistkat(self, opt: bool):
def _nistkat(self, opt: bool, compile: bool, run: bool) -> TypedDict:
def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]:
expect = parse_meta(scheme, "nistkat-sha256")
fail = expect != actual
Expand All @@ -342,31 +349,27 @@ def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]:
f"Failed, expecting {expect}, but getting {actual}" if fail else "",
)

return self._nistkat.run_schemes(
return self._f(TEST_TYPES.NISTKAT).test(
opt,
compile,
run,
actual_proc=sha256sum,
expect_proc=expect_proc,
)

def nistkat(self):
config_logger(self.verbose)

def _nistkat(opt: bool):
if self.compile:
self._nistkat.compile(opt)
if self.run:
return self._run_nistkat(opt)

fail = False
if self.opt.lower() == "all" or self.opt.lower() == "no_opt":
fail = fail or _nistkat(False)
fail = fail or self._nistkat(False, self.compile, self.run)
if self.opt.lower() == "all" or self.opt.lower() == "opt":
fail = fail or _nistkat(True)
fail = fail or self._nistkat(True, self.compile, self.run)

if fail:
exit(1)

def _run_kat(self, opt: bool):
def _kat(self, opt: bool, compile: bool, run: bool) -> TypedDict:
def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]:
expect = parse_meta(scheme, "kat-sha256")
fail = expect != actual
Expand All @@ -376,32 +379,28 @@ def expect_proc(scheme: SCHEME, actual: str) -> tuple[bool, str]:
f"Failed, expecting {expect}, but getting {actual}" if fail else "",
)

return self._kat.run_schemes(
return self._f(TEST_TYPES.KAT).test(
opt,
compile,
run,
actual_proc=sha256sum,
expect_proc=expect_proc,
)

def kat(self):
config_logger(self.verbose)

def _kat(opt: bool):
if self.compile:
self._kat.compile(opt)
if self.run:
return self._run_kat(opt)

fail = False

if self.opt.lower() == "all" or self.opt.lower() == "no_opt":
fail = fail or _kat(False)
fail = fail or self._kat(False, self.compile, self.run)
if self.opt.lower() == "all" or self.opt.lower() == "opt":
fail = fail or _kat(True)
fail = fail or self._kat(True, self.compile, self.run)

if fail:
exit(1)

def _run_acvp(self, opt: bool, acvp_dir: str = "test/acvp_data"):
def _run_acvp(self, opt: bool, acvp_dir: str = "test/acvp_data") -> bool:
acvp_keygen_json = f"{acvp_dir}/acvp_keygen_internalProjection.json"
acvp_encapDecap_json = f"{acvp_dir}/acvp_encapDecap_internalProjection.json"

Expand Down Expand Up @@ -468,7 +467,7 @@ def init_results() -> TypedDict:
f"c={tc['c']}",
]

rs = self._acvp.run_scheme(
rs = self._f(TEST_TYPES.ACVP).run_scheme(
opt,
scheme,
extra_args=extra_args,
Expand Down Expand Up @@ -505,7 +504,7 @@ def init_results() -> TypedDict:
f"d={tc['d']}",
]

rs = self._acvp.run_scheme(
rs = self._f(TEST_TYPES.ACVP).run_scheme(
opt,
scheme,
extra_args=extra_args,
Expand All @@ -526,29 +525,32 @@ def init_results() -> TypedDict:

return fail

def _acvp(self, opt: bool, compmile: bool, run: bool, acvp_dir: str) -> bool:
if compile:
self._f(TEST_TYPES.ACVP).compile(opt)
if run:
return self._run_acvp(opt, acvp_dir)

def acvp(self, acvp_dir: str):
config_logger(self.verbose)

def _acvp(opt: bool):
if self.compile:
self._acvp.compile(opt)
if self.run:
return self._run_acvp(opt, acvp_dir)

fail = False

if self.opt.lower() == "all" or self.opt.lower() == "no_opt":
fail = fail or _acvp(False)
fail = fail or self._acvp(False, self.compile, self.run, acvp_dir)
if self.opt.lower() == "all" or self.opt.lower() == "opt":
fail = fail or _acvp(True)
fail = fail or self._acvp(True, self.compile, self.run, acvp_dir)

if fail:
exit(1)

def _run_bench(
def _bench(
self,
t: Test_Implementations,
t: TEST_TYPES,
opt: bool,
compile: bool,
run: bool,
cycles,
run_as_root: bool,
exec_wrapper: str,
mac_taskpolicy,
Expand All @@ -572,7 +574,13 @@ def _run_bench(
exec_wrapper = exec_wrapper.split(" ")
cmd_prefix = cmd_prefix + exec_wrapper

return t.run_schemes(opt, cmd_prefix=cmd_prefix)
return self._f(t).test(
opt,
compile,
run,
extra_make_args=[f"CYCLES={cycles}"],
cmd_prefix=cmd_prefix,
)

def bench(
self,
Expand All @@ -586,34 +594,42 @@ def bench(
config_logger(self.verbose)

if components is False:
t = self._bench
t = TEST_TYPES.BENCH
else:
t = self._bench_components
t = TEST_TYPES.BENCH_COMPONENTS
output = False

# NOTE: We haven't yet decided how to output both opt/no-opt benchmark results
if self.opt.lower() == "all":
if self.compile:
t.compile(False, extra_make_args=[f"CYCLES={cycles}"])
if self.run:
self._run_bench(t, False, run_as_root, exec_wrapper)
if self.compile:
t.compile(True, extra_make_args=[f"CYCLES={cycles}"])
if self.run:
resultss = self._run_bench(t, True, run_as_root, exec_wrapper)
# NOTE: We haven't yet decided how to output both opt/no-opt benchmark results
self._bench(
t,
False,
self.compile,
self.run,
cycles,
run_as_root,
exec_wrapper,
mac_taskpolicy,
)
resultss = self._bench(
t,
True,
self.compile,
self.run,
cycles,
run_as_root,
exec_wrapper,
mac_taskpolicy,
)
else:
if self.compile:
t.compile(
True if self.opt.lower() == "opt" else False,
extra_make_args=[f"CYCLES={cycles}"],
)
if self.run:
resultss = self._run_bench(
t,
True if self.opt.lower() == "opt" else False,
run_as_root,
exec_wrapper,
)
resultss = self._bench(
t,
self.compile,
self.run,
True if self.opt.lower() == "opt" else False,
run_as_root,
exec_wrapper,
)

if resultss is None:
exit(0)
Expand Down Expand Up @@ -654,33 +670,26 @@ def all(self, func: bool, kat: bool, nistkat: bool, acvp: bool):

def all(opt: bool):
code = 0
if self.compile:
compiles = [
*([self._func.compile] if func else []),
*([self._nistkat.compile] if nistkat else []),
*([self._kat.compile] if kat else []),
*([self._acvp.compile] if acvp else []),
]
fs = [
*([self._func] if func else []),
*([self._nistkat] if nistkat else []),
*([self._kat] if kat else []),
*([self._acvp] if acvp else []),
]

for f in compiles:
if self.compile:
for f in fs:
try:
f(opt)
f(opt, True, False)
except SystemExit as e:
code = code or e

sys.stdout.flush()

if self.run:
runs = [
*([self._run_func] if func else []),
*([self._run_nistkat] if nistkat else []),
*([self._run_kat] if kat else []),
*([self._run_acvp] if acvp else []),
]

for f in runs:
for f in fs:
try:
code = code or int(f(opt))
code = code or int(f(opt, False, True))
except SystemExit as e:
code = code or e

Expand Down

0 comments on commit abfaf2d

Please sign in to comment.