diff --git a/Cargo.lock b/Cargo.lock index 3daac60..2809778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1262,8 +1262,7 @@ dependencies = [ [[package]] name = "lnp-core" version = "0.9.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb157e66f06e267575e1c8468e61946a959d58c9f73d46784ebc54651e6525f3" +source = "git+https://github.com/joemphilips/lnp-core?branch=swap#931a5a504c3b1e1e7740d4cf869c261c62f34694" dependencies = [ "amplify", "bitcoin", @@ -1282,8 +1281,7 @@ dependencies = [ [[package]] name = "lnp2p" version = "0.9.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f2779e6016a334f8868e7cf501caff6a0f9204cae6b1b7af3111ca8ea5eec17" +source = "git+https://github.com/joemphilips/lnp-core?branch=swap#931a5a504c3b1e1e7740d4cf869c261c62f34694" dependencies = [ "amplify", "bitcoin", diff --git a/Cargo.toml b/Cargo.toml index 1076cf0..f93b8ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,10 @@ required-features = ["server"] name = "signd" required-features = ["server"] +[[bin]] +name = "swapd" +required-features = ["server"] + [dependencies] # LNP/BP crates amplify = "3.13.0" @@ -120,3 +124,6 @@ tor = ["microservices/tor", "internet2/tor"] [package.metadata.configure_me] spec = "config_spec.toml" + +[patch.crates-io] +lnp-core = { git = "https://github.com/joemphilips/lnp-core", branch = "swap" } diff --git a/cli/src/command.rs b/cli/src/command.rs index c052c2e..ae74560 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -33,6 +33,8 @@ impl Command { Command::Open { .. } => s!("Opening channel"), Command::Invoice { .. } => s!("Creating invoice"), Command::Pay { .. } => s!("Paying invoice"), + Command::SwapIn { .. } => s!("perform loop-in swap"), + Command::SwapOut { .. } => s!("perform loop-out swap"), } } } @@ -161,6 +163,12 @@ impl Exec for Opts { )?; runtime.report_progress()?; } + Command::SwapIn { amount_asset, address } => { + todo!() + } + Command::SwapOut { amount_asset, node, .. } => { + todo!() + } } Ok(()) } diff --git a/cli/src/opts.rs b/cli/src/opts.rs index e1af2ac..c769c31 100644 --- a/cli/src/opts.rs +++ b/cli/src/opts.rs @@ -235,6 +235,11 @@ pub enum Command { /// amount. Overrides amount provided by the invoice. amount_msat: Option, }, + + /// perform loop in swap + SwapIn { amount_asset: AmountOfAsset, address: String }, + /// perform loop out swap + SwapOut { node: NodeId, amount_asset: AmountOfAsset, max_swap_fee: u64 }, } #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, Error, From)] diff --git a/rpc/src/error.rs b/rpc/src/error.rs index 6e72dd0..07e3162 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -35,6 +35,9 @@ pub enum FailureCode { /// LNPD-related error Lnpd = 0x010, + /// SWAPD-related error + Swap = 0x40, + /// Error coming from other ESB interface reported to a different sservice Nested = 0xFFE, } diff --git a/rpc/src/messages.rs b/rpc/src/messages.rs index 0e44cd9..e9d9e36 100644 --- a/rpc/src/messages.rs +++ b/rpc/src/messages.rs @@ -24,8 +24,9 @@ use internet2::addr::{InetSocketAddr, NodeAddr, NodeId}; use lightning_invoice::Invoice; use lnp::addr::LnpAddr; use lnp::channel::bolt::{AssetsBalance, ChannelState, CommonParams, PeerParams}; +use lnp::p2p::bifrost::SwapId; use lnp::p2p::bolt::{ChannelId, ChannelType}; -use lnpbp::chain::AssetId; +use lnpbp::chain::{AssetId, Chain}; use microservices::esb::ClientId; use microservices::rpc; use microservices::util::OptionDetails; @@ -130,12 +131,50 @@ pub enum RpcMsg { #[display("funds_info({0})", alt = "{0:#}")] #[from] FundsInfo(FundsInfo), + + #[display("swap_in({0})", alt = "{0:#}")] + #[from] + SwapIn(SwapIn), + + #[display("swap_out({0})", alt = "{0:#}")] + #[from] + SwapOut(SwapOut), + + #[display("swap_info({0})", alt = "{0:#}")] + #[from] + SwapInfo(SwapInfo), } impl RpcMsg { pub fn success() -> Self { RpcMsg::Success(OptionDetails::new()) } } +#[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] +#[display(Debug)] +pub enum NodeOrChannelId { + NodeId(NodeId), + ChannelId(ChannelId), +} + +#[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] +#[display(Debug)] +pub struct SwapIn { + pub amount: u64, + pub asset: Option, + pub address: String, + pub node_or_chan_id: NodeOrChannelId, +} + +#[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] +#[display("swapout({amount}, {chain}, ...)")] +pub struct SwapOut { + pub amount: u64, + pub asset: Option, + pub chain: Chain, + pub node_or_chan_id: NodeOrChannelId, + pub max_swap_fee: u64, +} + /// Request to create channel originating from a client #[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] #[display("{remote_peer}, {funding_sat}, ...")] @@ -328,6 +367,17 @@ pub struct ListPeerInfo { pub bifrost: Vec, } +#[cfg_attr(feature = "serde", serde_as)] +#[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(crate = "serde_crate"))] +#[display(SwapInfo::to_yaml_string)] +pub struct SwapInfo { + pub id: SwapId, +} + + +#[cfg(feature = "serde")] +impl ToYamlString for SwapInfo {} #[cfg(feature = "serde")] impl ToYamlString for NodeInfo {} #[cfg(feature = "serde")] diff --git a/rpc/src/service_id.rs b/rpc/src/service_id.rs index 298b4e9..a82e229 100644 --- a/rpc/src/service_id.rs +++ b/rpc/src/service_id.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use internet2::addr::NodeId; -use lnp::p2p::bifrost::BifrostApp; +use lnp::p2p::bifrost::{BifrostApp, SwapId}; use lnp::p2p::bolt::{ChannelId, TempChannelId}; use microservices::esb::{self, ClientId, ServiceName}; use strict_encoding::{strict_deserialize, strict_serialize}; @@ -68,6 +68,14 @@ pub enum ServiceId { #[strict_encoding(value = 0x24)] ChannelApp(BifrostApp), + #[display("swapd")] + #[strict_encoding(value = 0x28)] + Swapd(SwapId), + + #[display("swapapp<{0}>")] + #[strict_encoding(value = 0x40)] + SwapApp(BifrostApp), + #[display("other<{0}>")] #[strict_encoding(value = 0xFF)] Other(ServiceName), diff --git a/shell/_lnp-cli b/shell/_lnp-cli index be044d4..0d4f0ba 100644 --- a/shell/_lnp-cli +++ b/shell/_lnp-cli @@ -162,6 +162,31 @@ _arguments "${_arguments_options[@]}" \ '::amount-msat -- Amount of milli-satoshis to pay. Required for invoices lacking amount. Overrides amount provided by the invoice:' \ && ret=0 ;; +(swap-in) +_arguments "${_arguments_options[@]}" \ +'-R+[ZMQ socket for connecting daemon RPC interface]:CONNECT: ' \ +'--rpc=[ZMQ socket for connecting daemon RPC interface]:CONNECT: ' \ +'-h[Print help information]' \ +'--help[Print help information]' \ +'*-v[Set verbosity level]' \ +'*--verbose[Set verbosity level]' \ +':amount-asset:' \ +':address:' \ +&& ret=0 +;; +(swap-out) +_arguments "${_arguments_options[@]}" \ +'-R+[ZMQ socket for connecting daemon RPC interface]:CONNECT: ' \ +'--rpc=[ZMQ socket for connecting daemon RPC interface]:CONNECT: ' \ +'-h[Print help information]' \ +'--help[Print help information]' \ +'*-v[Set verbosity level]' \ +'*--verbose[Set verbosity level]' \ +':node:' \ +':amount-asset:' \ +':max-swap-fee:' \ +&& ret=0 +;; (help) _arguments "${_arguments_options[@]}" \ '-R+[ZMQ socket for connecting daemon RPC interface]:CONNECT: ' \ @@ -189,6 +214,8 @@ _lnp-cli_commands() { 'open:Opens a new channel with a remote peer, which must be already connected' \ 'invoice:Create an invoice' \ 'pay:Pay the invoice' \ +'swap-in:perform loop in swap' \ +'swap-out:perform loop out swap' \ 'help:Print this message or the help of the given subcommand(s)' \ ) _describe -t commands 'lnp-cli commands' commands "$@" @@ -248,5 +275,15 @@ _lnp-cli__ping_commands() { local commands; commands=() _describe -t commands 'lnp-cli ping commands' commands "$@" } +(( $+functions[_lnp-cli__swap-in_commands] )) || +_lnp-cli__swap-in_commands() { + local commands; commands=() + _describe -t commands 'lnp-cli swap-in commands' commands "$@" +} +(( $+functions[_lnp-cli__swap-out_commands] )) || +_lnp-cli__swap-out_commands() { + local commands; commands=() + _describe -t commands 'lnp-cli swap-out commands' commands "$@" +} _lnp-cli "$@" diff --git a/shell/_lnp-cli.ps1 b/shell/_lnp-cli.ps1 index 4c0892d..f5d31d5 100644 --- a/shell/_lnp-cli.ps1 +++ b/shell/_lnp-cli.ps1 @@ -39,6 +39,8 @@ Register-ArgumentCompleter -Native -CommandName 'lnp-cli' -ScriptBlock { [CompletionResult]::new('open', 'open', [CompletionResultType]::ParameterValue, 'Opens a new channel with a remote peer, which must be already connected') [CompletionResult]::new('invoice', 'invoice', [CompletionResultType]::ParameterValue, 'Create an invoice') [CompletionResult]::new('pay', 'pay', [CompletionResultType]::ParameterValue, 'Pay the invoice') + [CompletionResult]::new('swap-in', 'swap-in', [CompletionResultType]::ParameterValue, 'perform loop in swap') + [CompletionResult]::new('swap-out', 'swap-out', [CompletionResultType]::ParameterValue, 'perform loop out swap') [CompletionResult]::new('help', 'help', [CompletionResultType]::ParameterValue, 'Print this message or the help of the given subcommand(s)') break } @@ -152,6 +154,24 @@ Register-ArgumentCompleter -Native -CommandName 'lnp-cli' -ScriptBlock { [CompletionResult]::new('--verbose', 'verbose', [CompletionResultType]::ParameterName, 'Set verbosity level') break } + 'lnp-cli;swap-in' { + [CompletionResult]::new('-R', 'R', [CompletionResultType]::ParameterName, 'ZMQ socket for connecting daemon RPC interface') + [CompletionResult]::new('--rpc', 'rpc', [CompletionResultType]::ParameterName, 'ZMQ socket for connecting daemon RPC interface') + [CompletionResult]::new('-h', 'h', [CompletionResultType]::ParameterName, 'Print help information') + [CompletionResult]::new('--help', 'help', [CompletionResultType]::ParameterName, 'Print help information') + [CompletionResult]::new('-v', 'v', [CompletionResultType]::ParameterName, 'Set verbosity level') + [CompletionResult]::new('--verbose', 'verbose', [CompletionResultType]::ParameterName, 'Set verbosity level') + break + } + 'lnp-cli;swap-out' { + [CompletionResult]::new('-R', 'R', [CompletionResultType]::ParameterName, 'ZMQ socket for connecting daemon RPC interface') + [CompletionResult]::new('--rpc', 'rpc', [CompletionResultType]::ParameterName, 'ZMQ socket for connecting daemon RPC interface') + [CompletionResult]::new('-h', 'h', [CompletionResultType]::ParameterName, 'Print help information') + [CompletionResult]::new('--help', 'help', [CompletionResultType]::ParameterName, 'Print help information') + [CompletionResult]::new('-v', 'v', [CompletionResultType]::ParameterName, 'Set verbosity level') + [CompletionResult]::new('--verbose', 'verbose', [CompletionResultType]::ParameterName, 'Set verbosity level') + break + } 'lnp-cli;help' { [CompletionResult]::new('-R', 'R', [CompletionResultType]::ParameterName, 'ZMQ socket for connecting daemon RPC interface') [CompletionResult]::new('--rpc', 'rpc', [CompletionResultType]::ParameterName, 'ZMQ socket for connecting daemon RPC interface') diff --git a/shell/lnp-cli.bash b/shell/lnp-cli.bash index 6352836..ac28a8d 100644 --- a/shell/lnp-cli.bash +++ b/shell/lnp-cli.bash @@ -45,6 +45,12 @@ _lnp-cli() { ping) cmd+="__ping" ;; + swap-in) + cmd+="__swap__in" + ;; + swap-out) + cmd+="__swap__out" + ;; *) ;; esac @@ -52,7 +58,7 @@ _lnp-cli() { case "${cmd}" in lnp__cli) - opts="-h -V -R -v --help --version --rpc --verbose listen connect ping info funds peers channels open invoice pay help" + opts="-h -V -R -v --help --version --rpc --verbose listen connect ping info funds peers channels open invoice pay swap-in swap-out help" if [[ ${cur} == -* || ${COMP_CWORD} -eq 1 ]] ; then COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) return 0 @@ -371,6 +377,50 @@ _lnp-cli() { COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) return 0 ;; + lnp__cli__swap__in) + opts="-h -R -v --help --rpc --verbose
" + if [[ ${cur} == -* || ${COMP_CWORD} -eq 2 ]] ; then + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + return 0 + fi + case "${prev}" in + --rpc) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + -R) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + *) + COMPREPLY=() + ;; + esac + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + return 0 + ;; + lnp__cli__swap__out) + opts="-h -R -v --help --rpc --verbose " + if [[ ${cur} == -* || ${COMP_CWORD} -eq 2 ]] ; then + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + return 0 + fi + case "${prev}" in + --rpc) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + -R) + COMPREPLY=($(compgen -f "${cur}")) + return 0 + ;; + *) + COMPREPLY=() + ;; + esac + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + return 0 + ;; esac } diff --git a/src/bin/swapd.rs b/src/bin/swapd.rs new file mode 100644 index 0000000..480cedf --- /dev/null +++ b/src/bin/swapd.rs @@ -0,0 +1,40 @@ +#![recursion_limit = "256"] +// Coding conventions +#![deny( + non_upper_case_globals, + non_camel_case_types, + non_snake_case, + unused_mut, + unused_imports, + dead_code, + missing_docs +)] +//! Main executable for swapd: microservice for performing submarine swap. +//! transactions. + +#[macro_use] +extern crate log; + +use clap::Parser; +use lnp_node::swapd::Opts; +use lnp_node::Config; + +fn main() { + println!("swapd: submarine swap service"); + + let mut opts = Opts::parse(); + trace!("Command-line arguments: {:?}", &opts); + opts.process(); + trace!("Processed arguments: {:?}", &opts); + + let config: Config = opts.clone().into(); + trace!("Daemon configuration: {:?}", &config); + debug!("MSG RPC socket {}", &config.msg_endpoint); + debug!("CTL RPC socket {}", &config.ctl_endpoint); + + debug!("Starting runtime ..."); + // swapd::run(config).expect("Error running swapd runtime"); + + todo!(); + unreachable!() +} diff --git a/src/bus/ctl.rs b/src/bus/ctl.rs index abb7440..697fe27 100644 --- a/src/bus/ctl.rs +++ b/src/bus/ctl.rs @@ -38,7 +38,7 @@ pub enum CtlMsg { // Node connectivity API // --------------------- - // Sent from lnpd to peerd + // Sent from lnpd to peerd, swapd to peerd/channeld #[display("get_info()")] GetInfo, diff --git a/src/error.rs b/src/error.rs index 34881ed..d4b27d6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -22,7 +22,7 @@ use wallet::psbt::sign::SignError; use crate::bus::ServiceBus; use crate::channeld; -use crate::lnpd::automata::launch; +use crate::lnpd::automata::{launch, swap}; use crate::lnpd::{funding, Daemon}; use crate::routed::PaymentError; use crate::rpc::{self, ServiceId}; @@ -100,6 +100,9 @@ pub enum Error { #[from] Signing(SignError), + #[from] + SwapLaunch(swap::Error), + /// bridge interface failure: {0} #[from(zmq::Error)] #[from] diff --git a/src/lib.rs b/src/lib.rs index 184d7bb..9737343 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,7 @@ pub mod peerd; pub mod routed; mod service; pub mod signd; +pub mod swapd; pub mod watchd; pub use config::Config; @@ -61,6 +62,7 @@ pub use service::{BridgeHandler, Endpoints, Responder, Service, TryToServiceId}; pub const LNP_NODE_MASTER_KEY_FILE: &str = "master.key"; pub const LNP_NODE_FUNDING_WALLET: &str = "funding.wallet"; +pub const LNP_NODE_SWAP_KEY: &str = "swap.key"; #[cfg(not(any(feature = "bolt", feature = "bifrost")))] compile_error!("either 'bolt' or 'bifrost' feature must be used"); diff --git a/src/lnpd/automata/launch.rs b/src/lnpd/automata/launch.rs index 1801761..d5dfa15 100644 --- a/src/lnpd/automata/launch.rs +++ b/src/lnpd/automata/launch.rs @@ -25,11 +25,12 @@ use lnp::channel::{FundingError, PsbtLnpFunding}; use lnp::p2p::bolt::{ChannelId, TempChannelId}; use lnp_rpc::FailureCode; use microservices::esb::{self, ClientId, Handler}; -use microservices::util::OptionDetails; use microservices::LauncherError; +use super::report::report_progress_or_failure; use crate::automata::{Event, StateMachine}; use crate::bus::{BusMsg, CtlMsg, FundChannel, OpenChannelWith, ServiceBus}; +use crate::lnpd::automata::report::{report_failure, report_progress, report_success}; use crate::lnpd::runtime::Runtime; use crate::lnpd::{funding, Daemon}; use crate::rpc::{CreateChannel, Failure, RpcMsg, ServiceId}; @@ -501,61 +502,3 @@ fn complete_signatures( report_success(enquirer, event.endpoints, "Channel created and active"); Ok(()) } - -fn report_failure(client_id: ClientId, endpoints: &mut Endpoints, err: Error) -> Result<(), Error> { - let enquirer = ServiceId::Client(client_id); - let report = RpcMsg::Failure(Failure::from(&err)); - // Swallowing error since we do not want to break channel creation workflow just because of - // not able to report back to the client - let _ = endpoints - .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) - .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); - Err(err.into()) -} - -fn report_progress(client_id: ClientId, endpoints: &mut Endpoints, msg: T) -where - T: ToString, -{ - let enquirer = ServiceId::Client(client_id); - let report = RpcMsg::Progress(msg.to_string()); - // Swallowing error since we do not want to break channel creation workflow just because of - // not able to report back to the client - let _ = endpoints - .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) - .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); -} - -fn report_success(client_id: ClientId, endpoints: &mut Endpoints, msg: T) -where - T: Into, -{ - let enquirer = ServiceId::Client(client_id); - let report = RpcMsg::Success(msg.into()); - // Swallowing error since we do not want to break channel creation workflow just because of - // not able to report back to the client - let _ = endpoints - .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) - .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); -} - -fn report_progress_or_failure( - client_id: ClientId, - endpoints: &mut Endpoints, - result: Result, -) -> Result<(), Error> -where - T: ToString, -{ - let enquirer = ServiceId::Client(client_id); - let report = match result { - Ok(ref val) => RpcMsg::Progress(val.to_string()), - Err(ref err) => RpcMsg::Failure(Failure::from(err)), - }; - // Swallowing error since we do not want to break channel creation workflow just because of - // not able to report back to the client - let _ = endpoints - .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) - .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); - result.map(|_| ()).map_err(Error::into) -} diff --git a/src/lnpd/automata/mod.rs b/src/lnpd/automata/mod.rs index 2d8ddb5..079e03d 100644 --- a/src/lnpd/automata/mod.rs +++ b/src/lnpd/automata/mod.rs @@ -16,3 +16,6 @@ pub mod launch; pub use launch::ChannelLauncher; +pub mod swap; + +mod report; diff --git a/src/lnpd/automata/report.rs b/src/lnpd/automata/report.rs new file mode 100644 index 0000000..f562b9f --- /dev/null +++ b/src/lnpd/automata/report.rs @@ -0,0 +1,69 @@ +use lnp_rpc::{RpcMsg, ServiceId}; +use microservices::esb::ClientId; +use microservices::util::OptionDetails; + +use crate::bus::{BusMsg, ServiceBus}; +use crate::rpc::Failure; +use crate::Endpoints; + +pub fn report_failure(client_id: ClientId, endpoints: &mut Endpoints, err: E) -> Result<(), E> +where + for<'a> &'a E: Into, +{ + let enquirer = ServiceId::Client(client_id); + let report = RpcMsg::Failure((&err).into()); + // Swallowing error since we do not want to break channel creation workflow just because of + // not able to report back to the client + let _ = endpoints + .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) + .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); + Err(err) +} + +pub fn report_progress(client_id: ClientId, endpoints: &mut Endpoints, msg: T) +where + T: ToString, +{ + let enquirer = ServiceId::Client(client_id); + let report = RpcMsg::Progress(msg.to_string()); + // Swallowing error since we do not want to break channel creation workflow just because of + // not able to report back to the client + let _ = endpoints + .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) + .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); +} + +pub fn report_success(client_id: ClientId, endpoints: &mut Endpoints, msg: T) +where + T: Into, +{ + let enquirer = ServiceId::Client(client_id); + let report = RpcMsg::Success(msg.into()); + // Swallowing error since we do not want to break channel creation workflow just because of + // not able to report back to the client + let _ = endpoints + .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) + .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); +} + +pub fn report_progress_or_failure( + client_id: ClientId, + endpoints: &mut Endpoints, + result: Result, +) -> Result<(), E> +where + for<'a> &'a E: Into, + T: ToString, +{ + let enquirer = ServiceId::Client(client_id); + let report = match result { + Ok(ref val) => RpcMsg::Progress(val.to_string()), + Err(ref err) => RpcMsg::Failure(err.into()), + }; + // Swallowing error since we do not want to break channel creation workflow just because of + // not able to report back to the client + let _ = endpoints + .send_to(ServiceBus::Rpc, ServiceId::LnpBroker, enquirer, BusMsg::Rpc(report)) + .map_err(|err| error!("Can't report back to client #{}: {}", client_id, err)); + result.map(|_| ()) +} diff --git a/src/lnpd/automata/swap.rs b/src/lnpd/automata/swap.rs new file mode 100644 index 0000000..c795ec0 --- /dev/null +++ b/src/lnpd/automata/swap.rs @@ -0,0 +1,461 @@ +//! Workflow for launching swapd daemon by lnpd daemon in response to a user request for starting a +//! new swap with a remote peer. + +use std::fs; + +use bitcoin::secp256k1::{self, rand, Secp256k1}; +use bitcoin::util::bip32::{self, ChildNumber, ExtendedPrivKey}; +use bitcoin::{KeyPair, Txid}; +use internet2::addr::NodeId; +use lnp::p2p::bifrost::{self, SwapId, SwapOutRequestMsg}; +use lnp::p2p::bolt; +use lnp_rpc::{ + ChannelInfo, Failure, FailureCode, NodeOrChannelId, RpcMsg, ServiceId, SwapIn, SwapOut, +}; +use lnpbp::chain::{AssetId, Chain, ConversionImpossibleError}; +use microservices::esb::ClientId; +use microservices::{esb, LauncherError}; +use strict_encoding::StrictDecode; +use wallet::hd::UnhardenedIndex; + +use crate::automata::StateMachine; +use crate::bus::{BusMsg, CtlMsg, ServiceBus}; +use crate::lnpd::automata::report::{report_progress, report_progress_or_failure}; +use crate::lnpd::runtime::Runtime; +use crate::lnpd::Daemon; +use crate::{Config, Endpoints, Responder, LNP_NODE_SWAP_KEY}; + +#[derive(Clone, Debug, StrictEncode, StrictDecode)] +struct SwapKeyData { + pub xpriv: ExtendedPrivKey, + pub last_index: UnhardenedIndex, +} + +impl SwapKeyData { + fn get_next_keypair( + &self, + chain_index: u32, + secp: &Secp256k1, + ) -> Result { + let mut path = [chain_index, 1, 0] + .iter() + .map(|idx| ChildNumber::from_hardened_idx(*idx).expect("hardcoded index")) + .collect::>(); + + path.push(self.last_index.into()); + + let key_pair = self + .xpriv + .derive_priv(secp, &path) + .map(|xpriv| xpriv.to_keypair(secp)) + .map_err(Error::from)?; + + Ok(key_pair) + } +} + +impl Config { + fn read_swap_key(&self) -> Result { + let mut swap_key_path = self.data_dir.clone(); + swap_key_path.push(LNP_NODE_SWAP_KEY); + + let wallet_file = + fs::OpenOptions::new().read(true).write(true).create(false).open(swap_key_path)?; + let key_data = SwapKeyData::strict_decode(&wallet_file)?; + Ok(key_data) + } + + fn create_swap_key(&self) -> Result { + let mut swap_key_path = self.data_dir.clone(); + swap_key_path.push(LNP_NODE_SWAP_KEY); + info!("Creating swap intermediate key file at '{}'", swap_key_path.display()); + let wallet_file = fs::File::create(swap_key_path)?; + + let seed: Vec = (0..64).map(|_| rand::random::()).collect(); + + let network: bitcoin::Network = self.chain.clone().try_into()?; + let data = SwapKeyData { + xpriv: ExtendedPrivKey::new_master(network, &seed)?, + last_index: ::zero(), + }; + Ok(data) + } +} + +#[derive(Clone, PartialEq, Eq, Debug, Display, NetworkEncode, NetworkDecode)] +#[display(Debug)] +pub enum SwapRequest { + Out(SwapOut), + In(SwapIn), +} + +/// Errors for channel launching workflow +#[derive(Debug, Display, From, Error)] +#[display(doc_comments)] +pub enum ValidationError { + Foo, +} + +impl SwapRequest { + /// Validate request against current channel state. + /// Returns most plausible channel id to swap in case of success. + fn validate( + &self, + channel_infos: &Vec, + ) -> Result { + todo!() + } + + fn node_or_channel_id(&self) -> &NodeOrChannelId { + match self { + Self::Out(SwapOut { node_or_chan_id, .. }) + | Self::In(SwapIn { node_or_chan_id, .. }) => node_or_chan_id, + } + } + + fn asset(&self) -> &Option { + match self { + Self::Out(SwapOut { asset, .. }) | Self::In(SwapIn { asset, .. }) => asset, + } + } + + fn amount(&self) -> u64 { + match self { + Self::Out(SwapOut { amount, .. }) | Self::In(SwapIn { amount, .. }) => *amount, + } + } +} + +/// Errors for channel launching workflow +#[derive(Debug, Display, From, Error)] +#[display(doc_comments)] +pub enum Error { + /// the received message {0} was not expected at the {1} stage of the channel launch workflow + UnexpectedMessage(CtlMsg, &'static str), + + /// unable to launch swap daemon. Details: {0} + #[from(LauncherError)] + DaemonLaunch(Box>), + + /// failure sending RPC request during state transition. Details: {0} + #[from] + Esb(esb::Error), + + /// Failed to open file + #[from] + FileIo(std::io::Error), + + /// Failed to decode file + #[from] + Encoding(strict_encoding::Error), + + /// RPC request was invalid + #[from] + InvalidRequest(ValidationError), + + /// swap key creation error. + #[from] + BIP32(bip32::Error), + + /// could not create network from chain + #[from] + InvalidChain(ConversionImpossibleError), + + /// could not retrieve NodeId from peerd/channeld + NodeIdNotFound, +} + +impl From<&Error> for Failure { + fn from(err: &Error) -> Self { Failure { code: FailureCode::Swap, info: err.to_string() } } +} +impl From for Failure { + fn from(err: Error) -> Self { Failure { code: FailureCode::Swap, info: err.to_string() } } +} + +/// State machine for launching new swap by swapd in response to user swap request. +/// +/// State machine workflow: +/// ```ignore +/// INIT +/// | +/// --------+ +/// | V +/// | AWAITING_PEER_INFO +/// | | +/// +-------+ +/// | +/// AWAITING_CHANNEL_INFOS +/// | +/// V +/// NEGOTIATING +/// | +/// V +/// SIGNING +/// | +/// V +/// DONE +/// ``` +#[derive(Clone, Debug, Display, StrictEncode, StrictDecode)] +pub enum SwapLauncherState { + /// Awaiting for swapd to come online and report back to lnpd + for signd to derive keyset + /// in parallel. + #[display("INIT")] + Init(SwapId, SwapRequest, ClientId), + + /// Waiting peerd to report current status of the pee + /// (most importantly, channel ids we have against them). + #[display("AWAITING_PEER_INFO")] + AwaitingPeerInfo(SwapId, SwapRequest, ClientId), + + /// Waiting channeld to report current channel information against the peer. + #[display("AWAITING_CHANNEL_INFOS")] + AwaitingChannelInfos { + swap_id: SwapId, + num_expecting: u16, + infos_sofar: Vec, + request: SwapRequest, + node_id: Option, + enquirer: ClientId, + }, + + /// Awaiting for swapd to complete negotiations on swap with the remote peer. + /// At the end of this state lnpd will construct swap either + /// 1. transaction (swap in) + /// 2. off-chain payment (swap out) + #[display("NEGOTIATING")] + Negotiating(SwapId, ClientId), + + /// Awaiting for swapd to sign the construct transaction, after which it can be sent by lnpd to + /// bitcoin network and the workflow will be complete. + #[display("SIGNING")] + Signing(SwapId, Txid, ClientId), +} + +pub struct SwapLauncher { + state: SwapLauncherState, + chain: Chain, + key_data: SwapKeyData, + secp: Secp256k1, +} + +impl StateMachine for SwapLauncher { + type Error = Error; + + fn next( + self, + event: crate::automata::Event, + runtime: &mut Runtime, + ) -> Result, Self::Error> + where + Self: Sized, + { + debug!("SwapLauncher {:#} received {} event", self.swap_id(), event.message); + if let CtlMsg::Error { destination, request, error } = event.message { + let failure = Failure { code: FailureCode::Swap, info: error.clone() }; + runtime.send_rpc(event.endpoints, self.enquirer(), RpcMsg::Failure(failure))?; + return Ok(None); + } + + let next_state = match self.state { + SwapLauncherState::Init(swap_id, request, enquirer) => { + match event.message { + CtlMsg::Hello => { + debug_assert_eq!( + event.source, + ServiceId::Swapd(swap_id.clone()), + "swapd_launcher workflow inconsistency: `Hello` RPC CTL message \ + originating not from a swap daemon" + ); + report_progress( + enquirer, + event.endpoints, + format!("Swap daemon for {} launched.", request.node_or_channel_id()), + ); + + // Ask fellow microservices for information necessary to construct p2p + // messages. + match request.node_or_channel_id() { + NodeOrChannelId::ChannelId(c) => { + let mut e = event; + e.send_ctl_service(ServiceId::Channel(*c), CtlMsg::GetInfo)?; + Some(SwapLauncherState::AwaitingChannelInfos { + swap_id, + num_expecting: 1, + infos_sofar: Vec::with_capacity(1), + request, + node_id: None, + enquirer, + }) + } + NodeOrChannelId::NodeId(node_id) => { + let mut e = event; + e.send_ctl_service( + ServiceId::PeerBifrost(*node_id), + CtlMsg::GetInfo, + )?; + Some(SwapLauncherState::AwaitingPeerInfo( + swap_id, request, enquirer, + )) + } + } + } + _ => todo!(), + } + } + + SwapLauncherState::AwaitingPeerInfo(swap_id, request, enquirer) => { + match event.message { + CtlMsg::PeerInfo(_) => { + // dirty hack to circumvent the rust lifetime checker + let mut dest: Vec = none!(); + let mut len = 0; + let mut remote_id = None; + if let CtlMsg::PeerInfo(ref info) = event.message { + remote_id = Some(info.remote_id[0]); + len = (&info.channels).len(); + for c in &info.channels { + let id = bolt::ChannelId::from(c.clone()); + dest.push(id); + } + } + let mut e = event; + for id in dest { + &e.send_ctl_service(ServiceId::Channel(id), CtlMsg::GetInfo)?; + } + Some(SwapLauncherState::AwaitingChannelInfos { + swap_id, + num_expecting: len as u16, + infos_sofar: Vec::with_capacity(len), + request, + node_id: remote_id, + enquirer, + }) + } + // do nothing + _ => None, + } + } + + SwapLauncherState::AwaitingChannelInfos { + swap_id, + num_expecting, + infos_sofar, + request, + node_id, + enquirer, + } => { + match event.message { + CtlMsg::ChannelInfo(info) => { + let mut infos_sofar = infos_sofar.clone(); + infos_sofar.push(info); + + if num_expecting == infos_sofar.len() as u16 { + let chan_id = request.validate(&infos_sofar)?; + + let node_id = { + let mut maybe_id = None; + for i in infos_sofar { + if i.remote_id.is_some() { + maybe_id = i.remote_id; + break; + } + } + node_id.and(maybe_id).ok_or(Error::NodeIdNotFound) + }?; + + let key_pair = &self.key_data.get_next_keypair( + self.chain.chain_params().is_testnet as u32, + &self.secp, + )?; + let req = SwapOutRequestMsg { + protocol_version: lnp::p2p::bifrost::PROTOCOL_VERSION as u64, + swap_id: swap_id.clone(), + asset: request.asset().clone(), + network: runtime.config.chain.to_string(), + scid: chan_id, + amount: request.amount(), + pubkey: key_pair.public_key(), + }; + let message = BusMsg::Bifrost(bifrost::Messages::SwapOutRequest(req)); + event.endpoints.send_to( + ServiceBus::Ctl, + event.source, + ServiceId::PeerBifrost(node_id), + message, + ); + Some(SwapLauncherState::Negotiating(swap_id, enquirer)) + } else { + None + } + } + // do nothing + _ => None, + } + } + + SwapLauncherState::Negotiating(swap_id, enquirer) => { + todo!() + } + SwapLauncherState::Signing(_, _, _) => todo!(), + }; + + Ok(next_state.map(|state| SwapLauncher { state, ..self })) + } +} + +impl SwapLauncher { + pub fn with( + endpoints: &mut Endpoints, + enquirer: ClientId, + request: SwapRequest, + runtime: &mut Runtime, + ) -> Result { + let swap_id = SwapId::random(); + debug!("SwapLauncher with id {} is instantiated", swap_id); + + let report = runtime + .launch_daemon(Daemon::Swapd(swap_id.clone()), runtime.config.clone()) + .map(|handle| format!("Launched new instance of {}", handle)) + .map_err(Error::from); + report_progress_or_failure(enquirer, endpoints, report)?; + + // prepare keys + + let state = SwapLauncherState::Init(swap_id, request, enquirer); + let launcher = SwapLauncher { + state, + chain: runtime.config.chain.clone(), + key_data: todo!(), + secp: todo!(), + }; + + // prepare messages + info!("SwapLauncher {:#} entered LAUNCHING state", swap_id); + Ok(launcher) + } + + fn enquirer(&self) -> ClientId { self.state.enquirer() } + pub fn swap_id(&self) -> SwapId { self.state.swap_id() } +} + +impl SwapLauncherState { + pub fn swap_id(&self) -> SwapId { + match self { + Self::Init(swap_id, _, _) + | Self::AwaitingPeerInfo(swap_id, _, _) + | Self::AwaitingChannelInfos { swap_id, .. } + | Self::Negotiating(swap_id, _) + | Self::Signing(swap_id, _, _) => swap_id.clone(), + } + } + + pub fn enquirer(&self) -> ClientId { + match self { + Self::Init(_, _, enquirer) + | Self::AwaitingPeerInfo(_, _, enquirer) + | Self::AwaitingChannelInfos { enquirer, .. } + | Self::Negotiating(_, enquirer) + | Self::Signing(_, _, enquirer) => *enquirer, + } + } +} diff --git a/src/lnpd/daemons.rs b/src/lnpd/daemons.rs index f1f1b30..3073f42 100644 --- a/src/lnpd/daemons.rs +++ b/src/lnpd/daemons.rs @@ -21,13 +21,14 @@ use bitcoin::secp256k1::{SecretKey, SECP256K1}; use internet2::addr::LocalNode; use internet2::session::noise::FramingProtocol; use lnp::p2p; +use lnp::p2p::bifrost::SwapId; use lnp::p2p::bolt::ActiveChannelId; use microservices::cli::LogStyle; use microservices::peer::{supervisor, PeerSocket}; use microservices::{DaemonHandle, Launcher, LauncherError}; use crate::lnpd::runtime::Runtime; -use crate::{channeld, peerd, routed, signd, watchd, Config, Error}; +use crate::{channeld, peerd, routed, signd, swapd, watchd, Config, Error}; pub fn read_node_key_file(key_file: &Path) -> LocalNode { let mut file = fs::File::open(key_file).unwrap_or_else(|_| { @@ -70,6 +71,9 @@ pub enum Daemon { #[display("watchd")] Watchd, + + #[display("swapd")] + Swapd(SwapId), } impl Daemon { @@ -95,6 +99,7 @@ impl Launcher for Daemon { Daemon::Channeld(..) => "channeld", Daemon::Routed => "routed", Daemon::Watchd => "watchd", + Daemon::Swapd(_) => "swapd", } } @@ -170,6 +175,7 @@ impl Launcher for Daemon { Daemon::Channeld(channel_id) => channeld::run(config, channel_id), Daemon::Routed => routed::run(config), Daemon::Watchd => watchd::run(config), + Daemon::Swapd(id) => swapd::run(id, config), } } } diff --git a/src/lnpd/runtime.rs b/src/lnpd/runtime.rs index b797150..1a3e951 100644 --- a/src/lnpd/runtime.rs +++ b/src/lnpd/runtime.rs @@ -21,12 +21,14 @@ use bitcoin_scripts::address::AddressCompat; use internet2::addr::{NodeAddr, NodeId}; use lnp::addr::LnpAddr; use lnp::channel::bolt::{CommonParams, LocalKeyset, PeerParams, Policy}; -use lnp::p2p; +use lnp::p2p::{self, bifrost}; +use lnp::p2p::bifrost::SwapId; use lnp::p2p::bolt::{ ActiveChannelId, ChannelId, ChannelReestablish, Messages as LnMsg, TempChannelId, }; use lnp::p2p::Protocol; use lnp_rpc::{FailureCode, ListenAddr}; +use lnpbp::chain::AssetId; use microservices::cli::LogStyle; use microservices::esb::{self, ClientId, Handler}; use microservices::peer::PeerSocket; @@ -37,6 +39,7 @@ use crate::automata::{Event, StateMachine}; use crate::bus::{ AcceptChannelFrom, BusMsg, CtlMsg, IntoSuccessOrFalure, ServiceBus, Status, ToProgressOrFalure, }; +use crate::lnpd::automata::swap::{SwapLauncher, SwapRequest}; use crate::lnpd::automata::ChannelLauncher; use crate::lnpd::daemons::{read_node_key_file, Daemon}; use crate::lnpd::funding::{self, FundingWallet}; @@ -69,6 +72,7 @@ pub fn run<'a>( funding_channels: none!(), accepting_channels: none!(), reestablishing_channels: none!(), + ongoing_swaps: none!(), }; Service::run(config, runtime, true) @@ -107,6 +111,7 @@ pub struct Runtime { funding_channels: HashMap, accepting_channels: HashMap, reestablishing_channels: HashMap, + ongoing_swaps: HashMap, } impl Responder for Runtime {} @@ -147,6 +152,12 @@ impl esb::Handler for Runtime { (ServiceBus::Msg, BusMsg::Bolt(_), service) => { unreachable!("lnpd received peer message not from a peerd but from {}", service) } + (ServiceBus::Msg, BusMsg::Bifrost(msg), ServiceId::PeerBifrost(remote_id)) => { + self.handle_bifrost(endpoints, remote_id, msg) + }, + (ServiceBus::Msg, BusMsg::Bifrost(_), service) => { + unreachable!("lnpd received bifrost peer message not from a peerd but from {}", service) + }, (ServiceBus::Ctl, BusMsg::Ctl(msg), source) => self.handle_ctl(endpoints, source, msg), (ServiceBus::Rpc, BusMsg::Rpc(msg), ServiceId::Client(client_id)) => { self.handle_rpc(endpoints, client_id, msg) @@ -183,6 +194,9 @@ impl esb::Handler for Runtime { } impl Runtime { + fn handle_bifrost(&mut self, endpoints: &mut Endpoints, remote_id: NodeId, message: bifrost::Messages) -> Result<(), Error> { + todo!() + } fn handle_p2p( &mut self, endpoints: &mut Endpoints, @@ -369,6 +383,23 @@ impl Runtime { self.creating_channels.insert(channeld_id, launcher); } + RpcMsg::SwapIn(swap_in) => { + let asset = swap_in.asset.unwrap_or(AssetId::native(&self.config.chain)); + info!("Creating Swapd for swap_in of {} {}", swap_in.amount, asset); + let launcher = + SwapLauncher::with(endpoints, client_id, SwapRequest::In(swap_in), self)?; + let id = launcher.swap_id(); + self.ongoing_swaps.insert(id, launcher); + } + RpcMsg::SwapOut(swap_out) => { + let asset = swap_out.asset.unwrap_or(AssetId::native(&self.config.chain)); + info!("Creating Swapd for swap_out of {} {}", swap_out.amount, asset); + let launcher = + SwapLauncher::with(endpoints, client_id, SwapRequest::Out(swap_out), self)?; + let id = launcher.swap_id(); + self.ongoing_swaps.insert(id, launcher); + } + wrong_msg => { error!("Request is not supported by the RPC interface"); return Err(Error::wrong_esb_msg(ServiceBus::Rpc, &wrong_msg)); @@ -392,7 +423,7 @@ impl Runtime { let launcher = self .creating_channels .remove(&service_id) - .unwrap_or_else(|| panic!("unregistered channel launcher for {}", service_id)); + .unwrap_or_else(|| panic!("Unregistered channel launcher for {}", service_id)); let launcher = launcher .next(Event::with(endpoints, self.identity(), source, message), self)? .expect("channel launcher should not be complete"); diff --git a/src/peerd/runtime.rs b/src/peerd/runtime.rs index 86496ed..ee8c3b3 100644 --- a/src/peerd/runtime.rs +++ b/src/peerd/runtime.rs @@ -312,12 +312,17 @@ impl Runtime { fn handle_ctl( &mut self, - _endpoints: &mut Endpoints, - _source: ServiceId, + endpoints: &mut Endpoints, + source: ServiceId, request: CtlMsg, ) -> Result<(), Error> { #[allow(clippy::match_single_binding)] match request { + CtlMsg::GetInfo => { + let info = BusMsg::Ctl(CtlMsg::PeerInfo(self.get_info())); + endpoints.send_to(ServiceBus::Ctl, self.identity(), source, info)?; + Ok(()) + } _ => { error!("Request is not supported by the CTL interface"); Err(Error::wrong_esb_msg(ServiceBus::Ctl, &request)) @@ -442,6 +447,17 @@ impl Runtime { )?; } + message if message.is_swap_msg() => { + if let Some(swap_id) = message.swap_id() { + endpoints.send_to( + ServiceBus::Msg, + self.identity(), + ServiceId::Swapd(swap_id.clone()), + BusMsg::Bifrost(message), + )?; + } + } + message => { // TODO: // 1. Check permissions @@ -456,6 +472,33 @@ impl Runtime { Ok(()) } + fn get_info(&self) -> PeerInfo { + PeerInfo { + local_id: self.local_id, + remote_id: self.remote_id.map(|id| vec![id]).unwrap_or_default(), + local_socket: self.local_socket, + remote_socket: vec![self.remote_socket], + uptime: SystemTime::now() + .duration_since(self.started) + .unwrap_or_else(|_| Duration::from_secs(0)), + since: self + .started + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) + .as_secs(), + messages_sent: self.messages_sent, + messages_received: self.messages_received, + channels: self + .channels + .iter() + .copied() + .map(bolt::ActiveChannelId::as_slice32) + .collect(), + connected: !self.connect, + awaits_pong: self.awaited_pong.is_some(), + } + } + fn handle_rpc( &mut self, endpoints: &mut Endpoints, @@ -464,30 +507,7 @@ impl Runtime { ) -> Result<(), Error> { match message { RpcMsg::GetInfo => { - let peer_info = PeerInfo { - local_id: self.local_id, - remote_id: self.remote_id.map(|id| vec![id]).unwrap_or_default(), - local_socket: self.local_socket, - remote_socket: vec![self.remote_socket], - uptime: SystemTime::now() - .duration_since(self.started) - .unwrap_or_else(|_| Duration::from_secs(0)), - since: self - .started - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_secs(), - messages_sent: self.messages_sent, - messages_received: self.messages_received, - channels: self - .channels - .iter() - .copied() - .map(bolt::ActiveChannelId::as_slice32) - .collect(), - connected: !self.connect, - awaits_pong: self.awaited_pong.is_some(), - }; + let peer_info = self.get_info(); self.send_rpc(endpoints, client_id, peer_info)?; } diff --git a/src/swapd/automata/mod.rs b/src/swapd/automata/mod.rs new file mode 100644 index 0000000..7cc3d0e --- /dev/null +++ b/src/swapd/automata/mod.rs @@ -0,0 +1,237 @@ +use bitcoin::PrivateKey; +use lnp::channel::bolt::Lifecycle; +use lnp::p2p::bifrost::{SwapId, SwapOutRequestMsg}; +use lnp::p2p::bolt::ShortChannelId; +use lnp_rpc::{Failure, FailureCode, ServiceId}; +use microservices::esb; + +use super::runtime::Runtime; +use crate::automata::{Event, StateMachine}; +use crate::bus::{BusMsg, CtlMsg}; + +/// Errors for swap workflow. +#[derive(Clone, Debug, Display, From, Error)] +#[display(doc_comments)] +pub enum Error { + /// unexpected message from {2} for a swap state {1}. Message details: {0} + UnexpectedMessage(BusMsg, SwapStateEnum, ServiceId), + + /// error sending RPC request during state transition. Details: {0} + #[from] + Esb(esb::Error), + + /// unable to {operation} during {current_state} channel state + InvalidState { + operation: &'static str, + current_state: Lifecycle, + }, + + /// swap was not persisted on a disk, so unable to reestablish + NoPersistantData, + + /// failed to save swap state. Details: {0} + #[from] + Persistence(strict_encoding::Error), + + Electrum(String), +} + +impl From for Error { + fn from(err: electrum_client::Error) -> Self { Error::Electrum(err.to_string()) } +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct SwapInState { + id: SwapId, +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct SwapOutState { + id: SwapId, + request: SwapOutRequestMsg, + outgoing_chan_ids: Vec, + swap_tx_conf_requirement: u32, + private_key: PrivateKey, +} + +pub enum SwapState { + Out(SwapOutState), + In(SwapInState), +} + +#[derive(Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Debug, Display, From)] +#[display("{0}")] +pub enum SwapStateEnum { + Init, + + #[from] + SwapOutSender(SwapOutSenderState), + #[from] + SwapOutReceiver(SwapOutReceiverState), + #[from] + SwapInSender(SwapInSenderState), + #[from] + SwapInReceiver(SwapInReceiverState), + + #[from] + SwapCommon(SwapCommonState), +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, From)] +#[derive(StrictEncode, StrictDecode)] +pub enum SwapCommonState { + #[display("SEND_CANCEL")] + SendCancel, + + #[display("SWAP_CANCELED")] + SwapCanceled, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, From)] +#[derive(StrictEncode, StrictDecode)] +pub enum SwapOutSenderState { + #[display("CREATE_SWAP")] + CreateSwap, + + #[display("SEND_REQUEST")] + SendRequest, + + #[display("AWAIT_AGREEMENT")] + AwaitAgreement, + + #[display("PAY_FEE_INVOICE")] + PayFeeInvoice, + + #[display("AWAIT_TX_BROADCASTED_MSG")] + AwaitTxBroadcastedMsg, + + #[display("AWAIT_TX_CONFIRMATION")] + AwaitTxConfirmation, + + #[display("VALIDATE_TX_AND_PAY_CLAIM_INVOICE")] + ValidateTxAndPayClaimInvoice, + + #[display("CLAIM_SWAP")] + ClaimSwap, + + #[display("SEND_PRIV_KEY")] + SendPrivKey, + + #[display("SEND_COOP_CLOSE")] + SendCoopClose, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, From)] +#[derive(StrictEncode, StrictDecode)] +pub enum SwapOutReceiverState { + #[display("CREATE_SWAP")] + CreateSwap, + + #[display("SEND_FEE_INVOICE")] + SendFeeInvoice, + + #[display("AWAIT_FEE_INVOICE_PAYMENT")] + AwaitFeeInvoicePayment, + + #[display("BroadcastOpeningTx")] + BroadcastOpeningTx, + /// + #[display("SendTxBroadcastedMessage")] + SendTxBroadcastedMessage, + + #[display("AWAIT_CLAIM_INVOICE_PAYMENT")] + AwaitClaimInvoicePayment, + + #[display("ABORTED")] + Aborted, + + #[display("CLAIM_SWAP_CSV")] + ClaimSwapCsv, + + #[display("CLAIM_SWAP_COOP")] + ClaimSwapCoop, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, From)] +#[derive(StrictEncode, StrictDecode)] +pub enum SwapInSenderState { + #[display("CREATE_SWAP")] + CreateSwap, + + #[display("SEND_REQUEST")] + SendRequest, + + #[display("AWAIT_AGREEMENT")] + AwaitAgreement, + + #[display("BROADCAST_OPENING_TX")] + BroadcastOpeningTx, + + #[display("SEND_TX_BROADCASTED_MESSAGE")] + SendTxBroadcastedMessage, + + #[display("AWAIT_CLAIM_PAYMENT")] + AwaitClaimPayment, + + #[display("CLAIM_SWAP_CSV")] + ClaimSwapCoop, +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display, From)] +#[derive(StrictEncode, StrictDecode)] +pub enum SwapInReceiverState { + #[display("CREATE_SWAP")] + CreateSwap, + + #[display("SEND_AGREEMENT")] + SendAgreement, + + #[display("AWAIT_TX_BROADCASTED_MSG")] + AwaitTxBroadcastedMsg, + + #[display("AWAIT_TX_CONFIRMATION")] + AwaitTxConfirmation, + + #[display("VALIDATE_TX_AND_PAY_CLAIM_INVOICE")] + ValidateTxAndPayClaimInvoice, + + #[display("CLAIM_SWAP")] + ClaimSwap, + + #[display("SEND_PRIV_KEY")] + SendPrivKey, + + #[display("SEND_COOP_CLOSE")] + SendCoopClose, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub struct SwapStateMachine { + id: SwapId, + state: SwapStateEnum, +} + +impl StateMachine for SwapStateMachine { + type Error = Error; + + fn next(self, event: Event, runtime: &mut Runtime) -> Result, Self::Error> + where + Self: Sized, + { + debug!("SwapStateMachine {:#?} received {} event", self.id, event.message); + if let BusMsg::Ctl(CtlMsg::Error { error, .. }) = &event.message { + let failure = Failure { code: FailureCode::Swap, info: error.clone() }; + todo!() + } + + match (self.state, event.message) { + (SwapStateEnum::Init, BusMsg::Ctl(CtlMsg::DeriveKeyset(key_set))) => { + todo!() + } + (SwapStateEnum::SwapOutSender(swap_out_sender), BusMsg::Rpc(rpc)) => { + todo!() + } + _ => todo!(), + } + } +} diff --git a/src/swapd/mod.rs b/src/swapd/mod.rs new file mode 100644 index 0000000..9469d6e --- /dev/null +++ b/src/swapd/mod.rs @@ -0,0 +1,8 @@ +pub mod automata; +#[cfg(feature = "server")] +mod opts; +mod runtime; + +#[cfg(feature = "server")] +pub use opts::Opts; +pub use runtime::run; diff --git a/src/swapd/opts.rs b/src/swapd/opts.rs new file mode 100644 index 0000000..ac9cfcf --- /dev/null +++ b/src/swapd/opts.rs @@ -0,0 +1,22 @@ +use crate::opts::Options; + +#[derive(Parser, Clone, PartialEq, Eq, Debug)] +#[clap(name = "watchd", bin_name = "watchd", author, version)] +pub struct Opts { + /// These params can be read also from the configuration file, not just + /// command-line args or environment variables + #[clap(flatten)] + pub shared: crate::opts::Opts, +} + +impl Options for Opts { + type Conf = (); + + fn shared(&self) -> &crate::opts::Opts { &self.shared } + + fn config(&self) -> Self::Conf { () } +} + +impl Opts { + pub fn process(&mut self) { self.shared.process() } +} diff --git a/src/swapd/runtime.rs b/src/swapd/runtime.rs new file mode 100644 index 0000000..e9e1dca --- /dev/null +++ b/src/swapd/runtime.rs @@ -0,0 +1,110 @@ +use lnp::p2p; +use lnp::p2p::bifrost::SwapId; +use lnp_rpc::{RpcMsg, ServiceId, SwapInfo}; +use microservices::esb::{self, EndpointList, Handler}; + +use crate::bus::{BusMsg, CtlMsg, ServiceBus}; +use crate::{Config, Endpoints, Error, Responder, Service}; + +pub fn run(swap_id: SwapId, config: Config) -> Result<(), Error> { + debug!("Opening bridge between electrum watcher and main service threads"); + let runtime = Runtime::with(swap_id, &config); + Service::run(config, runtime, false) +} + +pub struct Runtime { + enquirer: Option, + id: SwapId, +} + +impl Responder for Runtime { + #[inline] + fn enquirer(&self) -> Option { self.enquirer } +} + +impl Handler for Runtime { + type Request = BusMsg; + type Error = Error; + + fn identity(&self) -> ::Address { ServiceId::Swapd(self.id.clone()) } + + fn handle( + &mut self, + endpoints: &mut esb::EndpointList, + bus_id: ServiceBus, + source: ServiceId, + request: Self::Request, + ) -> Result<(), Self::Error> { + match (bus_id, request, source) { + (ServiceBus::Msg, BusMsg::Bifrost(msg), ServiceId::PeerBifrost(_nodeid)) => { + self.handle_peerswap_msg(endpoints, msg) + } + (ServiceBus::Msg, BusMsg::Bifrost(_), service) => unreachable!( + "swapd received bifrost p2p message not from a peerd but from {}", + service + ), + (ServiceBus::Rpc, BusMsg::Rpc(msg), ServiceId::Client(client_id)) => { + self.handle_rpc(endpoints, client_id, msg) + } + (ServiceBus::Rpc, BusMsg::Rpc(_), service) => { + unreachable!("swapd received RPC message not from a client but from {}", service) + } + (ServiceBus::Ctl, BusMsg::Ctl(msg), source) => self.handle_ctl(endpoints, source, msg), + (bus, msg, _) => Err(Error::wrong_esb_msg(bus, &msg)), + } + } + + fn handle_err( + &mut self, + endpoints: &mut esb::EndpointList, + error: esb::Error<::Address>, + ) -> Result<(), Self::Error> { + todo!() + } +} + +impl Runtime { + pub fn with(swap_id: SwapId, config: &Config) -> Self { todo!() } + + fn handle_ctl( + &mut self, + endpoints: &mut Endpoints, + source: ServiceId, + msg: CtlMsg, + ) -> Result<(), Error> { + todo!() + } + + fn handle_rpc( + &self, + endpoints: &mut Endpoints, + client_id: u64, + msg: RpcMsg, + ) -> Result<(), >::Error> { + match msg { + RpcMsg::GetInfo => { + let swap_info = RpcMsg::SwapInfo(SwapInfo { + id: self.id.clone(), + }); + self.send_rpc(endpoints, client_id, swap_info)?; + }, + wrong_request => { + error!("Request is not supported by the RPC interface"); + return Err(Error::wrong_esb_msg(ServiceBus::Rpc, &wrong_request)); + } + } + Ok(()) + } + fn handle_peerswap_msg( + &self, + endpoints: &mut EndpointList, + msg: p2p::bifrost::Messages, + ) -> Result<(), >::Error> { + match msg { + p2p::bifrost::Messages::SwapInRequest(req) => { + todo!() + } + _ => todo!(), + } + } +} diff --git a/src/swapd/worker.rs b/src/swapd/worker.rs new file mode 100644 index 0000000..b9891b8 --- /dev/null +++ b/src/swapd/worker.rs @@ -0,0 +1,5 @@ + +use crate::{Config, Error, Service}; + +pub fn run(config: Config) -> Result<(), Error> { +} \ No newline at end of file