Skip to content

Commit

Permalink
Add documentation and type hints. Remove support for monolithic probl…
Browse files Browse the repository at this point in the history
…em to simplify code.
  • Loading branch information
BenjaminRodenberg committed Sep 19, 2024
1 parent 7e4fb6f commit 23ede95
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 80 deletions.
7 changes: 7 additions & 0 deletions oscillator/solver-python/mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[mypy]

[mypy-scipy.*]
ignore_missing_imports = True

[mypy-precice.*]
ignore_missing_imports = True
42 changes: 24 additions & 18 deletions oscillator/solver-python/oscillator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from enum import Enum
import csv
import os
from typing import Type

import problemDefinition
import timeSteppers
from timeSteppers import TimeSteppingSchemes, GeneralizedAlpha, RungeKutta4, RadauIIA


class Participant(Enum):
Expand All @@ -25,20 +26,24 @@ class Participant(Enum):
help="Time stepping scheme being used.",
type=str,
choices=[
s.value for s in timeSteppers.TimeSteppingSchemes],
default=timeSteppers.TimeSteppingSchemes.NEWMARK_BETA.value)
s.value for s in TimeSteppingSchemes],
default=TimeSteppingSchemes.NEWMARK_BETA.value)
args = parser.parse_args()

participant_name = args.participantName

this_mass: Type[problemDefinition.MassRight] | Type[problemDefinition.MassLeft]
other_mass: Type[problemDefinition.MassRight] | Type[problemDefinition.MassLeft]
this_spring: Type[problemDefinition.SpringRight] | Type[problemDefinition.SpringLeft]
connecting_spring = problemDefinition.SpringMiddle

if participant_name == Participant.MASS_LEFT.value:
write_data_name = 'Force-Left'
read_data_name = 'Force-Right'
mesh_name = 'Mass-Left-Mesh'

this_mass = problemDefinition.MassLeft
this_spring = problemDefinition.SpringLeft
connecting_spring = problemDefinition.SpringMiddle
other_mass = problemDefinition.MassRight

elif participant_name == Participant.MASS_RIGHT.value:
Expand All @@ -48,7 +53,6 @@ class Participant(Enum):

this_mass = problemDefinition.MassRight
this_spring = problemDefinition.SpringRight
connecting_spring = problemDefinition.SpringMiddle
other_mass = problemDefinition.MassLeft

else:
Expand Down Expand Up @@ -89,25 +93,27 @@ class Participant(Enum):
a = a0
t = 0

if args.time_stepping == timeSteppers.TimeSteppingSchemes.GENERALIZED_ALPHA.value:
time_stepper = timeSteppers.GeneralizedAlpha(stiffness=stiffness, mass=mass, alpha_f=0.4, alpha_m=0.2)
elif args.time_stepping == timeSteppers.TimeSteppingSchemes.NEWMARK_BETA.value:
time_stepper = timeSteppers.GeneralizedAlpha(stiffness=stiffness, mass=mass, alpha_f=0.0, alpha_m=0.0)
elif args.time_stepping == timeSteppers.TimeSteppingSchemes.RUNGE_KUTTA_4.value:
time_stepper: GeneralizedAlpha | RungeKutta4 | RadauIIA

if args.time_stepping == TimeSteppingSchemes.GENERALIZED_ALPHA.value:
time_stepper = GeneralizedAlpha(stiffness=stiffness, mass=mass, alpha_f=0.4, alpha_m=0.2)
elif args.time_stepping == TimeSteppingSchemes.NEWMARK_BETA.value:
time_stepper = GeneralizedAlpha(stiffness=stiffness, mass=mass, alpha_f=0.0, alpha_m=0.0)
elif args.time_stepping == TimeSteppingSchemes.RUNGE_KUTTA_4.value:
ode_system = np.array([
[0, 1], # du
[-stiffness / mass, 0], # dv
])
time_stepper = timeSteppers.RungeKutta4(ode_system=ode_system)
elif args.time_stepping == timeSteppers.TimeSteppingSchemes.Radau_IIA.value:
time_stepper = RungeKutta4(ode_system=ode_system)
elif args.time_stepping == TimeSteppingSchemes.Radau_IIA.value:
ode_system = np.array([
[0, 1], # du
[-stiffness / mass, 0], # dv
])
time_stepper = timeSteppers.RadauIIA(ode_system=ode_system)
time_stepper = RadauIIA(ode_system=ode_system)
else:
raise Exception(
f"Invalid time stepping scheme {args.time_stepping}. Please use one of {[ts.value for ts in timeSteppers.TimeSteppingSchemes]}")
f"Invalid time stepping scheme {args.time_stepping}. Please use one of {[ts.value for ts in TimeSteppingSchemes]}")


positions = []
Expand All @@ -133,12 +139,12 @@ class Participant(Enum):
precice_dt = participant.get_max_time_step_size()
dt = np.min([precice_dt, my_dt])

def f(t): return participant.read_data(mesh_name, read_data_name, vertex_ids, t)[0]
def f(t: float) -> float: return participant.read_data(mesh_name, read_data_name, vertex_ids, t)[0]

