mirror of
https://github.com/marcoallegretti/WEFT_OS.git
synced 2026-03-27 01:13:09 +00:00
fix(appd): abort TerminateApp during startup phase promptly
Before this fix, TerminateApp sent while a process was waiting for its READY signal was not acted on until the 30-second timeout fired. abort_rx is now included in the tokio::select! that wraps wait_for_ready, so the child is killed and AppState::Stopped broadcast as soon as the abort is received, regardless of where in the startup sequence it fires. test: supervisor_abort_during_startup_broadcasts_stopped
This commit is contained in:
parent
488900a5db
commit
71597580ba
2 changed files with 68 additions and 4 deletions
|
|
@ -599,6 +599,55 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
async fn supervisor_abort_during_startup_broadcasts_stopped() {
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
|
||||||
|
let script =
|
||||||
|
std::env::temp_dir().join(format!("weft_test_sleep_{}.sh", std::process::id()));
|
||||||
|
std::fs::write(&script, "#!/bin/sh\nsleep 60\n").unwrap();
|
||||||
|
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||||
|
|
||||||
|
let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
|
||||||
|
unsafe { std::env::set_var("WEFT_RUNTIME_BIN", &script) };
|
||||||
|
|
||||||
|
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
|
||||||
|
let mut rx = registry.lock().await.subscribe();
|
||||||
|
let session_id = registry.lock().await.launch("test.abort.startup");
|
||||||
|
let abort_rx = registry.lock().await.register_abort(session_id);
|
||||||
|
|
||||||
|
registry.lock().await.terminate(session_id);
|
||||||
|
|
||||||
|
runtime::supervise(
|
||||||
|
session_id,
|
||||||
|
"test.abort.startup",
|
||||||
|
Arc::clone(®istry),
|
||||||
|
abort_rx,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(matches!(
|
||||||
|
registry.lock().await.state(session_id),
|
||||||
|
AppStateKind::NotFound
|
||||||
|
));
|
||||||
|
|
||||||
|
let broadcast = rx.try_recv();
|
||||||
|
assert!(matches!(
|
||||||
|
broadcast,
|
||||||
|
Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id
|
||||||
|
));
|
||||||
|
|
||||||
|
let _ = std::fs::remove_file(&script);
|
||||||
|
unsafe {
|
||||||
|
match prior {
|
||||||
|
Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v),
|
||||||
|
None => std::env::remove_var("WEFT_RUNTIME_BIN"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
#[tokio::test(flavor = "current_thread")]
|
#[tokio::test(flavor = "current_thread")]
|
||||||
async fn supervisor_spawn_failure_broadcasts_stopped() {
|
async fn supervisor_spawn_failure_broadcasts_stopped() {
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ pub(crate) async fn supervise(
|
||||||
registry: Registry,
|
registry: Registry,
|
||||||
abort_rx: tokio::sync::oneshot::Receiver<()>,
|
abort_rx: tokio::sync::oneshot::Receiver<()>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
let mut abort_rx = abort_rx;
|
||||||
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
|
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
|
@ -45,10 +46,13 @@ pub(crate) async fn supervise(
|
||||||
let stdout = child.stdout.take().expect("stdout piped");
|
let stdout = child.stdout.take().expect("stdout piped");
|
||||||
let stderr = child.stderr.take().expect("stderr piped");
|
let stderr = child.stderr.take().expect("stderr piped");
|
||||||
|
|
||||||
let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await;
|
let ready_result = tokio::select! {
|
||||||
|
r = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)) => Some(r),
|
||||||
|
_ = &mut abort_rx => None,
|
||||||
|
};
|
||||||
|
|
||||||
match ready_result {
|
match ready_result {
|
||||||
Ok(Ok(remaining_stdout)) => {
|
Some(Ok(Ok(remaining_stdout))) => {
|
||||||
registry
|
registry
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
|
|
@ -60,10 +64,10 @@ pub(crate) async fn supervise(
|
||||||
tracing::info!(session_id, %app_id, "app ready");
|
tracing::info!(session_id, %app_id, "app ready");
|
||||||
tokio::spawn(drain_stdout(remaining_stdout, session_id));
|
tokio::spawn(drain_stdout(remaining_stdout, session_id));
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Some(Ok(Err(e))) => {
|
||||||
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
|
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
|
||||||
}
|
}
|
||||||
Err(_elapsed) => {
|
Some(Err(_elapsed)) => {
|
||||||
tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process");
|
tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process");
|
||||||
let _ = child.kill().await;
|
let _ = child.kill().await;
|
||||||
let mut reg = registry.lock().await;
|
let mut reg = registry.lock().await;
|
||||||
|
|
@ -74,6 +78,17 @@ pub(crate) async fn supervise(
|
||||||
});
|
});
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
None => {
|
||||||
|
tracing::info!(session_id, %app_id, "abort during startup; killing process");
|
||||||
|
let _ = child.kill().await;
|
||||||
|
let mut reg = registry.lock().await;
|
||||||
|
reg.set_state(session_id, AppStateKind::Stopped);
|
||||||
|
let _ = reg.broadcast().send(Response::AppState {
|
||||||
|
session_id,
|
||||||
|
state: AppStateKind::Stopped,
|
||||||
|
});
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::spawn(drain_stderr(stderr, session_id));
|
tokio::spawn(drain_stderr(stderr, session_id));
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue