From e9fc182cb780c496451a1c7e70381c47abed8e7e Mon Sep 17 00:00:00 2001 From: NaturalSelect <2145973003@qq.com> Date: Sat, 9 Dec 2023 19:43:52 +0800 Subject: [PATCH] feat(schedule): add straw2 algo Signed-off-by: NaturalSelect <2145973003@qq.com> --- Cargo.lock | 1 + Cargo.toml | 3 ++- src/schedule/master.rs | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0a07a71..96c9115 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2325,6 +2325,7 @@ dependencies = [ "prost 0.11.9", "prost-build", "qp2p", + "rand 0.8.5", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 53317aa..ef8bded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -qp2p = "0.36.4" #{ path = "qp2p" } +qp2p = "0.36.4" #{ path = "qp2p" } tokio = { version = "1.32.0", features = ["full"] } thiserror = "1.0.50" async-trait = "0.1.74" @@ -39,6 +39,7 @@ wasmedge-sdk = { version = "0.10.1" } async-channel = "2.1.0" sysinfo = "0.29.10" ssh2 = "0.9.4" +rand = "0.8.5" # slog-envlogger = { version = "2.1.0", optional = true } diff --git a/src/schedule/master.rs b/src/schedule/master.rs index 94677cd..0043a06 100644 --- a/src/schedule/master.rs +++ b/src/schedule/master.rs @@ -1,11 +1,14 @@ use std::{collections::hash_map::DefaultHasher, hash::Hasher}; +use async_raft::NodeId; use async_trait::async_trait; use axum::{ http::StatusCode, response::{IntoResponse, Redirect, Response}, }; +use rand::rngs::ThreadRng; use ws_derive::LogicalModule; +// use use super::{executor::Executor, http_handler::RequestHandler}; use crate::{ @@ -24,6 +27,42 @@ pub struct ScheMaster { // view: ScheMasterView, } +trait NodeWeighteFetcher { + // NOTE: get weight return node weight + // larger is better + fn get_node_weight(id:NodeId) -> f64; +} + +struct StrawNodeSelector { + weight_fetcher:Box, + rng:ThreadRng +} + +impl StrawNodeSelector { + fn new(weight_fetcher: Box) -> Self { Self { weight_fetcher,rng:rand::thread_rng() } } +} + +// NOTE: Straw2 algorithm +impl NodeSelector for StrawNodeSelector { + fn select_node(&self, all_node_cnt: usize, fn_name: &str) -> NodeID { + // NOTE: 1 is an impossible value for straw + let mut max_straw:f64 = 1; + let mut node_id:NodeID = 1; + let range = Uniform::new(0, 65536); + // NOTE: node id is [1,all_node_cnt] + for i in 1..all_node_cnt+1 { + let weight = self.weight_fetcher.get_node_weight(i); + let mut straw:f64 = range.sample(&mut self.rng); + straw = (straw / 65536).ln() / weight; + if max_straw == 1 || max_straw < straw { + max_straw = straw; + node_id = i; + } + } + return node_id; + } +} + trait NodeSelector: Send + Sync + 'static { fn select_node(&self, all_node_cnt: usize, fn_name: &str) -> NodeID; }