# do time step, write data, and advance
u_new, v_new, a_new, *other = time_stepper.do_step(u, v, a, f, dt)
t_new = t + dt

if other:
# if dense output is available, performed adaptive time stepping. Do multiple write calls per time step
sol, *_ = other
Expand All @@ -153,12 +159,12 @@ def f(t): return participant.read_data(mesh_name, read_data_name, vertex_ids, t)
# perform n_pseudo pseudosteps
dt_pseudo = dt / n_pseudo
t_pseudo += dt_pseudo
write_data = [connecting_spring.k * sol(t_pseudo)[0]]
write_data = np.array([connecting_spring.k * sol(t_pseudo)[0]])
participant.write_data(mesh_name, write_data_name, vertex_ids, write_data)
participant.advance(dt_pseudo)

else: # simple time stepping without dense output; only a single write call per time step
write_data = [connecting_spring.k * u_new]
write_data = np.array([connecting_spring.k * u_new])
participant.write_data(mesh_name, write_data_name, vertex_ids, write_data)
participant.advance(dt)

Expand Down
9 changes: 7 additions & 2 deletions oscillator/solver-python/problemDefinition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as np
from numpy.linalg import eig
from typing import Callable


class SpringLeft:
Expand All @@ -23,7 +24,9 @@ class MassLeft:
u0 = 1.0
v0 = 0.0

u_analytical, v_analytical = None, None # will be defined below
# will be defined below
u_analytical: Callable[[float | np.ndarray], float | np.ndarray]
v_analytical: Callable[[float | np.ndarray], float | np.ndarray]


class MassRight:
Expand All @@ -35,7 +38,9 @@ class MassRight:
u0 = 0.0
v0 = 0.0

u_analytical, v_analytical = None, None # will be defined below
# will be defined below
u_analytical: Callable[[float | np.ndarray], float | np.ndarray]
v_analytical: Callable[[float | np.ndarray], float | np.ndarray]


# Mass matrix
Expand Down
138 changes: 78 additions & 60 deletions oscillator/solver-python/timeSteppers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Tuple, List
from typing import Tuple, List, Callable
import numpy as np
import numbers
import scipy as sp
from scipy.integrate import OdeSolution
from enum import Enum


Expand All @@ -13,14 +14,7 @@ class TimeSteppingSchemes(Enum):


class GeneralizedAlpha():
alpha_f = None
alpha_m = None
gamma = None
beta = None
mass = None
stiffness = None

def __init__(self, stiffness, mass, alpha_f=0.4, alpha_m=0.2) -> None:
def __init__(self, stiffness: float, mass: float, alpha_f: float = 0.4, alpha_m: float = 0.2) -> None:
self.alpha_f = alpha_f
self.alpha_m = alpha_m

Expand All @@ -30,32 +24,43 @@ def __init__(self, stiffness, mass, alpha_f=0.4, alpha_m=0.2) -> None:
self.stiffness = stiffness
self.mass = mass

def do_step(self, u0, v0, a0, rhs, dt) -> Tuple[float, float, float]:
f = rhs((1 - self.alpha_f) * dt)

m = 3 * [None]
m[0] = (1 - self.alpha_m) / (self.beta * dt**2)
m[1] = (1 - self.alpha_m) / (self.beta * dt)
m[2] = (1 - self.alpha_m - 2 * self.beta) / (2 * self.beta)
def do_step(self,
u0: float,
v0: float,
a0: float,
rhs: Callable[[float], float],
dt: float
) -> Tuple[float, float, float]:
"""Perform a step with generalized Alpha or Newmark Beta scheme (depends on parameters alpha_f and alpha_m)
Args:
u0 (float): displacement at time t0
v0 (float): velocity at time t0
a0 (float): acceleration at time t0
rhs (Callable[[float],float]): time dependent right-hand side
dt (float): time step size
Returns:
Tuple[float, float, float]: returns computed displacement, velocity, and acceleration at time t1 = t0 + dt.
"""
f = rhs((1.0 - self.alpha_f) * dt)

m = [(1 - self.alpha_m) / (self.beta * dt**2),
(1 - self.alpha_m) / (self.beta * dt),
(1 - self.alpha_m - 2 * self.beta) / (2 * self.beta)]

k_bar = self.stiffness * (1 - self.alpha_f) + m[0] * self.mass

# do generalized alpha step
if (type(self.stiffness)) is np.ndarray:
u1 = np.linalg.solve(
k_bar,
(f - self.alpha_f * self.stiffness.dot(u0) + self.mass.dot((m[0] * u0 + m[1] * v0 + m[2] * a0)))
)
else:
u1 = (f - self.alpha_f * self.stiffness * u0 + self.mass * (m[0] * u0 + m[1] * v0 + m[2] * a0)) / k_bar

