diff --git a/.gitignore b/.gitignore index 0850b563d..a6f4b5c8d 100644 --- a/.gitignore +++ b/.gitignore @@ -147,3 +147,8 @@ dmypy.json # Pyre type checker .pyre/ + + +# acados +acados_ocp_nlp.json +*c_generated_code diff --git a/README.md b/README.md index 24c94fc78..4cfbc9088 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,13 @@ or sudo apt-get install libgmp-dev ``` + #### (optional) Additional requirements for MPC + +You may need to separately install [`acados`](https://github.com/acados/acados) for fast MPC implementations. + +- To build and install acados, see their [installation guide](https://docs.acados.org/installation/index.html). +- To set up the acados Python interface, check out [these installation steps](https://docs.acados.org/python_interface/index.html). + ## Architecture Overview of [`safe-control-gym`](https://arxiv.org/abs/2109.06325)'s API: diff --git a/examples/mpc/config_overrides/cartpole/mpc_acados_cartpole_stabilization.yaml b/examples/mpc/config_overrides/cartpole/mpc_acados_cartpole_stabilization.yaml new file mode 100644 index 000000000..c9f361bf4 --- /dev/null +++ b/examples/mpc/config_overrides/cartpole/mpc_acados_cartpole_stabilization.yaml @@ -0,0 +1,17 @@ +algo: mpc_acados +algo_config: + horizon: 20 + r_mpc: + - 0.1 + q_mpc: + - 5.0 + - 0.1 + - 5.0 + - 0.1 + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + warmstart: True + use_RTI: False diff --git a/examples/mpc/config_overrides/cartpole/mpc_acados_cartpole_tracking.yaml b/examples/mpc/config_overrides/cartpole/mpc_acados_cartpole_tracking.yaml new file mode 100644 index 000000000..c9f361bf4 --- /dev/null +++ b/examples/mpc/config_overrides/cartpole/mpc_acados_cartpole_tracking.yaml @@ -0,0 +1,17 @@ +algo: mpc_acados +algo_config: + horizon: 20 + r_mpc: + - 0.1 + q_mpc: + - 5.0 + - 0.1 + - 5.0 + - 0.1 + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + warmstart: True + use_RTI: False diff --git a/examples/mpc/config_overrides/quadrotor_2D/mpc_acados_quadrotor_2D_stabilization.yaml b/examples/mpc/config_overrides/quadrotor_2D/mpc_acados_quadrotor_2D_stabilization.yaml new file mode 100644 index 000000000..262159f29 --- /dev/null +++ b/examples/mpc/config_overrides/quadrotor_2D/mpc_acados_quadrotor_2D_stabilization.yaml @@ -0,0 +1,20 @@ +algo: mpc_acados +algo_config: + horizon: 20 + r_mpc: + - 0.1 + - 0.1 + q_mpc: + - 5.0 + - 0.1 + - 5.0 + - 0.1 + - 0.1 + - 0.1 + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + warmstart: True + use_RTI: False diff --git a/examples/mpc/config_overrides/quadrotor_2D/mpc_acados_quadrotor_2D_tracking.yaml b/examples/mpc/config_overrides/quadrotor_2D/mpc_acados_quadrotor_2D_tracking.yaml new file mode 100644 index 000000000..262159f29 --- /dev/null +++ b/examples/mpc/config_overrides/quadrotor_2D/mpc_acados_quadrotor_2D_tracking.yaml @@ -0,0 +1,20 @@ +algo: mpc_acados +algo_config: + horizon: 20 + r_mpc: + - 0.1 + - 0.1 + q_mpc: + - 5.0 + - 0.1 + - 5.0 + - 0.1 + - 0.1 + - 0.1 + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + warmstart: True + use_RTI: False diff --git a/examples/mpc/config_overrides/quadrotor_3D/mpc_acados_quadrotor_3D_stabilization.yaml b/examples/mpc/config_overrides/quadrotor_3D/mpc_acados_quadrotor_3D_stabilization.yaml new file mode 100644 index 000000000..b6254f0a2 --- /dev/null +++
b/examples/mpc/config_overrides/quadrotor_3D/mpc_acados_quadrotor_3D_stabilization.yaml @@ -0,0 +1,28 @@ +algo: mpc_acados +algo_config: + horizon: 20 + r_mpc: + - 0.1 + - 0.1 + - 0.1 + - 0.1 + q_mpc: + - 5.0 + - 0.1 + - 5.0 + - 0.1 + - 5.0 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + warmstart: True + use_RTI: False diff --git a/examples/mpc/config_overrides/quadrotor_3D/mpc_acados_quadrotor_3D_tracking.yaml b/examples/mpc/config_overrides/quadrotor_3D/mpc_acados_quadrotor_3D_tracking.yaml new file mode 100644 index 000000000..b6254f0a2 --- /dev/null +++ b/examples/mpc/config_overrides/quadrotor_3D/mpc_acados_quadrotor_3D_tracking.yaml @@ -0,0 +1,28 @@ +algo: mpc_acados +algo_config: + horizon: 20 + r_mpc: + - 0.1 + - 0.1 + - 0.1 + - 0.1 + q_mpc: + - 5.0 + - 0.1 + - 5.0 + - 0.1 + - 5.0 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + - 0.1 + # Prior info + prior_info: + prior_prop: null + randomize_prior_prop: False + prior_prop_rand_info: null + warmstart: True + use_RTI: False diff --git a/examples/mpc/mpc_experiment.sh b/examples/mpc/mpc_experiment.sh index 4d2ddaed6..821f3b93f 100755 --- a/examples/mpc/mpc_experiment.sh +++ b/examples/mpc/mpc_experiment.sh @@ -2,14 +2,15 @@ # MPC and Linear MPC Experiment. -#SYS='cartpole' -#SYS='quadrotor_2D' +# SYS='cartpole' +# SYS='quadrotor_2D' SYS='quadrotor_3D' #TASK='stabilization' TASK='tracking' -#ALGO='mpc' +# ALGO='mpc' +# ALGO='mpc_acados' ALGO='linear_mpc' if [ "$SYS" == 'cartpole' ]; then diff --git a/safe_control_gym/controllers/__init__.py b/safe_control_gym/controllers/__init__.py index f08271727..f019f582b 100644 --- a/safe_control_gym/controllers/__init__.py +++ b/safe_control_gym/controllers/__init__.py @@ -49,3 +49,7 @@ register(idx='rap', entry_point='safe_control_gym.controllers.rarl.rap:RAP', config_entry_point='safe_control_gym.controllers.rarl:rap.yaml') + +register(idx='mpc_acados', + entry_point='safe_control_gym.controllers.mpc.mpc_acados:MPC_ACADOS', + config_entry_point='safe_control_gym.controllers.mpc:mpc_acados.yaml') diff --git a/safe_control_gym/controllers/mpc/gp_mpc.py b/safe_control_gym/controllers/mpc/gp_mpc.py index 0333c1210..d9d7901bd 100644 --- a/safe_control_gym/controllers/mpc/gp_mpc.py +++ b/safe_control_gym/controllers/mpc/gp_mpc.py @@ -466,14 +466,14 @@ def setup_gp_optimizer(self, n_ind_points): cost += cost_func(x=x_var[:, i], u=u_var[:, i], Xr=x_ref[:, i], - Ur=np.zeros((nu, 1)), + Ur=self.U_EQ, Q=self.Q, R=self.R)['l'] # Terminal cost. 
cost += cost_func(x=x_var[:, -1], u=np.zeros((nu, 1)), Xr=x_ref[:, -1], - Ur=np.zeros((nu, 1)), + Ur=self.U_EQ, Q=self.Q, R=self.R)['l'] z = cs.vertcat(x_var[:, :-1], u_var) diff --git a/safe_control_gym/controllers/mpc/linear_mpc.py b/safe_control_gym/controllers/mpc/linear_mpc.py index 1858faa66..10e71a13e 100644 --- a/safe_control_gym/controllers/mpc/linear_mpc.py +++ b/safe_control_gym/controllers/mpc/linear_mpc.py @@ -11,11 +11,13 @@ import casadi as cs import numpy as np +from termcolor import colored from safe_control_gym.controllers.lqr.lqr_utils import discretize_linear_system from safe_control_gym.controllers.mpc.mpc import MPC from safe_control_gym.controllers.mpc.mpc_utils import compute_discrete_lqr_gain_from_cont_linear_system from safe_control_gym.envs.benchmark_env import Task +from safe_control_gym.utils.utils import timing class LinearMPC(MPC): @@ -29,6 +31,7 @@ def __init__( r_mpc=[1], warmstart=True, soft_constraints=False, + soft_penalty: float = 10000, terminate_run_on_done=True, constraint_tol: float = 1e-8, solver: str = 'sqpmethod', @@ -36,6 +39,8 @@ def __init__( # shared/base args output_dir='results/temp', additional_constraints=None, + use_lqr_gain_and_terminal_cost: bool = False, + compute_initial_guess_method=None, **kwargs): '''Creates task and controller. @@ -51,6 +56,8 @@ def __init__( solver (str): Specify which solver you wish to use (qrqp, qpoases, ipopt, sqpmethod) output_dir (str): output directory to write logs and results. additional_constraints (list): list of constraints. + use_lqr_gain_and_terminal_cost (bool): Use LQR ancillary gain and terminal cost in the MPC. + compute_initial_guess_method (str): Method to compute the initial guess for the MPC. Options: None, 'ipopt', 'lqr'. ''' # Store all params/args. for k, v in locals().items(): @@ -64,18 +71,18 @@ def __init__( r_mpc=r_mpc, warmstart=warmstart, soft_constraints=soft_constraints, + soft_penalty=soft_penalty, terminate_run_on_done=terminate_run_on_done, constraint_tol=constraint_tol, # prior_info=prior_info, output_dir=output_dir, additional_constraints=additional_constraints, + use_lqr_gain_and_terminal_cost=use_lqr_gain_and_terminal_cost, + compute_initial_guess_method=compute_initial_guess_method, **kwargs ) # TODO: setup environment equilibrium - # self.X_EQ = np.atleast_2d(self.env.X_GOAL)[0,:].T - # self.U_EQ = np.atleast_2d(self.env.U_GOAL)[0,:] - self.X_EQ = np.atleast_2d(self.model.X_EQ)[0, :].T self.U_EQ = np.atleast_2d(self.model.U_EQ)[0, :].T assert solver in ['qpoases', 'qrqp', 'sqpmethod', 'ipopt'], '[Error]. MPC Solver not supported.' 
@@ -105,36 +112,22 @@ def set_dynamics_func(self): [x_dot_lin], ['x0', 'p'], ['xf']) + self.lqr_gain, _, _, self.P = compute_discrete_lqr_gain_from_cont_linear_system(dfdx, + dfdu, + self.Q, + self.R, + self.dt) self.dfdx = dfdx self.dfdu = dfdu - def compute_initial_guess(self, init_state, goal_states, x_lin, u_lin): - '''Use LQR to get an initial guess of the ''' - dfdxdfdu = self.model.df_func(x=x_lin, u=u_lin) - dfdx = dfdxdfdu['dfdx'].toarray() - dfdu = dfdxdfdu['dfdu'].toarray() - lqr_gain, _, _ = compute_discrete_lqr_gain_from_cont_linear_system(dfdx, dfdu, self.Q, self.R, self.dt) - - x_guess = np.zeros((self.model.nx, self.T + 1)) - u_guess = np.zeros((self.model.nu, self.T)) - x_guess[:, 0] = init_state - - for i in range(self.T): - u = lqr_gain @ (x_guess[:, i] - goal_states[:, i]) + u_lin - u_guess[:, i] = u - x_guess[:, i + 1, None] = self.linear_dynamics_func(x0=x_guess[:, i], p=u)['xf'].toarray() - - return x_guess, u_guess - - def setup_optimizer(self): + def setup_optimizer(self, solver='qrqp'): '''Sets up convex optimization problem. - Including cost objective, variable bounds and dynamics constraints. ''' nx, nu = self.model.nx, self.model.nu T = self.T # Define optimizer and variables. - if self.solver in ['qrqp', 'qpoases']: + if solver in ['qrqp', 'qpoases']: opti = cs.Opti('conic') else: opti = cs.Opti() @@ -157,15 +150,15 @@ def setup_optimizer(self): cost += cost_func(x=x_var[:, i] + self.X_EQ[:, None], u=u_var[:, i] + self.U_EQ[:, None], Xr=x_ref[:, i], - Ur=np.zeros((nu, 1)), + Ur=self.U_EQ, Q=self.Q, R=self.R)['l'] # Terminal cost. cost += cost_func(x=x_var[:, -1] + self.X_EQ[:, None], u=np.zeros((nu, 1)) + self.U_EQ[:, None], Xr=x_ref[:, -1], - Ur=np.zeros((nu, 1)), - Q=self.Q, + Ur=self.U_EQ, + Q=self.Q if not self.use_lqr_gain_and_terminal_cost else self.P, R=self.R)['l'] for i in range(self.T): # Dynamics constraints. 
@@ -173,7 +166,7 @@ def setup_optimizer(self): opti.subject_to(x_var[:, i + 1] == next_state) # State and input constraints - soft_con_coeff = 10 + soft_con_coeff = self.soft_penalty for sc_i, state_constraint in enumerate(self.state_constraints_sym): if self.soft_constraints: opti.subject_to(state_constraint(x_var[:, i] + self.X_EQ.T) <= state_slack[sc_i]) @@ -206,7 +199,7 @@ def setup_optimizer(self): opts = {'expand': True} if platform == 'linux': opts.update({'print_time': 1, 'print_header': 0}) - opti.solver(self.solver, opts) + opti.solver(solver, opts) elif platform == 'darwin': opts.update({'ipopt.max_iter': 100}) opti.solver('ipopt', opts) @@ -222,6 +215,7 @@ def setup_optimizer(self): 'cost': cost } + @timing def select_action(self, obs, info=None @@ -263,18 +257,20 @@ def select_action(self, self.results_dict['horizon_inputs'].append(deepcopy(self.u_prev) + self.U_EQ[:, None]) except RuntimeError as e: print(e) + print(colored('Infeasible MPC Problem', 'red')) return_status = opti.return_status() + print(colored(f'Optimization failed with status: {return_status}', 'red')) if return_status == 'unknown': - self.terminate_loop = True - u_val = self.u_prev - if u_val is None: - print('[WARN]: MPC Infeasible first step.') - u_val = np.zeros((1, self.model.nu)) - elif return_status == 'Maximum_Iterations_Exceeded': - self.terminate_loop = True - u_val = opti.debug.value(u_var) - elif return_status == 'Search_Direction_Becomes_Too_Small': - self.terminate_loop = True + # self.terminate_loop = True + if self.u_prev is None: + print(colored('[WARN]: MPC Infeasible first step.', 'yellow')) + u_val = np.zeros((self.model.nu, self.T)) + x_val = np.zeros((self.model.nx, self.T + 1)) + else: + u_val = self.u_prev + x_val = self.x_prev + elif return_status in ['Infeasible_Problem_Detected', 'Infeasible_Problem']: + # self.terminate_loop = True u_val = opti.debug.value(u_var) # take first one from solved action sequence @@ -283,5 +279,7 @@ def select_action(self, else: action = np.array([u_val[0]]) action += self.U_EQ + if self.use_lqr_gain_and_terminal_cost: + action += self.lqr_gain @ (obs - x_val[:, 0]) self.prev_action = action return action diff --git a/safe_control_gym/controllers/mpc/mpc.py b/safe_control_gym/controllers/mpc/mpc.py index faaba5c15..ac2ed4e59 100644 --- a/safe_control_gym/controllers/mpc/mpc.py +++ b/safe_control_gym/controllers/mpc/mpc.py @@ -1,16 +1,18 @@ '''Model Predictive Control.''' - from copy import deepcopy import casadi as cs import numpy as np +from termcolor import colored from safe_control_gym.controllers.base_controller import BaseController +from safe_control_gym.controllers.lqr.lqr_utils import discretize_linear_system from safe_control_gym.controllers.mpc.mpc_utils import (compute_discrete_lqr_gain_from_cont_linear_system, compute_state_rmse, get_cost_weight_matrix, reset_constraints, rk_discrete) from safe_control_gym.envs.benchmark_env import Task from safe_control_gym.envs.constraints import GENERAL_CONSTRAINTS, create_constraint_list +from safe_control_gym.utils.utils import timing class MPC(BaseController): @@ -24,6 +26,7 @@ def __init__( r_mpc: list = [1], warmstart: bool = True, soft_constraints: bool = False, + soft_penalty: float = 10000, terminate_run_on_done: bool = True, constraint_tol: float = 1e-6, # runner args @@ -32,6 +35,10 @@ def __init__( additional_constraints: list = None, use_gpu: bool = False, seed: int = 0, + compute_initial_guess_method: str = 'ipopt', + use_lqr_gain_and_terminal_cost: bool = False, + init_solver: str = 'ipopt', 
+ solver: str = 'ipopt', **kwargs ): '''Creates task and controller. @@ -43,20 +50,23 @@ def __init__( r_mpc (list): diagonals of input/action cost weight. warmstart (bool): if to initialize from previous iteration. soft_constraints (bool): Formulate the constraints as soft constraints. + soft_penalty (float): Penalty added in the cost function for soft constraints. terminate_run_on_done (bool): Terminate the run when the environment returns done or not. constraint_tol (float): Tolerance to add the the constraint as sometimes solvers are not exact. output_dir (str): output directory to write logs and results. additional_constraints (list): List of additional constraints use_gpu (bool): False (use cpu) True (use cuda). seed (int): random seed. + compute_initial_guess_method (str): Method to compute the initial guess. Options: None, 'ipopt', 'lqr'. + use_lqr_gain_and_terminal_cost (bool): Use the LQR ancillary gain and terminal cost in the MPC. + init_solver (str): Solver to use for initial guess computation. + solver (str): Solver to use for MPC optimization. ''' super().__init__(env_func=env_func, output_dir=output_dir, use_gpu=use_gpu, seed=seed, **kwargs) for k, v in locals().items(): if k != 'self' and k != 'kwargs' and '__' not in k: self.__dict__.update({k: v}) - # if prior_info is None: - # self.prior_info = {} # Task. self.env = env_func() if additional_constraints is not None: @@ -74,16 +84,19 @@ def __init__( self.T = horizon self.Q = get_cost_weight_matrix(self.q_mpc, self.model.nx) self.R = get_cost_weight_matrix(self.r_mpc, self.model.nu) - # self.prior_info = prior_info + self.constraint_tol = constraint_tol self.soft_constraints = soft_constraints + self.soft_penalty = soft_penalty self.warmstart = warmstart self.terminate_run_on_done = terminate_run_on_done - # self.X_EQ = self.env.X_EQ - # self.U_EQ = self.env.U_EQ - # logging - # self.logger = ExperimentLogger(output_dir) + self.X_EQ = self.env.X_GOAL + self.U_EQ = self.env.U_GOAL + self.compute_initial_guess_method = compute_initial_guess_method + self.use_lqr_gain_and_terminal_cost = use_lqr_gain_and_terminal_cost + self.init_solver = init_solver + self.solver = solver def add_constraints(self, constraints @@ -116,6 +129,7 @@ def close(self): def reset(self): '''Prepares for training or evaluation.''' + print(colored('Resetting MPC', 'green')) # Setup reference input. if self.env.TASK == Task.STABILIZATION: self.mode = 'stabilization' @@ -128,7 +142,7 @@ def reset(self): # Dynamics model. self.set_dynamics_func() # CasADi optimizer. - self.setup_optimizer() + self.setup_optimizer(self.solver) # Previously solved states & inputs, useful for warm start. 
self.x_prev = None self.u_prev = None @@ -137,38 +151,85 @@ def reset(self): def set_dynamics_func(self): '''Updates symbolic dynamics with actual control frequency.''' - # self.dynamics_func = cs.integrator('fd', 'rk', - # { - # 'x': self.model.x_sym, - # 'p': self.model.u_sym, - # 'ode': self.model.x_dot - # }, - # {'tf': self.dt}) + # linear dynamics for LQR ancillary gain and terminal cost + dfdxdfdu = self.model.df_func(x=np.atleast_2d(self.model.X_EQ)[0, :].T, + u=np.atleast_2d(self.model.U_EQ)[0, :].T) + dfdx = dfdxdfdu['dfdx'].toarray() + dfdu = dfdxdfdu['dfdu'].toarray() + delta_x = cs.MX.sym('delta_x', self.model.nx, 1) + delta_u = cs.MX.sym('delta_u', self.model.nu, 1) + Ad, Bd = discretize_linear_system(dfdx, dfdu, self.dt, exact=True) + x_dot_lin = Ad @ delta_x + Bd @ delta_u + self.linear_dynamics_func = cs.Function('linear_discrete_dynamics', + [delta_x, delta_u], + [x_dot_lin], + ['x0', 'p'], + ['xf']) + self.dfdx = dfdx + self.dfdu = dfdu + self.lqr_gain, _, _, self.P = compute_discrete_lqr_gain_from_cont_linear_system(dfdx, + dfdu, + self.Q, + self.R, + self.dt) + # nonlinear dynamics self.dynamics_func = rk_discrete(self.model.fc_func, self.model.nx, self.model.nu, self.dt) - def compute_initial_guess(self, init_state, goal_states, x_lin, u_lin): - '''Use LQR to get an initial guess of the ''' - dfdxdfdu = self.model.df_func(x=x_lin, u=u_lin) - dfdx = dfdxdfdu['dfdx'].toarray() - dfdu = dfdxdfdu['dfdu'].toarray() - lqr_gain, _, _ = compute_discrete_lqr_gain_from_cont_linear_system(dfdx, dfdu, self.Q, self.R, self.dt) + @timing + def compute_initial_guess(self, init_state, goal_states=None): + '''Compute an initial guess of the solution to the optimization problem.''' + if goal_states is None: + goal_states = self.get_references() + print(colored(f'computing initial guess using {self.compute_initial_guess_method}', 'green')) + if self.compute_initial_guess_method == 'ipopt': + self.setup_optimizer(solver=self.init_solver) + opti_dict = self.opti_dict + opti = opti_dict['opti'] + x_var = opti_dict['x_var'] # optimization variables + u_var = opti_dict['u_var'] # optimization variables + x_init = opti_dict['x_init'] # initial state + x_ref = opti_dict['x_ref'] # reference state/trajectory + + # Assign the initial state. + opti.set_value(x_init, init_state) # initial state should have dim (nx,) + # Assign reference trajectory within horizon. + opti.set_value(x_ref, goal_states) + # Solve the optimization problem. 
+ try: + sol = opti.solve() + x_val, u_val = sol.value(x_var), sol.value(u_var) + except RuntimeError: + print(colored('Warm-starting fails', 'red')) + x_val, u_val = opti.debug.value(x_var), opti.debug.value(u_var) + x_guess = x_val + u_guess = u_val + elif self.compute_initial_guess_method == 'lqr': + # initialize the guess solutions + x_guess = np.zeros((self.model.nx, self.T + 1)) + u_guess = np.zeros((self.model.nu, self.T)) + x_guess[:, 0] = init_state + # add the lqr gain and states to the guess + for i in range(self.T): + u = self.lqr_gain @ (x_guess[:, i] - goal_states[:, i]) + np.atleast_2d(self.model.U_EQ)[0, :].T + u_guess[:, i] = u + x_guess[:, i + 1, None] = self.dynamics_func(x0=x_guess[:, i], p=u)['xf'].toarray() + else: + raise Exception('Initial guess method not implemented.') - x_guess = np.zeros((self.model.nx, self.T + 1)) - u_guess = np.zeros((self.model.nu, self.T)) - x_guess[:, 0] = init_state + self.x_prev = x_guess + self.u_prev = u_guess - for i in range(self.T): - u = lqr_gain @ (x_guess[:, i] - goal_states[:, i]) + u_lin - u_guess[:, i] = u - x_guess[:, i + 1, None] = self.dynamics_func(x0=x_guess[:, i], p=u)['xf'].toarray() + # set the solver back + self.setup_optimizer(solver=self.solver) return x_guess, u_guess - def setup_optimizer(self): + def setup_optimizer(self, solver='qrsqp'): '''Sets up nonlinear optimization problem.''' + print(colored(f'Setting up optimizer with {solver}', 'green')) nx, nu = self.model.nx, self.model.nu T = self.T # Define optimizer and variables. @@ -193,15 +254,15 @@ def setup_optimizer(self): cost += cost_func(x=x_var[:, i], u=u_var[:, i], Xr=x_ref[:, i], - Ur=np.zeros((nu, 1)), + Ur=self.U_EQ, Q=self.Q, R=self.R)['l'] # Terminal cost. cost += cost_func(x=x_var[:, -1], u=np.zeros((nu, 1)), Xr=x_ref[:, -1], - Ur=np.zeros((nu, 1)), - Q=self.Q, + Ur=self.U_EQ, + Q=self.Q if not self.use_lqr_gain_and_terminal_cost else self.P, R=self.R)['l'] # Constraints for i in range(self.T): @@ -212,14 +273,14 @@ def setup_optimizer(self): for sc_i, state_constraint in enumerate(self.state_constraints_sym): if self.soft_constraints: opti.subject_to(state_constraint(x_var[:, i]) <= state_slack[sc_i]) - cost += 10000 * state_slack[sc_i]**2 + cost += self.soft_penalty * state_slack[sc_i]**2 opti.subject_to(state_slack[sc_i] >= 0) else: opti.subject_to(state_constraint(x_var[:, i]) < -self.constraint_tol) for ic_i, input_constraint in enumerate(self.input_constraints_sym): if self.soft_constraints: opti.subject_to(input_constraint(u_var[:, i]) <= input_slack[ic_i]) - cost += 10000 * input_slack[ic_i]**2 + cost += self.soft_penalty * input_slack[ic_i]**2 opti.subject_to(input_slack[ic_i] >= 0) else: opti.subject_to(input_constraint(u_var[:, i]) < -self.constraint_tol) @@ -228,7 +289,7 @@ def setup_optimizer(self): for sc_i, state_constraint in enumerate(self.state_constraints_sym): if self.soft_constraints: opti.subject_to(state_constraint(x_var[:, -1]) <= state_slack[sc_i]) - cost += 10000 * state_slack[sc_i] ** 2 + cost += self.soft_penalty * state_slack[sc_i] ** 2 opti.subject_to(state_slack[sc_i] >= 0) else: opti.subject_to(state_constraint(x_var[:, -1]) <= -self.constraint_tol) @@ -236,10 +297,10 @@ def setup_optimizer(self): opti.subject_to(x_var[:, 0] == x_init) opti.minimize(cost) - # Create solver (IPOPT solver in this version) - # opts = {'ipopt.print_level': 0, 'ipopt.sb': 'yes', 'print_time': 0} - opts = {'expand': True} - opti.solver('ipopt', opts) + # Create solver + opts = {'expand': True, 'error_on_fail': False} + opti.solver(solver, 
opts) + self.opti_dict = { 'opti': opti, 'x_var': x_var, @@ -249,6 +310,7 @@ def setup_optimizer(self): 'cost': cost } + @timing def select_action(self, obs, info=None @@ -262,26 +324,23 @@ def select_action(self, Returns: action (ndarray): Input/action to the task/env. ''' - opti_dict = self.opti_dict opti = opti_dict['opti'] - x_var = opti_dict['x_var'] - u_var = opti_dict['u_var'] - x_init = opti_dict['x_init'] - x_ref = opti_dict['x_ref'] + x_var = opti_dict['x_var'] # optimization variables + u_var = opti_dict['u_var'] # optimization variables + x_init = opti_dict['x_init'] # initial state + x_ref = opti_dict['x_ref'] # reference state/trajectory # Assign the initial state. opti.set_value(x_init, obs) # Assign reference trajectory within horizon. goal_states = self.get_references() opti.set_value(x_ref, goal_states) - if self.mode == 'tracking': - self.traj_step += 1 - # if self.warmstart and self.x_prev is None and self.u_prev is None: - # x_guess, u_guess = self.compute_initial_guess(obs, goal_states, self.X_EQ, self.U_EQ) - # opti.set_initial(x_var, x_guess) - # opti.set_initial(u_var, u_guess) # Initial guess for optimization problem. - # elif self.warmstart and self.x_prev is not None and self.u_prev is not None: + + if self.compute_initial_guess_method is not None and self.x_prev is None and self.u_prev is None: + x_guess, u_guess = self.compute_initial_guess(obs) + opti.set_initial(x_var, x_guess) + opti.set_initial(u_var, u_guess) # Initial guess for optimization problem. if self.warmstart and self.x_prev is not None and self.u_prev is not None: # shift previous solutions by 1 step x_guess = deepcopy(self.x_prev) @@ -290,20 +349,49 @@ def select_action(self, u_guess[:-1] = u_guess[1:] opti.set_initial(x_var, x_guess) opti.set_initial(u_var, u_guess) + + if self.mode == 'tracking': + # increment the trajectory step after update the reference and initial guess + self.traj_step += 1 + # Solve the optimization problem. - sol = opti.solve() - x_val, u_val = sol.value(x_var), sol.value(u_var) + try: + sol = opti.solve() + x_val, u_val = sol.value(x_var), sol.value(u_var) + except RuntimeError: + print(colored('Infeasible MPC Problem', 'red')) + return_status = opti.return_status() + print(colored(f'Optimization failed with status: {return_status}', 'red')) + if self.solver == 'ipopt': + x_val, u_val = opti.debug.value(x_var), opti.debug.value(u_var) + elif self.solver == 'qrsqp': + if return_status == 'unknown': + # self.terminate_loop = True + if self.u_prev is None: + print(colored('[WARN]: MPC Infeasible first step.', 'yellow')) + u_val = np.zeros((self.model.nu, self.T)) + x_val = np.zeros((self.model.nx, self.T + 1)) + else: + u_val = self.u_prev + x_val = self.x_prev + elif return_status in ['Maximum_Iterations_Exceeded', 'Infeasible_Problem_Detected']: + self.terminate_loop = True + u_val = opti.debug.value(u_var) + x_val = opti.debug.value(x_var) self.x_prev = x_val self.u_prev = u_val self.results_dict['horizon_states'].append(deepcopy(self.x_prev)) self.results_dict['horizon_inputs'].append(deepcopy(self.u_prev)) self.results_dict['goal_states'].append(deepcopy(goal_states)) - self.results_dict['t_wall'].append(opti.stats()['t_wall_total']) + if self.solver == 'ipopt': + self.results_dict['t_wall'].append(opti.stats()['t_wall_total']) # Take the first action from the solved action sequence. 
if u_val.ndim > 1: action = u_val[:, 0] else: action = np.array([u_val[0]]) + if self.use_lqr_gain_and_terminal_cost: + action += self.lqr_gain @ (obs - x_val[:, 0]) self.prev_action = action return action @@ -407,14 +495,13 @@ def run(self, self.results_dict['action'].append(action) self.results_dict['state'].append(env.state) self.results_dict['state_mse'].append(info['mse']) - # self.results_dict['state_error'].append(env.state - env.X_GOAL[i,:]) - + self.results_dict['state_error'].append(env.state - env.X_GOAL[i, :]) common_metric += info['mse'] print(i, '-th step.') - print(action) - print(obs) - print(reward) - print(done) + print('action:', action) + print('obs', obs) + print('reward', reward) + print('done', done) print(info) print() if render: diff --git a/safe_control_gym/controllers/mpc/mpc_acados.py b/safe_control_gym/controllers/mpc/mpc_acados.py new file mode 100644 index 000000000..83d4b47a2 --- /dev/null +++ b/safe_control_gym/controllers/mpc/mpc_acados.py @@ -0,0 +1,396 @@ +'''Model Predictive Control using Acados.''' + +from copy import deepcopy + +import casadi as cs +import numpy as np +import scipy +from termcolor import colored + +from safe_control_gym.controllers.mpc.mpc import MPC +from safe_control_gym.controllers.mpc.mpc_utils import set_acados_constraint_bound +from safe_control_gym.utils.utils import timing + +try: + from acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver +except ImportError as e: + print(colored(f'Error: {e}', 'red')) + print(colored('acados not installed, cannot use acados-based controller. Exiting.', 'red')) + print(colored('- To build and install acados, follow the instructions at https://docs.acados.org/installation/index.html', 'yellow')) + print(colored('- To set up the acados python interface, follow the instructions at https://docs.acados.org/python_interface/index.html', 'yellow')) + print() + exit() + + +class MPC_ACADOS(MPC): + '''MPC with full nonlinear model.''' + + def __init__( + self, + env_func, + horizon: int = 5, + q_mpc: list = [1], + r_mpc: list = [1], + warmstart: bool = True, + soft_constraints: bool = False, + soft_penalty: float = 10000, + terminate_run_on_done: bool = True, + constraint_tol: float = 1e-6, + # runner args + # shared/base args + output_dir: str = 'results/temp', + additional_constraints: list = None, + use_gpu: bool = False, + seed: int = 0, + use_RTI: bool = False, + use_lqr_gain_and_terminal_cost: bool = False, + **kwargs + ): + '''Creates task and controller. + + Args: + env_func (Callable): function to instantiate task/environment. + horizon (int): mpc planning horizon. + q_mpc (list): diagonals of state cost weight. + r_mpc (list): diagonals of input/action cost weight. + warmstart (bool): if to initialize from previous iteration. + soft_constraints (bool): Formulate the constraints as soft constraints. + soft_penalty (float): Penalty added to acados formulation for soft constraints. + terminate_run_on_done (bool): Terminate the run when the environment returns done or not. + constraint_tol (float): Tolerance to add the the constraint as sometimes solvers are not exact. + output_dir (str): output directory to write logs and results. + additional_constraints (list): List of additional constraints + use_gpu (bool): False (use cpu) True (use cuda). + seed (int): random seed. + use_RTI (bool): Real-time iteration for acados. + use_lqr_gain_and_terminal_cost (bool): Use LQR ancillary gain and terminal cost for the MPC. 
+ ''' + for k, v in locals().items(): + if k != 'self' and k != 'kwargs' and '__' not in k: + self.__dict__.update({k: v}) + super().__init__( + env_func, + horizon=horizon, + q_mpc=q_mpc, + r_mpc=r_mpc, + warmstart=warmstart, + soft_constraints=soft_constraints, + soft_penalty=soft_penalty, + terminate_run_on_done=terminate_run_on_done, + constraint_tol=constraint_tol, + output_dir=output_dir, + additional_constraints=additional_constraints, + # compute_initial_guess_method='ipopt', # use ipopt initial guess by default + use_lqr_gain_and_terminal_cost=use_lqr_gain_and_terminal_cost, + use_gpu=use_gpu, + seed=seed, + **kwargs + ) + # acados settings + self.use_RTI = use_RTI + + @timing + def reset(self): + '''Prepares for training or evaluation.''' + print(colored('Resetting MPC', 'green')) + self.x_guess = None + self.u_guess = None + super().reset() + self.acados_model = None + self.ocp = None + self.acados_ocp_solver = None + # Dynamics model. + self.setup_acados_model() + # Acados optimizer. + self.setup_acados_optimizer() + self.acados_ocp_solver = AcadosOcpSolver(self.ocp) + + def setup_acados_model(self) -> AcadosModel: + '''Sets up symbolic model for acados. + + Returns: + acados_model (AcadosModel): acados model object. + + Other options to set up the model: + f_expl = self.model.x_dot (explicit continuous-time dynamics) + f_impl = self.model.x_dot_acados - f_expl (implicit continuous-time dynamics) + model.f_impl_expr = f_impl + model.f_expl_expr = f_expl + ''' + + acados_model = AcadosModel() + acados_model.x = self.model.x_sym + acados_model.u = self.model.u_sym + acados_model.name = self.env.NAME + + # set up rk4 (acados need symbolic expression of dynamics, not function) + k1 = self.model.fc_func(acados_model.x, acados_model.u) + k2 = self.model.fc_func(acados_model.x + self.dt / 2 * k1, acados_model.u) + k3 = self.model.fc_func(acados_model.x + self.dt / 2 * k2, acados_model.u) + k4 = self.model.fc_func(acados_model.x + self.dt * k3, acados_model.u) + f_disc = acados_model.x + self.dt / 6 * (k1 + 2 * k2 + 2 * k3 + k4) + + acados_model.disc_dyn_expr = f_disc + + # store meta information # NOTE: unit is missing + acados_model.x_labels = self.env.STATE_LABELS + acados_model.u_labels = self.env.ACTION_LABELS + acados_model.t_label = 'time' + + self.acados_model = acados_model + + @timing + def compute_initial_guess(self, init_state, goal_states=None): + '''Use IPOPT to get an initial guess of the solution. + + Args: + init_state (ndarray): Initial state. + goal_states (ndarray): Goal states. 
+ ''' + x_val, u_val = super().compute_initial_guess(init_state, goal_states) + self.x_guess = x_val + self.u_guess = u_val + + def setup_acados_optimizer(self): + '''Sets up nonlinear optimization problem.''' + nx, nu = self.model.nx, self.model.nu + ny = nx + nu + ny_e = nx + + # create ocp object to formulate the OCP + ocp = AcadosOcp() + ocp.model = self.acados_model + + # set dimensions + ocp.dims.N = self.T # prediction horizon + + # set cost (NOTE: safe-control-gym uses quadratic cost) + ocp.cost.cost_type = 'LINEAR_LS' + ocp.cost.cost_type_e = 'LINEAR_LS' + ocp.cost.W = scipy.linalg.block_diag(self.Q, self.R) + ocp.cost.W_e = self.Q if not self.use_lqr_gain_and_terminal_cost else self.P + ocp.cost.Vx = np.zeros((ny, nx)) + ocp.cost.Vx[:nx, :nx] = np.eye(nx) + ocp.cost.Vu = np.zeros((ny, nu)) + ocp.cost.Vu[nx:(nx + nu), :nu] = np.eye(nu) + ocp.cost.Vx_e = np.eye(nx) + # placeholder y_ref and y_ref_e (will be set in select_action) + ocp.cost.yref = np.zeros((ny, )) + ocp.cost.yref_e = np.zeros((ny_e, )) + + # Constraints + # general constraint expressions + state_constraint_expr_list = [] + input_constraint_expr_list = [] + for sc_i, state_constraint in enumerate(self.state_constraints_sym): + state_constraint_expr_list.append(state_constraint(ocp.model.x)) + for ic_i, input_constraint in enumerate(self.input_constraints_sym): + input_constraint_expr_list.append(input_constraint(ocp.model.u)) + + h_expr_list = state_constraint_expr_list + input_constraint_expr_list + h_expr = cs.vertcat(*h_expr_list) + h0_expr = cs.vertcat(*h_expr_list) + he_expr = cs.vertcat(*state_constraint_expr_list) # terminal constraints are only state constraints + # pass the constraints to the ocp object + ocp = self.processing_acados_constraints_expression(ocp, h0_expr, h_expr, he_expr) + + # slack costs for nonlinear constraints + if self.soft_constraints: + # slack variables for all constraints + ocp.constraints.Jsh_0 = np.eye(h0_expr.shape[0]) + ocp.constraints.Jsh = np.eye(h_expr.shape[0]) + ocp.constraints.Jsh_e = np.eye(he_expr.shape[0]) + # slack penalty + L2_pen = self.soft_penalty + L1_pen = self.soft_penalty + ocp.cost.Zl_0 = L2_pen * np.ones(h0_expr.shape[0]) + ocp.cost.Zu_0 = L2_pen * np.ones(h0_expr.shape[0]) + ocp.cost.zl_0 = L1_pen * np.ones(h0_expr.shape[0]) + ocp.cost.zu_0 = L1_pen * np.ones(h0_expr.shape[0]) + ocp.cost.Zu = L2_pen * np.ones(h_expr.shape[0]) + ocp.cost.Zl = L2_pen * np.ones(h_expr.shape[0]) + ocp.cost.zl = L1_pen * np.ones(h_expr.shape[0]) + ocp.cost.zu = L1_pen * np.ones(h_expr.shape[0]) + ocp.cost.Zl_e = L2_pen * np.ones(he_expr.shape[0]) + ocp.cost.Zu_e = L2_pen * np.ones(he_expr.shape[0]) + ocp.cost.zl_e = L1_pen * np.ones(he_expr.shape[0]) + ocp.cost.zu_e = L1_pen * np.ones(he_expr.shape[0]) + + # placeholder initial state constraint + x_init = np.zeros((nx)) + ocp.constraints.x0 = x_init + + # set up solver options + ocp.solver_options.qp_solver = 'PARTIAL_CONDENSING_HPIPM' + ocp.solver_options.hessian_approx = 'GAUSS_NEWTON' + ocp.solver_options.integrator_type = 'DISCRETE' + ocp.solver_options.nlp_solver_type = 'SQP' if not self.use_RTI else 'SQP_RTI' + ocp.solver_options.nlp_solver_max_iter = 25 if not self.use_RTI else 1 + ocp.solver_options.globalization = 'MERIT_BACKTRACKING' + # ocp.solver_options.globalization = 'FUNNEL_L1PEN_LINESEARCH' if not self.use_RTI else 'MERIT_BACKTRACKING' + ocp.solver_options.tf = self.T * self.dt # prediction horizon + + # c code generation + # NOTE: when using GP-MPC, a separated directory is needed; + # otherwise, Acados solver 
can read the wrong c code + ocp.code_export_directory = self.output_dir + '/mpc_c_generated_code' + + self.ocp = ocp + + def processing_acados_constraints_expression(self, + ocp: AcadosOcp, + h0_expr: cs.MX, + h_expr: cs.MX, + he_expr: cs.MX, + ) -> AcadosOcp: + '''Preprocess the constraints to be compatible with acados. + Args: + ocp (AcadosOcp): acados ocp object + h0_expr (cs.MX expression): initial state constraints + h_expr (cs.MX expression): state and input constraints + he_expr (cs.MX expression): terminal state constraints + Returns: + ocp (AcadosOcp): acados ocp object with constraints set. + + An alternative way to set the constraints is to use bounded constraints of acados: + # bounded input constraints + idxbu = np.where(np.sum(self.env.constraints.input_constraints[0].constraint_filter, axis=0) != 0)[0] + ocp.constraints.Jbu = np.eye(nu) + ocp.constraints.lbu = self.env.constraints.input_constraints[0].lower_bounds + ocp.constraints.ubu = self.env.constraints.input_constraints[0].upper_bounds + ocp.constraints.idxbu = idxbu # active constraints dimension + ''' + + ub = {'h': set_acados_constraint_bound(h_expr, 'ub', self.constraint_tol), + 'h0': set_acados_constraint_bound(h0_expr, 'ub', self.constraint_tol), + 'he': set_acados_constraint_bound(he_expr, 'ub', self.constraint_tol), } + + lb = {'h': set_acados_constraint_bound(h_expr, 'lb'), + 'h0': set_acados_constraint_bound(h0_expr, 'lb'), + 'he': set_acados_constraint_bound(he_expr, 'lb'), } + + # make sure all the ub and lb are 1D numpy arrays + # (see: https://discourse.acados.org/t/infeasible-qps-when-using-nonlinear-casadi-constraint-expressions/1595/5?u=mxche) + for key in ub.keys(): + ub[key] = ub[key].flatten() if ub[key].ndim != 1 else ub[key] + lb[key] = lb[key].flatten() if lb[key].ndim != 1 else lb[key] + # check ub and lb dimensions + for key in ub.keys(): + assert ub[key].ndim == 1, f'ub[{key}] is not 1D numpy array' + assert lb[key].ndim == 1, f'lb[{key}] is not 1D numpy array' + assert ub['h'].shape == lb['h'].shape, 'h_ub and h_lb have different shapes' + + # pass the constraints to the ocp object + ocp.model.con_h_expr_0, ocp.model.con_h_expr, ocp.model.con_h_expr_e = \ + h0_expr, h_expr, he_expr + ocp.dims.nh_0, ocp.dims.nh, ocp.dims.nh_e = \ + h0_expr.shape[0], h_expr.shape[0], he_expr.shape[0] + # assign constraints upper and lower bounds + ocp.constraints.uh_0 = ub['h0'] + ocp.constraints.lh_0 = lb['h0'] + ocp.constraints.uh = ub['h'] + ocp.constraints.lh = lb['h'] + ocp.constraints.uh_e = ub['he'] + ocp.constraints.lh_e = lb['he'] + + return ocp + + @timing + def select_action(self, + obs, + info=None + ): + '''Solves nonlinear mpc problem to get next action. + + Args: + obs (ndarray): Current state/observation. + info (dict): Current info + + Returns: + action (ndarray): Input/action to the task/env. + ''' + nx, nu = self.model.nx, self.model.nu + # set initial condition (0-th state) + self.acados_ocp_solver.set(0, 'lbx', obs) + self.acados_ocp_solver.set(0, 'ubx', obs) + + # warm-starting solver + # NOTE: only for ipopt warm-starting; since acados + # has a built-in warm-starting mechanism. 
+ if self.warmstart: + if self.x_guess is None or self.u_guess is None: + # compute initial guess with IPOPT + self.compute_initial_guess(obs) + for idx in range(self.T + 1): + init_x = self.x_guess[:, idx] + self.acados_ocp_solver.set(idx, 'x', init_x) + for idx in range(self.T): + if nu == 1: + init_u = np.array([self.u_guess[idx]]) + else: + init_u = self.u_guess[:, idx] + self.acados_ocp_solver.set(idx, 'u', init_u) + + # set reference for the control horizon + goal_states = self.get_references() + if self.mode == 'tracking': + self.traj_step += 1 + + y_ref = np.concatenate((goal_states[:, :-1], + np.repeat(self.U_EQ.reshape(-1, 1), self.T, axis=1)), axis=0) + for idx in range(self.T): + self.acados_ocp_solver.set(idx, 'yref', y_ref[:, idx]) + y_ref_e = goal_states[:, -1] + self.acados_ocp_solver.set(self.T, 'yref', y_ref_e) + + # solve the optimization problem + if self.use_RTI: + # preparation phase + self.acados_ocp_solver.options_set('rti_phase', 1) + status = self.acados_ocp_solver.solve() + + # feedback phase + self.acados_ocp_solver.options_set('rti_phase', 2) + status = self.acados_ocp_solver.solve() + + if status not in [0, 2]: + self.acados_ocp_solver.print_statistics() + raise Exception(f'acados returned status {status}. Exiting.') + if status == 2: + print(f'acados returned status {status}. ') + + action = self.acados_ocp_solver.get(0, 'u') + + elif not self.use_RTI: + status = self.acados_ocp_solver.solve() + if status not in [0, 2]: + self.acados_ocp_solver.print_statistics() + raise Exception(f'acados returned status {status}. Exiting.') + if status == 2: + print(f'acados returned status {status}. ') + action = self.acados_ocp_solver.get(0, 'u') + + # get the open-loop solution + if self.x_prev is None and self.u_prev is None: + self.x_prev = np.zeros((nx, self.T + 1)) + self.u_prev = np.zeros((nu, self.T)) + if self.u_prev is not None and nu == 1: + self.u_prev = self.u_prev.reshape((1, -1)) + for i in range(self.T + 1): + self.x_prev[:, i] = self.acados_ocp_solver.get(i, 'x') + for i in range(self.T): + self.u_prev[:, i] = self.acados_ocp_solver.get(i, 'u') + if nu == 1: + self.u_prev = self.u_prev.flatten() + + self.x_guess = self.x_prev + self.u_guess = self.u_prev + self.results_dict['horizon_states'].append(deepcopy(self.x_prev)) + self.results_dict['horizon_inputs'].append(deepcopy(self.u_prev)) + self.results_dict['goal_states'].append(deepcopy(goal_states)) + + self.prev_action = action + if self.use_lqr_gain_and_terminal_cost: + action += self.lqr_gain @ (obs - self.x_prev[:, 0]) + + return action diff --git a/safe_control_gym/controllers/mpc/mpc_acados.yaml b/safe_control_gym/controllers/mpc/mpc_acados.yaml new file mode 100644 index 000000000..4a0d257f4 --- /dev/null +++ b/safe_control_gym/controllers/mpc/mpc_acados.yaml @@ -0,0 +1,14 @@ +# MPC args +horizon: 50 +r_mpc: + - 0.1 +q_mpc: + - 1. 
+warmstart: True +soft_constraints: False +constraint_tol: 1.0e-6 + +# Runner args +deque_size: 10 +eval_batch_size: 10 +use_RTI: False diff --git a/safe_control_gym/controllers/mpc/mpc_utils.py b/safe_control_gym/controllers/mpc/mpc_utils.py index 49db1c50e..8e691d21c 100644 --- a/safe_control_gym/controllers/mpc/mpc_utils.py +++ b/safe_control_gym/controllers/mpc/mpc_utils.py @@ -1,9 +1,11 @@ '''General MPC utility functions.''' import casadi as cs +import matplotlib.pyplot as plt import numpy as np import scipy import scipy.linalg +from termcolor import colored from safe_control_gym.controllers.lqr.lqr_utils import discretize_linear_system from safe_control_gym.envs.constraints import ConstraintList @@ -40,7 +42,7 @@ def compute_discrete_lqr_gain_from_cont_linear_system(dfdx, dfdu, Q_lqr, R_lqr, btp = np.dot(B.T, P) lqr_gain = -np.dot(np.linalg.inv(R_lqr + np.dot(btp, B)), np.dot(btp, A)) - return lqr_gain, A, B + return lqr_gain, A, B, P def rk_discrete(f, n, m, dt): @@ -90,3 +92,67 @@ def reset_constraints(constraints): if len(constraints_list.input_state_constraints) > 0: raise NotImplementedError('[Error] Cannot handle combined state input constraints yet.') return constraints_list, state_constraints_sym, input_constraints_sym + + +def set_acados_constraint_bound(constraint, + bound_type, + bound_value=None, + ): + '''Set the acados constraint bound. + + Args: + constraint (casadi expression): Constraint expression. + bound_type (str): Type of bound (lb, ub). + bound_value (float): Value of the bound. + + Returns: + bound (np.array): Constraint bound value. + + Note: + all constraints in safe-control-gym are defined as g(x, u) <= constraint_tol + However, acados requires the constraints to be defined as lb <= g(x, u) <= ub + Thus, a large negative number (-1e8) is used as the lower bound. + See: https://github.com/acados/acados/issues/650 + ''' + if bound_value is None: + if bound_type == 'lb': + bound_value = -1e8 + elif bound_type == 'ub': + bound_value = 1e-6 + + return bound_value * np.ones(constraint.shape) + + +def plot_open_loop_sol(ctrl): + ''' Plot the open loop prediction of the MPC controller. + + Args: + ctrl (MPC): MPC controller object. 
+ ''' + if ctrl.x_prev is not None and ctrl.u_prev is not None: + nx = ctrl.x_prev.shape[0] # state dim + nu = ctrl.u_prev.shape[0] # input dim + steps = ctrl.T # prediction horizon + dt = ctrl.dt # ctrl frequency + x = ctrl.x_prev # open loop state (nx, steps + 1) + u = ctrl.u_prev # open loop input (nu, steps) + + # get the reference trajectory + goal_states = ctrl.get_references() + + # Plot the open loop prediction + fig, axs = plt.subplots(nx + nu, 1, figsize=(5, 8)) + fig.tight_layout() + for i in range(nx): + axs[i].plot(np.arange(steps + 1) * dt, x[i, :], 'b', label='pred') + axs[i].plot(np.arange(steps + 1) * dt, goal_states[i, :], 'r--', label='ref', ) + axs[i].set_ylabel(f'$x_{i}$') + axs[i].legend() + for i in range(nu): + axs[nx + i].plot(np.arange(steps) * dt, u[i, :], 'b', label='pred') + axs[nx + i].set_ylabel(f'$u_{i}$') + + plt.xlabel('Time [s]') + plt.show() + else: + print(colored('[Warning] No open loop solution to plot.', 'yellow')) diff --git a/safe_control_gym/envs/gym_control/cartpole.py b/safe_control_gym/envs/gym_control/cartpole.py index 09257fc6b..6a1c91317 100644 --- a/safe_control_gym/envs/gym_control/cartpole.py +++ b/safe_control_gym/envs/gym_control/cartpole.py @@ -438,12 +438,16 @@ def _set_action_space(self): def _set_observation_space(self): '''Sets the observation space of the environment.''' - # Angle at which to fail the episode. - self.theta_threshold_radians = 90 * math.pi / 180 # NOTE: different value in PyBullet gym (0.4) and OpenAI gym (2.4). self.x_threshold = 2.4 + self.x_dot_threshold = 20 + self.theta_threshold_radians = 90 * math.pi / 180 # Angle at which to fail the episode. + self.theta_dot_threshold = 20 # Limit set to 2x: i.e. a failing observation is still within bounds. - obs_bound = np.array([self.x_threshold * 2, np.finfo(np.float32).max, self.theta_threshold_radians * 2, np.finfo(np.float32).max]) + obs_bound = np.array([self.x_threshold * 2, + self.x_dot_threshold, + self.theta_threshold_radians * 2, + self.theta_dot_threshold]) self.state_space = spaces.Box(low=-obs_bound, high=obs_bound, dtype=np.float32) # Concatenate goal info for RL diff --git a/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py b/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py index 82f8bcdc6..2e2cb1887 100644 --- a/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py +++ b/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py @@ -634,48 +634,54 @@ def _set_action_space(self): def _set_observation_space(self): '''Sets the observation space of the environment.''' self.x_threshold = 2 + self.x_dot_threshold = 30 self.y_threshold = 2 + self.y_dot_threshold = 30 self.z_threshold = 2 + self.z_dot_threshold = 30 self.phi_threshold_radians = 85 * math.pi / 180 self.theta_threshold_radians = 85 * math.pi / 180 self.psi_threshold_radians = 180 * math.pi / 180 # Do not bound yaw. + self.phi_dot_threshold_radians = 500 * math.pi / 180 + self.theta_dot_threshold_radians = 500 * math.pi / 180 + self.psi_dot_threshold_radians = 500 * math.pi / 180 # Define obs/state bounds, labels and units. if self.QUAD_TYPE == QuadType.ONE_D: # obs/state = {z, z_dot}. 
- low = np.array([self.GROUND_PLANE_Z, -np.finfo(np.float32).max]) - high = np.array([self.z_threshold, np.finfo(np.float32).max]) + low = np.array([self.GROUND_PLANE_Z, -self.z_dot_threshold]) + high = np.array([self.z_threshold, self.z_dot_threshold]) self.STATE_LABELS = ['z', 'z_dot'] self.STATE_UNITS = ['m', 'm/s'] elif self.QUAD_TYPE == QuadType.TWO_D: # obs/state = {x, x_dot, z, z_dot, theta, theta_dot}. low = np.array([ - -self.x_threshold, -np.finfo(np.float32).max, - self.GROUND_PLANE_Z, -np.finfo(np.float32).max, - -self.theta_threshold_radians, -np.finfo(np.float32).max + -self.x_threshold, -self.x_dot_threshold, + self.GROUND_PLANE_Z, -self.z_dot_threshold, + -self.theta_threshold_radians, -self.theta_dot_threshold_radians ]) high = np.array([ - self.x_threshold, np.finfo(np.float32).max, - self.z_threshold, np.finfo(np.float32).max, - self.theta_threshold_radians, np.finfo(np.float32).max + self.x_threshold, self.x_dot_threshold, + self.z_threshold, self.z_dot_threshold, + self.theta_threshold_radians, self.theta_dot_threshold_radians ]) self.STATE_LABELS = ['x', 'x_dot', 'z', 'z_dot', 'theta', 'theta_dot'] self.STATE_UNITS = ['m', 'm/s', 'm', 'm/s', 'rad', 'rad/s'] elif self.QUAD_TYPE == QuadType.THREE_D: # obs/state = {x, x_dot, y, y_dot, z, z_dot, phi, theta, psi, p_body, q_body, r_body}. low = np.array([ - -self.x_threshold, -np.finfo(np.float32).max, - -self.y_threshold, -np.finfo(np.float32).max, - self.GROUND_PLANE_Z, -np.finfo(np.float32).max, + -self.x_threshold, -self.x_dot_threshold, + -self.y_threshold, -self.y_dot_threshold, + self.GROUND_PLANE_Z, -self.z_dot_threshold, -self.phi_threshold_radians, -self.theta_threshold_radians, -self.psi_threshold_radians, - -np.finfo(np.float32).max, -np.finfo(np.float32).max, -np.finfo(np.float32).max + -self.phi_dot_threshold_radians, -self.theta_dot_threshold_radians, -self.psi_dot_threshold_radians ]) high = np.array([ - self.x_threshold, np.finfo(np.float32).max, - self.y_threshold, np.finfo(np.float32).max, - self.z_threshold, np.finfo(np.float32).max, + self.x_threshold, self.x_dot_threshold, + self.y_threshold, self.y_dot_threshold, + self.z_threshold, self.z_dot_threshold, self.phi_threshold_radians, self.theta_threshold_radians, self.psi_threshold_radians, - np.finfo(np.float32).max, np.finfo(np.float32).max, np.finfo(np.float32).max + self.phi_dot_threshold_radians, self.theta_dot_threshold_radians, self.psi_dot_threshold_radians ]) self.STATE_LABELS = ['x', 'x_dot', 'y', 'y_dot', 'z', 'z_dot', 'phi', 'theta', 'psi', 'p', 'q', 'r'] diff --git a/safe_control_gym/utils/utils.py b/safe_control_gym/utils/utils.py index 4b638d6fa..efabcc80c 100644 --- a/safe_control_gym/utils/utils.py +++ b/safe_control_gym/utils/utils.py @@ -7,6 +7,8 @@ import random import subprocess import sys +import time +from functools import wraps import gymnasium as gym import imageio @@ -14,6 +16,7 @@ import numpy as np import torch import yaml +from termcolor import colored def mkdirs(*paths): @@ -193,3 +196,18 @@ def unwrap_wrapper(env, wrapper_class): def is_wrapped(env, wrapper_class): '''Check if a given environment has been wrapped with a given wrapper.''' return unwrap_wrapper(env, wrapper_class) is not None + + +def timing(f): + '''Decorator for measuring the time of a function. + The elapsed time is stored in the function object. 
+ ''' + @wraps(f) + def wrap(*args, **kw): + ts = time.time() + result = f(*args, **kw) + te = time.time() + wrap.elapsed_time = te - ts + print(colored(f'func:{f.__name__} took: {wrap.elapsed_time:.3f} sec', 'blue')) + return result + return wrap
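MPC.set_dynamics_func now also builds a discrete linear model via discretize_linear_system(..., exact=True), which feeds the LQR ancillary gain and terminal cost. As a reference point, here is a minimal sketch of exact zero-order-hold discretization on an assumed toy double integrator; the matrices and dt are illustrative, not the repository's, and the helper itself may be implemented differently.

```
import numpy as np
from scipy.linalg import expm

# Assumed continuous-time linearization (double integrator); illustrative only.
A = np.array([[0.0, 1.0], [0.0, 0.0]])
B = np.array([[0.0], [1.0]])
dt = 0.02

# Exact zero-order-hold discretization via the matrix exponential of the
# augmented matrix [[A, B], [0, 0]] scaled by dt.
nx, nu = A.shape[0], B.shape[1]
M = np.zeros((nx + nu, nx + nu))
M[:nx, :nx] = A
M[:nx, nx:] = B
Md = expm(M * dt)
Ad, Bd = Md[:nx, :nx], Md[:nx, nx:]
print(Ad, Bd)
```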
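compute_discrete_lqr_gain_from_cont_linear_system now returns the Riccati solution P in addition to the gain, and with use_lqr_gain_and_terminal_cost enabled the controllers use P as the terminal weight and apply the gain as an ancillary correction on the first planned state. A rough sketch of those quantities using SciPy's discrete Riccati solver; the (Ad, Bd) pair, weights, and states below are assumptions for illustration, not the gym's dynamics or configs.

```
import numpy as np
from scipy.linalg import solve_discrete_are

# Assumed discrete-time linearization (double integrator, dt = 0.02).
dt = 0.02
Ad = np.array([[1.0, dt], [0.0, 1.0]])
Bd = np.array([[0.5 * dt ** 2], [dt]])
Q = np.diag([5.0, 0.1])
R = np.diag([0.1])

# Terminal cost: solution of the discrete algebraic Riccati equation.
P = solve_discrete_are(Ad, Bd, Q, R)

# Ancillary gain with the sign convention shown in mpc_utils:
# lqr_gain = -(R + B'PB)^{-1} B'PA, applied as action += lqr_gain @ (obs - x_plan[:, 0]).
btp = Bd.T @ P
lqr_gain = -np.linalg.inv(R + btp @ Bd) @ (btp @ Ad)

obs = np.array([0.1, 0.0])      # measured state
x_plan0 = np.array([0.0, 0.0])  # first state of the planned trajectory
u_plan0 = np.array([0.0])       # first planned input
action = u_plan0 + lqr_gain @ (obs - x_plan0)
print(P, lqr_gain, action)
```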
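MPC_ACADOS.setup_acados_model writes the discretized dynamics as a symbolic RK4 expression because the acados DISCRETE integrator takes an expression in x and u rather than a CasADi Function. A minimal sketch of the same construction on an assumed two-state toy model; the model and dt are made up for illustration.

```
import casadi as cs

# Toy continuous-time dynamics x_dot = f(x, u): a two-state double integrator.
x = cs.MX.sym('x', 2)
u = cs.MX.sym('u', 1)
fc = cs.Function('fc', [x, u], [cs.vertcat(x[1], u[0])])

dt = 0.02  # assumed control period

# One RK4 step written as a symbolic expression in (x, u), mirroring
# setup_acados_model(); this expression is what disc_dyn_expr expects,
# not a CasADi Function.
k1 = fc(x, u)
k2 = fc(x + dt / 2 * k1, u)
k3 = fc(x + dt / 2 * k2, u)
k4 = fc(x + dt * k3, u)
f_disc = x + dt / 6 * (k1 + 2 * k2 + 2 * k3 + k4)

# Wrap it in a Function only to sanity-check the expression numerically.
fd = cs.Function('fd', [x, u], [f_disc])
print(fd([0.0, 1.0], [0.5]))
```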
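The acados OCP uses a LINEAR_LS cost: selection matrices Vx and Vu, a block-diagonal weight W = blkdiag(Q, R), and a stacked reference yref = [x_ref; U_EQ], matching the Ur change in the CasADi controllers. Below is a small numpy check, with assumed cartpole-sized dimensions, that this stacked form recovers the usual quadratic tracking term; acados' own internal scaling of the stage costs is ignored here.

```
import numpy as np
import scipy.linalg

nx, nu = 4, 1  # assumed cartpole-sized dimensions
Q = np.diag([5.0, 0.1, 5.0, 0.1])
R = np.diag([0.1])

ny = nx + nu
W = scipy.linalg.block_diag(Q, R)
Vx = np.zeros((ny, nx))
Vx[:nx, :nx] = np.eye(nx)
Vu = np.zeros((ny, nu))
Vu[nx:, :nu] = np.eye(nu)

x, u = np.random.randn(nx), np.random.randn(nu)
x_ref = np.zeros(nx)
u_eq = np.array([0.0])  # stand-in for U_EQ
yref = np.concatenate([x_ref, u_eq])

# Stacked least-squares residual vs. the explicit quadratic tracking cost.
y = Vx @ x + Vu @ u
ls_cost = (y - yref) @ W @ (y - yref)
quadratic_cost = (x - x_ref) @ Q @ (x - x_ref) + (u - u_eq) @ R @ (u - u_eq)
assert np.isclose(ls_cost, quadratic_cost)
print(ls_cost, quadratic_cost)
```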
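set_acados_constraint_bound converts the gym's one-sided constraints g(x, u) <= constraint_tol into the two-sided lh <= g(x, u) <= uh form acados expects, with a large negative constant standing in for an unbounded lower side. A tiny sketch of the resulting bound vectors for a made-up constraint expression (not one of the environment constraints):

```
import casadi as cs
import numpy as np

constraint_tol = 1e-6

# A made-up constraint |x_0| <= 2 written as two one-sided expressions g(x) <= 0,
# the form produced by the gym's constraint classes.
x = cs.MX.sym('x', 2)
g = cs.vertcat(x[0] - 2.0, -x[0] - 2.0)

# acados wants lh <= g(x) <= uh, so the "missing" lower bound becomes a large
# negative constant and the upper bound absorbs the constraint tolerance
# (see https://github.com/acados/acados/issues/650).
uh = constraint_tol * np.ones(g.shape[0])
lh = -1e8 * np.ones(g.shape[0])
print(lh, uh)
```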
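The timing decorator added to safe_control_gym/utils/utils.py prints the wall-clock time of each call and stores it on the wrapped function object, which is how reset, compute_initial_guess, and select_action are profiled. A minimal usage sketch; the dummy function is hypothetical and only stands in for a solver call.

```
import time

from safe_control_gym.utils.utils import timing


@timing
def dummy_select_action():
    time.sleep(0.05)  # stand-in for an OCP solve
    return 0.0


dummy_select_action()
# Besides printing, the decorator stores the duration on the wrapped function.
print(f'last call took {dummy_select_action.elapsed_time:.3f} s')
```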