Skip to content

Commit

Permalink
Setup planner skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
pipex committed Jan 21, 2025
1 parent 51f82ad commit 70eb653
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 28 deletions.
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum Error {
#[error("condition failed: ${0}")]
TaskConditionFailed(#[from] super::task::ConditionFailed),

#[error("plan could not be found: ${0}")]
PlanNotFound(#[from] super::worker::PlanSearchError),

#[error(transparent)]
Other(#[from] Box<dyn std::error::Error>),
}
Expand Down
14 changes: 9 additions & 5 deletions src/path.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use jsonptr::Pointer;
use jsonptr::{Pointer, PointerBuf};
use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;

#[derive(Clone, Default, PartialEq, Debug)]
pub struct Path(&'static Pointer);
pub struct Path(PointerBuf);

impl Path {
pub fn from_static(s: &'static str) -> Path {
Path(Pointer::from_static(s))
pub(crate) fn new(pointer: &Pointer) -> Self {
Self(pointer.to_buf())
}

pub fn from_static(s: &'static str) -> Self {
Path(Pointer::from_static(s).to_buf())
}

pub fn to_str(&self) -> &str {
Expand All @@ -30,7 +34,7 @@ impl From<Path> for String {

impl AsRef<Pointer> for Path {
fn as_ref(&self) -> &Pointer {
self.0
&self.0
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub struct System {
state: Value,
}

// TODO: replace with TryFrom implementation
impl<S> From<S> for System
where
S: Serialize,
Expand All @@ -66,6 +67,10 @@ where
}

impl System {
pub(crate) fn new(state: Value) -> Self {
Self { state }
}

pub(crate) fn root(&self) -> &Value {
&self.state
}
Expand Down
1 change: 0 additions & 1 deletion src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl<S> Context<S> {
}

// This will be used by the planner
#[allow(dead_code)]
pub(crate) fn path(self, path: &'static str) -> Self {
Self {
target: self.target,
Expand Down
14 changes: 14 additions & 0 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ impl<S> Task<S> {
}
}

pub fn id(&self) -> &String {
match self {
Self::Atom { id, .. } => id,
Self::List { id, .. } => id,
}
}

pub fn context(&self) -> &Context<S> {
match self {
Self::Atom { context, .. } => context,
Self::List { context, .. } => context,
}
}

/// Run every action in the task sequentially and return the
/// aggregate changes.
/// TODO: this should probably only have crate visibility
Expand Down
4 changes: 0 additions & 4 deletions src/worker/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ impl<S> Domain<S> {
/// Find matches for the given path in the domain
/// the matches are sorted in order that they should be
/// tested
///
// This will no longer be dead code when the planner
// is implemented
#[allow(dead_code)]
pub(crate) fn at(&self, path: &str) -> Option<(PathArgs, impl Iterator<Item = &Intent<S>>)> {
self.router
.at(path)
Expand Down
4 changes: 3 additions & 1 deletion src/worker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod domain;
mod intent;
mod planner;
mod target;
mod workflow;

pub use domain::*;
pub use intent::*;
pub use target::Target;
pub use planner::*;
110 changes: 110 additions & 0 deletions src/worker/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use serde::Serialize;
use serde_json::Value;
use std::fmt::{self, Display};

use crate::error::{Error, IntoError};
use crate::path::Path;
use crate::task::{Context, Task};

use super::domain::Domain;
use super::target::Target;
use super::workflow::Workflow;
use super::Operation;

pub struct Planner<S> {
domain: Domain<S>,
}

pub enum Plan<S> {
Found { workflow: Workflow<S>, state: S },
NotFound,
}

#[derive(Debug)]
pub enum PlanSearchError {
SerializationError(serde_json::error::Error),
PathNotFound,
}

impl std::error::Error for PlanSearchError {}

impl Display for PlanSearchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PlanSearchError::SerializationError(err) => err.fmt(f),
PlanSearchError::PathNotFound => write!(f, "not found"),
}
}
}

impl IntoError for PlanSearchError {
fn into_error(self) -> Error {
Error::PlanNotFound(self)
}
}

impl<S> Planner<S> {
pub fn new(domain: Domain<S>) -> Self {
Self { domain }
}

fn try_task(&self, task: Task<S>, state: &Value, initial_plan: Workflow<S>) {
match task {
Task::Atom { .. } => {
// Detect loops in the plan
// if (initial_plan.0.some())
}
Task::List { .. } => unimplemented!(),
}
}

pub fn find_plan(&self, cur: S, tgt: S) -> Result<Workflow<S>, Error>
where
S: Serialize + Clone,
{
let initial = serde_json::to_value(cur).map_err(PlanSearchError::SerializationError)?;
let target = Target::try_from(tgt.clone()).map_err(PlanSearchError::SerializationError)?;
let initial_plan = Workflow::<S>::default();

let mut stack = vec![(initial, initial_plan)];

while let Some((state, plan)) = stack.pop() {
let distance = target.distance(&state);

// we reached the target
if distance.is_empty() {
// TODO: return the proper plan
return Ok(Workflow::default());
}

for op in distance.iter() {
// Find applicable tasks
let path = Path::new(op.path());
let matching = self.domain.at(path.to_str());
if let Some((args, intents)) = matching {
// Create the calling context for the job
let context = Context {
path,
args,
target: Some(tgt.clone()),
};
for intent in intents {
// If the intent is applicable to the operation
if op.matches(&intent.operation) || intent.operation != Operation::Any {
let task = intent.job.into_task(context.clone());

// apply the task to the state, if it progresses the plan, then select
// it and put the new state with the new plan on the stack

// try to apply the job
// if successful, put the original state on the stack, followed by the
// new state after applying
}
}
}
}
}

Err(PlanSearchError::PathNotFound)?
}
}
39 changes: 22 additions & 17 deletions src/worker/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use serde_json::Value;
use std::{
cmp::Ordering,
collections::{BTreeSet, LinkedList},
fmt::{self, Display},
ops::Deref,
};

use super::intent;

pub(crate) struct Distance(BTreeSet<Operation>);

impl Distance {
Expand All @@ -20,7 +21,11 @@ impl Distance {
self.0.insert(o);
}

pub(crate) fn iter(&self) -> impl Iterator<Item = &Operation> {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

pub fn iter(&self) -> impl Iterator<Item = &Operation> {
self.0.iter()
}

Expand Down Expand Up @@ -63,24 +68,24 @@ impl Distance {

pub struct Target(Value);

#[derive(Debug)]
pub enum TargetError {
SerializationFailed(serde_json::error::Error),
}
#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct Operation(PatchOperation);

impl std::error::Error for TargetError {}
impl Operation {
pub fn path(&self) -> &Pointer {
self.0.path()
}

impl Display for TargetError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TargetError::SerializationFailed(err) => err.fmt(f),
pub fn matches(&self, op: &intent::Operation) -> bool {
match self.0 {
PatchOperation::Add(..) => op == &intent::Operation::Create,
PatchOperation::Replace(..) => op == &intent::Operation::Update,
PatchOperation::Remove(..) => op == &intent::Operation::Delete,
_ => false,
}
}
}

#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct Operation(PatchOperation);

impl PartialOrd for Operation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
Expand Down Expand Up @@ -111,8 +116,8 @@ impl Target {
Self(target.into())
}

pub fn from<S: Serialize>(state: S) -> Result<Self, TargetError> {
let state = serde_json::to_value(state).map_err(TargetError::SerializationFailed)?;
pub fn try_from<S: Serialize>(state: S) -> Result<Self, serde_json::error::Error> {
let state = serde_json::to_value(state)?;
Ok(Target::new(state))
}

Expand Down Expand Up @@ -147,7 +152,7 @@ impl Target {
// get all paths up to the root
while let Some(newparent) = parent.parent() {
// get the target at the parent to use as value
// no matter the operation, the parent of the target shoul
// no matter the operation, the parent of the target should
// always exist. If it doesn't there is a bug (probbly in jsonptr)
let value = newparent.resolve(&self.0).unwrap_or_else(|e| {
panic!(
Expand Down
26 changes: 26 additions & 0 deletions src/worker/workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::dag::Dag;
use crate::task::Task;

pub(crate) struct Action<S> {
/**
* Unique id for the action. This is calculated from the
* task is and the current runtime state expected
* by the planner. This is used for loop detection in the plan.
*/
id: String,

/**
* The task to execute
*
* Only atomic tasks should be added to a worflow item
*/
task: Task<S>,
}

pub struct Workflow<S>(pub(crate) Dag<Action<S>>);

impl<S> Default for Workflow<S> {
fn default() -> Self {
Workflow(Dag::default())
}
}

0 comments on commit 70eb653

Please sign in to comment.