u1 = (f - self.alpha_f * self.stiffness * u0 + self.mass * (m[0] * u0 + m[1] * v0 + m[2] * a0)) / k_bar
a1 = 1.0 / (self.beta * dt**2) * (u1 - u0 - dt * v0) - (1 - 2 * self.beta) / (2 * self.beta) * a0
v1 = v0 + dt * ((1 - self.gamma) * a0 + self.gamma * a1)

return u1, v1, a1


class RungeKutta4():
# parameters from Butcher tableau of classic Runge Kutta scheme (RK4)
a = np.array([[0, 0, 0, 0],
[0.5, 0, 0, 0],
[0, 0.5, 0, 0],
Expand All @@ -67,19 +72,30 @@ def __init__(self, ode_system) -> None:
self.ode_system = ode_system
pass

def do_step(self, u0, v0, _, rhs, dt) -> Tuple[float, float, float]:
assert (isinstance(u0, type(v0)))
def do_step(self,
u0: float,
v0: float,
_: float,
rhs: Callable[[float], float],
dt: float
) -> Tuple[float, float, float]:
"""Perform a step with the classic Runge Kutta scheme (4 stages)
Args:
u0 (float): displacement at time t0
v0 (float): velocity at time t0
_ (float): ignored argument (acceleration for other time stepping schemes)
rhs (Callable[[float],float]): time dependent right-hand side
dt (float): time step size
Returns:
Tuple[float, float, float]: returns computed displacement and velocity at time t1 = t0 + dt.
"""

k = 4 * [None] # store stages in list

if isinstance(u0, np.ndarray):
x0 = np.concatenate([u0, v0])
def f(t,x): return self.ode_system.dot(x) + np.concatenate([np.array([0, 0]), rhs(t)])
elif isinstance(u0, numbers.Number):
x0 = np.array([u0, v0])
def f(t,x): return self.ode_system.dot(x) + np.array([0, rhs(t)])
else:
raise Exception(f"Cannot handle input type {type(u0)} of u and v")
x0 = np.array([u0, v0])
def f(t, x): return self.ode_system.dot(x) + np.array([0, rhs(t)])

k[0] = f(self.c[0] * dt, x0)
k[1] = f(self.c[1] * dt, x0 + self.a[1, 0] * k[0] * dt)
Expand All @@ -88,42 +104,44 @@ def f(t,x): return self.ode_system.dot(x) + np.array([0, rhs(t)])

x1 = x0 + dt * sum(b_i * k_i for k_i, b_i in zip(k, self.b))

if isinstance(u0, np.ndarray):
u1 = x1[0:2]
v1 = x1[2:4]
elif isinstance(u0, numbers.Number):
u1 = x1[0]
v1 = x1[1]

return u1, v1, None
return x1[0], x1[1], np.nan


class RadauIIA():
def __init__(self, ode_system) -> None:
self.ode_system = ode_system
pass

def do_step(self, u0, v0, _, rhs, dt) -> Tuple[float, float, float]:
assert (isinstance(u0, type(v0)))
def do_step(self,
u0: float,
v0: float,
_: float,
rhs: Callable[[float], float],
dt: float
) -> Tuple[float, float, float, OdeSolution]:
"""Perform a step with the adaptive RadauIIA time stepper of scipy.integrate
Args:
u0 (float): displacement at time t0
v0 (float): velocity at time t0
_ (float): ignored argument (acceleration for other time stepping schemes)
rhs (Callable[[float],float]): time dependent right-hand side
dt (float): time step size
t0 = 0
Raises:
Exception: if input u0 is malformed
if isinstance(u0, np.ndarray):
x0 = np.concatenate([u0, v0])
def f(t,x): return self.ode_system.dot(x) + np.concatenate([np.array([np.zeros_like(t), np.zeros_like(t)]), rhs(t)])
elif isinstance(u0, numbers.Number):
x0 = np.array([u0, v0])
def f(t,x): return self.ode_system.dot(x) + np.array([np.zeros_like(t), rhs(t)])
else:
raise Exception(f"Cannot handle input type {type(u0)} of u and v")
Returns:
Tuple[float, float, None, OdeSolution]: returns computed displacement and velocity at time t1 = t0 + dt and an OdeSolution with dense output for the integration interval.
"""

t0, t1 = 0, dt

x0 = np.array([u0, v0])
def f(t, x): return self.ode_system.dot(x) + np.array([np.zeros_like(t), rhs(t)])

# use adaptive time stepping; dense_output=True allows us to sample from continuous function later
ret = sp.integrate.solve_ivp(f, [t0, t0 + dt], x0, method="Radau",
ret = sp.integrate.solve_ivp(f, [t0, t1], x0, method="Radau",
dense_output=True, rtol=10e-5, atol=10e-9)

if isinstance(u0, np.ndarray):
u1, v1 = ret.y[0:2, -1], ret.y[2:4, -1]
elif isinstance(u0, numbers.Number):
u1, v1 = ret.y[:, -1]

return u1, v1, None, ret.sol
return ret.y[0, -1], ret.y[1, -1], np.nan, ret.sol

0 comments on commit 23ede95

Please sign in to comment.