Skip to content

Commit

Permalink
Merge branch 'main' into jtran/no-is-nan
Browse files Browse the repository at this point in the history
  • Loading branch information
jtran authored Jan 11, 2025
2 parents db6f85b + 013cb10 commit 9031c1d
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/ci-cd-scripts/playwright-electron.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ if [[ ! -f "test-results/.last-run.json" ]]; then
fi

retry=1
max_retrys=4
max_retrys=5

# retry failed tests, doing our own retries because using inbuilt playwright retries causes connection issues
while [[ $retry -le $max_retrys ]]; do
Expand Down
2 changes: 2 additions & 0 deletions e2e/playwright/point-click.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1182,9 +1182,11 @@ shellSketchOnFacesCases.forEach((initialCode, index) => {
commandName: 'Shell',
})
await clickOnCap()
await page.waitForTimeout(500)
await cmdBar.progressCmdBar()
await page.waitForTimeout(500)
await cmdBar.progressCmdBar()
await page.waitForTimeout(500)
await cmdBar.expectState({
stage: 'review',
headerArguments: {
Expand Down
18 changes: 9 additions & 9 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions playwright.electron.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ export default defineConfig({
},
projects: [
{
name: 'Google Chrome',
name: 'chromium',
use: {
...devices['Desktop Chrome'],
channel: 'chrome',
contextOptions: {
/* Chromium is the only one with these permission types */
permissions: ['clipboard-write', 'clipboard-read'],
Expand Down
2 changes: 2 additions & 0 deletions src/wasm-lib/kcl-to-core/src/conn_mock_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,4 +502,6 @@ impl kcl_lib::EngineManager for EngineConnection {
})),
}
}

async fn close(&self) {}
}
74 changes: 58 additions & 16 deletions src/wasm-lib/kcl/src/engine/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ enum SocketHealth {
}

type WebSocketTcpWrite = futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>, WsMsg>;
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct EngineConnection {
engine_req_tx: mpsc::Sender<ToEngineReq>,
shutdown_tx: mpsc::Sender<()>,
responses: Arc<DashMap<uuid::Uuid, WebSocketResponse>>,
pending_errors: Arc<Mutex<Vec<String>>>,
#[allow(dead_code)]
Expand Down Expand Up @@ -130,21 +131,49 @@ struct ToEngineReq {

impl EngineConnection {
/// Start waiting for incoming engine requests, and send each one over the WebSocket to the engine.
async fn start_write_actor(mut tcp_write: WebSocketTcpWrite, mut engine_req_rx: mpsc::Receiver<ToEngineReq>) {
while let Some(req) = engine_req_rx.recv().await {
let ToEngineReq { req, request_sent } = req;
let res = if let WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
cmd: ModelingCmd::ImportFiles { .. },
cmd_id: _,
}) = &req
{
// Send it as binary.
Self::inner_send_to_engine_binary(req, &mut tcp_write).await
} else {
Self::inner_send_to_engine(req, &mut tcp_write).await
};
let _ = request_sent.send(res);
async fn start_write_actor(
mut tcp_write: WebSocketTcpWrite,
mut engine_req_rx: mpsc::Receiver<ToEngineReq>,
mut shutdown_rx: mpsc::Receiver<()>,
) {
loop {
tokio::select! {
maybe_req = engine_req_rx.recv() => {
match maybe_req {
Some(ToEngineReq { req, request_sent }) => {
// Decide whether to send as binary or text,
// then send to the engine.
let res = if let WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
cmd: ModelingCmd::ImportFiles { .. },
cmd_id: _,
}) = &req
{
Self::inner_send_to_engine_binary(req, &mut tcp_write).await
} else {
Self::inner_send_to_engine(req, &mut tcp_write).await
};

// Let the caller know we’ve sent the request (ok or error).
let _ = request_sent.send(res);
}
None => {
// The engine_req_rx channel has closed, so no more requests.
// We'll gracefully exit the loop and close the engine.
break;
}
}
},

// If we get a shutdown signal, close the engine immediately and return.
_ = shutdown_rx.recv() => {
let _ = Self::inner_close_engine(&mut tcp_write).await;
return;
}
}
}

// If we exit the loop (e.g. engine_req_rx was closed),
// still gracefully close the engine before returning.
let _ = Self::inner_close_engine(&mut tcp_write).await;
}

Expand Down Expand Up @@ -194,7 +223,8 @@ impl EngineConnection {

let (tcp_write, tcp_read) = ws_stream.split();
let (engine_req_tx, engine_req_rx) = mpsc::channel(10);
tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx));
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx, shutdown_rx));

let mut tcp_read = TcpRead { stream: tcp_read };

Expand Down Expand Up @@ -304,6 +334,7 @@ impl EngineConnection {

Ok(EngineConnection {
engine_req_tx,
shutdown_tx,
tcp_read_handle: Arc::new(TcpReadHandle {
handle: Arc::new(tcp_read_handle),
}),
Expand Down Expand Up @@ -484,4 +515,15 @@ impl EngineManager for EngineConnection {
fn get_session_data(&self) -> Option<ModelingSessionData> {
self.session_data.lock().unwrap().clone()
}

async fn close(&self) {
let _ = self.shutdown_tx.send(()).await;
loop {
if let Ok(guard) = self.socket_health.lock() {
if *guard == SocketHealth::Inactive {
return;
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/wasm-lib/kcl/src/engine/conn_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,6 @@ impl crate::engine::EngineManager for EngineConnection {
})),
}
}

async fn close(&self) {}
}
3 changes: 3 additions & 0 deletions src/wasm-lib/kcl/src/engine/conn_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,7 @@ impl crate::engine::EngineManager for EngineConnection {

Ok(ws_result)
}

// maybe we can actually impl this here? not sure how atm.
async fn close(&self) {}
}
3 changes: 3 additions & 0 deletions src/wasm-lib/kcl/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
fn get_session_data(&self) -> Option<ModelingSessionData> {
None
}

/// Close the engine connection and wait for it to finish.
async fn close(&self);
}

#[derive(Debug, Hash, Eq, Clone, Deserialize, Serialize, PartialEq, ts_rs::TS, JsonSchema)]
Expand Down
22 changes: 16 additions & 6 deletions src/wasm-lib/kcl/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2013,10 +2013,13 @@ impl ExecutorContext {
// AND if we aren't in wasm it doesn't really matter.
Ok(())
}
// Given an old ast, old program memory and new ast, find the parts of the code that need to be
// re-executed.
// This function should never error, because in the case of any internal error, we should just pop
// the cache.
/// Given an old ast, old program memory and new ast, find the parts of the code that need to be
/// re-executed.
/// This function should never error, because in the case of any internal error, we should just pop
/// the cache.
///
/// Returns `None` when there are no changes to the program, i.e. it is
/// fully cached.
pub async fn get_changed_program(&self, info: CacheInformation) -> Option<CacheResult> {
let Some(old) = info.old else {
// We have no old info, we need to re-execute the whole thing.
Expand Down Expand Up @@ -2137,7 +2140,7 @@ impl ExecutorContext {
}
}
std::cmp::Ordering::Equal => {
// currently unreachable, but lets pretend like the code
// currently unreachable, but let's pretend like the code
// above can do something meaningful here for when we get
// to diffing and yanking chunks of the program apart.

Expand Down Expand Up @@ -2236,7 +2239,10 @@ impl ExecutorContext {
)
})?;
// Move the artifact commands to simplify cache management.
exec_state.global.artifact_commands = self.engine.take_artifact_commands();
exec_state
.global
.artifact_commands
.extend(self.engine.take_artifact_commands());
let session_data = self.engine.get_session_data();
Ok(session_data)
}
Expand Down Expand Up @@ -2626,6 +2632,10 @@ impl ExecutorContext {

self.prepare_snapshot().await
}

pub async fn close(&self) {
self.engine.close().await;
}
}

