diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..b74234d --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,241 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + +[[package]] +name = "getrandom" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash", +] + +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "jerbs" +version = "0.1.0" +dependencies = [ + "clap", + "rusqlite", +] + +[[package]] +name = "libc" +version = "0.2.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce" + +[[package]] +name = "libsqlite3-sys" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abd5850c449b40bacb498b2bbdfaff648b1b055630073ba8db499caf2d0ea9f2" +dependencies = [ + "pkg-config", + "vcpkg", +] + +[[package]] +name = "memchr" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" + +[[package]] +name = "once_cell" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" + +[[package]] +name = "pkg-config" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb" + +[[package]] +name = "rusqlite" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a82b0b91fad72160c56bf8da7a549b25d7c31109f52cc1437eac4c0ad2550a7" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "memchr", + "smallvec", +] + +[[package]] +name = "smallvec" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" + +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "unicode-width" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + +[[package]] +name = "version_check" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" + +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1576178 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "jerbs" +version = "0.1.0" +edition = "2018" +authors = ["Kaz Wesley "] +description = "Command-line work-stealing scheduler." +repository = "https://github.com/kazcw/jerbs" +license = "GPL-3.0" +categories = ["command-line-utilities"] +exclude = [".gitignore"] + +[dependencies] +clap = "2.33" +rusqlite = "0.26" diff --git a/README.md b/README.md new file mode 100644 index 0000000..6978f14 --- /dev/null +++ b/README.md @@ -0,0 +1,59 @@ +# jerbs + +Command-line work-stealing scheduler. + +## Operation + +Create a job database: +``` +$ jerbs work.db new +``` + +Define a job and enqueue some repetitions: +``` +$ jerbs work.db new-job --count 17 <<< "info for thing to do 17 times" +1 +``` +The output is the job id, which you can use to edit the job later. + +See what's scheduled: +``` +$ jerbs work.db list-jobs -v +1 17 "info for thing to do 17 times" +``` +(Note: do not use verbose output (`-v`) for scripting. It is intended to be +human-readable and the format is unstable.) + +Run a worker: +``` +$ while jerbs work.db take $$ | read JOB; do echo $JOB; done +``` +Now start some more! + +## Typical Usage + +I made this so I could have a tmux with a worker process in each pane, all +taking jobs from the same queue. The worker processes run a shell script that +uses this utility to pick the next job. + +A job's payload is a blob of data. What's in the blob is up to you. If a job +needs multiple parameters, the blobs could be filenames indicating where to +find the job data; or, you might pack the data directly into the blob with a +delimiter-based format or `jq` or something. + +Worker IDs can be any utf-8 string. If your worker is a bash script, you can +pass `$$` to use your worker's PID. + +Because the data blob for your task may contain characters that are subject to +string interpolation hazards, any command that requires a blob will read it +from standard input by default. If your blobs are shell-safe, you can instead +use `--data` to include your blob in the arguments. + +## Comparison to alternatives + +Other work-stealing schedulers (like GNU Parallel) are frameworks; they own the +worker processes, so you can only configure workers through the framework. +`jerbs` inverts this paradigm: `jerbs` is a utility to be used from your worker +script. With `jerbs` you can easily assign unique resources to the workers, pin +workers to CPUs/NUMA nodes, or dynamically vary the number of simultaneous +jobs. At last, the workers control the means of production. diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..574e831 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,210 @@ +use rusqlite::Connection; +use rusqlite::params; + +const DB_VERSION: i32 = 1; + +#[derive(Debug)] +#[non_exhaustive] +pub enum Error { + SqliteError(rusqlite::Error), + DbTooNew { db_version: i32 }, +} +pub type Result = std::result::Result; + +impl From for Error { + fn from(e: rusqlite::Error) -> Self { + Error::SqliteError(e) + } +} + +struct Job { + id: i32, + data: Vec, +} + +pub struct Db { + conn: Connection, +} + +impl Db { + pub fn create(path: &str) -> Result { + // TODO: fail right away if the path exists--would give a clearer error message than + // bailing on a CREATE TABLE below. + let conn = Connection::open(path)?; + + Self::create_from_conn(conn) + } + + fn create_from_conn(conn: Connection) -> Result { + conn.execute("CREATE TABLE meta (version INTEGER)", [])?; + conn.execute("CREATE TABLE job (id INTEGER PRIMARY KEY, count INTEGER NOT NULL, data BLOB NOT NULL UNIQUE)", [])?; + conn.execute("CREATE TABLE worker (id INTEGER PRIMARY KEY, job REFERENCES job, data BLOB NOT NULL)", [])?; + conn.execute("INSERT INTO meta VALUES (?)", [DB_VERSION])?; + + Ok(Self { conn }) + } + + pub fn open(path: &str) -> Result { + let conn = Connection::open(path)?; + Self::open_from_conn(conn) + } + + fn open_from_conn(conn: Connection) -> Result { + { + let mut version = conn.prepare("SELECT version FROM meta")?; + let mut version = version.query([])?; + let version: i32 = version.next()?.unwrap().get(0)?; + if version > DB_VERSION { + return Err(Error::DbTooNew{ db_version: version }); + } + } + Ok(Self { conn }) + } + + pub fn take(&mut self, data: &str) -> Result>> { + const JOB_Q: &str = + "SELECT job.id, job.data FROM job \ + LEFT JOIN (SELECT worker.job, count(1) as c FROM worker GROUP BY worker.job) as w + ON w.job = job.id \ + WHERE COALESCE(w.c, 0) < job.count ORDER BY job.id LIMIT 1"; + let job; + let tx = self.conn.transaction()?; + { + let mut job_q = tx.prepare(JOB_Q)?; + let mut jobs = job_q.query([])?; + let row = jobs.next()?; + if let Some(row) = row { + job = Job { + id: row.get(0)?, + data: row.get(1)?, + }; + } else { + return Ok(None); + } + tx.execute( + "INSERT INTO worker (job, data) VALUES (?, ?)", + params![job.id, data], + )?; + } + tx.commit()?; + + Ok(Some(job.data)) + } + + pub fn new_job(&mut self, data: &[u8], count: u64) -> Result { + self.conn.execute("INSERT INTO job (data, count) VALUES (?, ?)", params![data, count])?; + let id = self.conn.last_insert_rowid() as u32; + + Ok(id) + } + + // TODO: iterator version. Has to own its Statement. + pub fn job_ids_vec(&self) -> Result> { + let mut q = self.conn.prepare("SELECT id FROM job")?; + let mut results = Vec::new(); + let mut rows = q.query([])?; + while let Some(row) = rows.next()? { + results.push(row.get(0).unwrap()); + } + Ok(results) + } + + pub fn get_data(&mut self, job_id: u32) -> Result> { + let mut q = self.conn.prepare("SELECT data FROM job WHERE id = ?")?; + let mut result = q.query([job_id])?; + result.next()?.unwrap().get(0).map_err(From::from) + } + + pub fn get_count(&mut self, job_id: u32) -> Result { + let mut q_c = self.conn.prepare("SELECT count FROM job WHERE id = ?")?; + let mut q_w = self.conn.prepare("SELECT count(1) FROM worker WHERE job = ?")?; + let mut c = q_c.query([job_id])?; + let mut w = q_w.query([job_id])?; + let c: u64 = c.next()?.unwrap().get(0)?; + let w: u64 = w.next()?.unwrap().get(0)?; + debug_assert!(c >= w); + Ok(if w > c { + 0 + } else { + c - w + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_init_twice() -> Result<()> { + let conn = Connection::open_in_memory()?; + let db1 = Db::create_from_conn(conn)?; + let result = Db::create_from_conn(db1.conn); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_db_too_new() -> Result<()> { + let conn = Connection::open_in_memory()?; + let db = Db::create_from_conn(conn)?; + let conn = db.conn; + conn.execute("UPDATE meta SET version = ?", [std::i32::MAX])?; + let result = Db::open_from_conn(conn); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_job() -> Result<()> { + let conn = Connection::open_in_memory()?; + let mut db = Db::create_from_conn(conn)?; + + // insert a job + const BLOB: &[u8] = b"foo bar"; + const INITIAL_COUNT: u64 = 2; + let id = db.new_job(BLOB, INITIAL_COUNT)?; + + // make sure it's inserted + let ids = db.job_ids_vec()?; + assert_eq!(ids.len(), 1); + assert_eq!(ids[0], id); + + // make sure it's inserted correctly + let blob = db.get_data(id)?; + assert_eq!(&blob, BLOB); + let count = db.get_count(id)?; + assert_eq!(count, INITIAL_COUNT); + + // check that take() works + let blob = db.take("some worker id")?.unwrap(); + assert_eq!(&blob, BLOB); + assert_eq!(db.get_count(id)?, 1); + let blob = db.take("some worker id")?.unwrap(); + assert_eq!(&blob, BLOB); + assert_eq!(db.get_count(id)?, 0); + let result = db.take("some worker id")?; + assert_eq!(result, None); + assert_eq!(db.get_count(id)?, 0); + + Ok(()) + } + + #[test] + fn test_job_collision() -> Result<()> { + let conn = Connection::open_in_memory()?; + let mut db = Db::create_from_conn(conn)?; + + // insert a job + const BLOB: &[u8] = b"foo bar"; + db.new_job(BLOB, 3)?; + + // try to insert another job with the same blob + let secondtime = db.new_job(BLOB, 0); + assert!(secondtime.is_err()); + + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..f9e9f3d --- /dev/null +++ b/src/main.rs @@ -0,0 +1,134 @@ +mod db; + +use db::Db; +use clap::{Arg, App, AppSettings, SubCommand}; +use std::io::{self, Read, Write}; + +fn read_data() -> Vec { + let mut buf = Vec::new(); + io::stdin().read_to_end(&mut buf).unwrap(); + buf +} + +fn main() -> db::Result<()> { + let args = + App::new("jerbs") + .setting(AppSettings::SubcommandRequiredElseHelp) + .version("0.1") + .author("Kaz Wesley ") + .about("Command-line work-stealing scheduler") + .arg(Arg::with_name("database") + .help("Path to the jobs database file") + .required(true) + .index(1)) + .subcommand(SubCommand::with_name("new") + .about("create a new jobs database")) + .subcommand(SubCommand::with_name("new-job") + .about("define a job") + .arg(Arg::with_name("count") + .help("the number of repetitions to enqueue initially") + .short("c") + .long("count") + .takes_value(true) + .default_value("0")) + .arg(Arg::with_name("data") + .help("the data associated with the job") + .short("d") + .long("data") + .takes_value(true))) + .subcommand(SubCommand::with_name("list-jobs") + .about("show all defined jobs") + .arg(Arg::with_name("verbose") + .help("informative output for interactive use") + .short("v") + .long("verbose"))) + .subcommand(SubCommand::with_name("get-data") + .about("get the data associated with a job") + .arg(Arg::with_name("job-id") + .required(true) + .index(1))) + .subcommand(SubCommand::with_name("get-count") + .about("get the remaining count for a job") + .arg(Arg::with_name("job-id") + .required(true) + .index(1))) + .subcommand(SubCommand::with_name("take") + .about("take a job from the queue") + .arg(Arg::with_name("wait") + .help("wait for a job to become available") + .short("w") + .long("wait")) + .arg(Arg::with_name("worker-id") + .help("any string identifying the worker taking the job") + .required(true) + .index(1))) + .get_matches(); + + let path = args.value_of("database").unwrap(); + + match args.subcommand() { + ("new", Some(_)) => { + let _ = Db::create(path)?; + Ok(()) + } + ("new-job", Some(args)) => { + let count = args.value_of("count").unwrap().parse().expect("count must be integer"); + let mut db = Db::open(path)?; + let id = if let Some(data) = args.value_of("data") { + db.new_job(data.as_bytes(), count)? + } else { + let data = read_data(); + db.new_job(&data, count)? + }; + println!("{}", id); + Ok(()) + }, + ("list-jobs", Some(args)) => { + let verbose = args.is_present("verbose"); + let mut db = Db::open(path)?; + let ids = db.job_ids_vec()?; + if verbose { + for id in ids { + let count = db.get_count(id)?; + let data = db.get_data(id)?; + let data = std::str::from_utf8(&data).unwrap_or(""); + println!("{}\t{}\t{}", id, count, data); + } + } else { + for id in ids { + println!("{}", id); + } + } + Ok(()) + }, + ("get-data", Some(args)) => { + let id = args.value_of("job-id").unwrap().parse().expect("job ids are integers"); + let data = Db::open(path)?.get_data(id)?; + io::stdout().write_all(&data).unwrap(); + Ok(()) + }, + ("get-count", Some(args)) => { + let id = args.value_of("job-id").unwrap().parse().expect("job ids are integers"); + let count = Db::open(path)?.get_count(id)?; + println!("{}", count); + Ok(()) + }, + ("take", Some(args)) => { + let mut db = Db::open(path)?; + let worker = args.value_of("worker-id").unwrap(); + let wait = args.is_present("wait"); + if wait { + todo!("take --wait") + } else { + let data = db.take(worker)?; + if let Some(data) = data { + io::stdout().write_all(&data).unwrap(); + Ok(()) + } else { + std::process::exit(2); + } + } + }, + _ => unreachable!(), + } +}