/// For each argument given,
Expand Down
21 changes: 15 additions & 6 deletions src/wasm-lib/kcl/src/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ pub async fn execute_and_snapshot(
) -> Result<image::DynamicImage, ExecError> {
let ctx = new_context(units, true, project_directory).await?;
let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?;
do_execute_and_snapshot(&ctx, program)
let res = do_execute_and_snapshot(&ctx, program)
.await
.map(|(_state, snap)| snap)
.map_err(|err| err.error)
.map_err(|err| err.error);
ctx.close().await;
res
}

/// Executes a kcl program and takes a snapshot of the result.
Expand All @@ -39,14 +41,16 @@ pub async fn execute_and_snapshot_ast(
project_directory: Option<PathBuf>,
) -> Result<(ProgramMemory, Vec<Operation>, Vec<ArtifactCommand>, image::DynamicImage), ExecErrorWithState> {
let ctx = new_context(units, true, project_directory).await?;
do_execute_and_snapshot(&ctx, ast).await.map(|(state, snap)| {
let res = do_execute_and_snapshot(&ctx, ast).await.map(|(state, snap)| {
(
state.mod_local.memory,
state.mod_local.operations,
state.global.artifact_commands,
snap,
)
})
});
ctx.close().await;
res
}

pub async fn execute_and_snapshot_no_auth(
Expand All @@ -56,10 +60,12 @@ pub async fn execute_and_snapshot_no_auth(
) -> Result<image::DynamicImage, ExecError> {
let ctx = new_context(units, false, project_directory).await?;
let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?;
do_execute_and_snapshot(&ctx, program)
let res = do_execute_and_snapshot(&ctx, program)
.await
.map(|(_state, snap)| snap)
.map_err(|err| err.error)
.map_err(|err| err.error);
ctx.close().await;
res
}

async fn do_execute_and_snapshot(
Expand All @@ -80,6 +86,9 @@ async fn do_execute_and_snapshot(
.map_err(|e| ExecError::BadPng(e.to_string()))
.and_then(|x| x.decode().map_err(|e| ExecError::BadPng(e.to_string())))
.map_err(|err| ExecErrorWithState::new(err, exec_state.clone()))?;

ctx.close().await;

Ok((exec_state, img))
}

Expand Down
Loading

0 comments on commit 9031c1d

Please sign in to comment.