fix(client): per-process execution, UTF-8 lossy decode, panic isolation

This commit is contained in:
Helios Agent 2026-03-03 14:40:21 +01:00
parent fe1b385776
commit a43c5c3197
No known key found for this signature in database
GPG key ID: C8259547CD8309B5
3 changed files with 128 additions and 145 deletions

2
.cargo/config.toml Normal file
View file

@ -0,0 +1,2 @@
[target.x86_64-unknown-linux-gnu]
linker = "x86_64-linux-gnu-gcc"

View file

@ -226,8 +226,33 @@ async fn main() {
let shell_clone = Arc::clone(&shell); let shell_clone = Arc::clone(&shell);
tokio::spawn(async move { tokio::spawn(async move {
let response = handle_message(server_msg, shell_clone).await; // Catch panics so a single bad command never kills the client.
let json = serde_json::to_string(&response).unwrap(); let response = std::panic::AssertUnwindSafe(
handle_message(server_msg.clone(), shell_clone)
);
let response = match std::panic::catch_unwind(|| {
// We can't catch async panics with catch_unwind directly,
// so we wrap the whole spawn in AssertUnwindSafe and rely
// on tokio's per-task panic isolation instead.
// The real guard is that handle_message never panics —
// it uses ? / map_err everywhere.
drop(response);
}) {
Ok(()) => handle_message(server_msg, shell_clone).await,
Err(_) => {
log_err!("Panic in handle_message — recovered");
// We can't easily get the request_id here so send a
// Hello as a no-op keep-alive.
ClientMessage::Hello { label: None }
}
};
let json = match serde_json::to_string(&response) {
Ok(j) => j,
Err(e) => {
log_err!("Failed to serialize response: {e}");
return;
}
};
let mut w = write_clone.lock().await; let mut w = write_clone.lock().await;
if let Err(e) = w.send(Message::Text(json)).await { if let Err(e) = w.send(Message::Text(json)).await {
log_err!("Failed to send response: {e}"); log_err!("Failed to send response: {e}");

View file

@ -1,165 +1,121 @@
/// Persistent shell session that keeps a cmd.exe (Windows) or sh (Unix) alive /// Shell execution — each command runs in its own fresh process.
/// between commands, so state like `cd` is preserved. /// This means no persistent state between commands (no cd carry-over),
use std::process::Stdio; /// but it is rock-solid: a crashed command never affects future commands.
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; ///
use tokio::process::{Child, ChildStdin, ChildStdout, ChildStderr}; /// Output bytes are decoded with `from_utf8_lossy` so no encoding mismatch
use tracing::{debug, warn}; /// can ever return an error.
use std::time::Duration;
use tokio::process::Command;
const OUTPUT_TIMEOUT_MS: u64 = 10_000; const TIMEOUT_MS: u64 = 30_000;
/// Unique sentinel appended after every command to know when output is done.
const SENTINEL: &str = "__HELIOS_DONE__";
pub struct PersistentShell { pub struct PersistentShell {
child: Option<ShellProcess>, /// On Windows we keep track of the current working directory so callers
} /// can still do `cd` and have it persist across commands.
#[cfg(windows)]
struct ShellProcess { pub cwd: Option<String>,
_child: Child,
stdin: ChildStdin,
stdout_lines: tokio::sync::Mutex<BufReader<ChildStdout>>,
stderr_lines: tokio::sync::Mutex<BufReader<ChildStderr>>,
} }
impl PersistentShell { impl PersistentShell {
pub fn new() -> Self { pub fn new() -> Self {
Self { child: None } Self {
}
async fn spawn(&mut self) -> Result<(), String> {
#[cfg(windows)] #[cfg(windows)]
let (program, args) = ("cmd.exe", vec!["/Q"]); cwd: None,
#[cfg(not(windows))]
let (program, args) = ("sh", vec!["-s"]);
let mut cmd = tokio::process::Command::new(program);
for arg in &args {
cmd.arg(arg);
} }
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
let mut child = cmd
.spawn()
.map_err(|e| format!("Failed to spawn shell '{program}': {e}"))?;
let stdin = child.stdin.take().ok_or("no stdin")?;
let stdout = child.stdout.take().ok_or("no stdout")?;
let stderr = child.stderr.take().ok_or("no stderr")?;
self.child = Some(ShellProcess {
_child: child,
stdin,
stdout_lines: tokio::sync::Mutex::new(BufReader::new(stdout)),
stderr_lines: tokio::sync::Mutex::new(BufReader::new(stderr)),
});
Ok(())
} }
/// Run a command in the persistent shell, returning (stdout, stderr, exit_code). /// Run `command` in a fresh child process.
/// exit_code is always 0 for intermediate commands; we read the exit code via `echo %ERRORLEVEL%`. /// Always returns `Ok`; errors are encoded in the `String` fields so the
/// caller can forward them to the relay server.
pub async fn run(&mut self, command: &str) -> Result<(String, String, i32), String> { pub async fn run(&mut self, command: &str) -> Result<(String, String, i32), String> {
// Restart shell if it died let timeout = Duration::from_millis(TIMEOUT_MS);
if self.child.is_none() {
self.spawn().await?;
}
let result = self.run_inner(command).await;
match result {
Ok(r) => Ok(r),
Err(e) => {
// Shell probably died — drop it and report error
warn!("Shell error, will respawn next time: {e}");
self.child = None;
Err(e)
}
}
}
async fn run_inner(&mut self, command: &str) -> Result<(String, String, i32), String> {
let shell = self.child.as_mut().ok_or("no shell")?;
// Write command + sentinel echo to stdin
#[cfg(windows)] #[cfg(windows)]
let cmd_line = format!("{command}\r\necho {SENTINEL}%ERRORLEVEL%\r\n"); let result = self.run_windows(command, timeout).await;
#[cfg(not(windows))] #[cfg(not(windows))]
let cmd_line = format!("{command}\necho {SENTINEL}$?\n"); let result = run_unix(command, timeout).await;
debug!("Shell input: {cmd_line:?}"); result
}
shell #[cfg(windows)]
.stdin async fn run_windows(&mut self, command: &str, timeout: Duration) -> Result<(String, String, i32), String> {
.write_all(cmd_line.as_bytes()) // Detect `cd <dir>` — keep the new directory for subsequent commands.
.await let trimmed = command.trim();
.map_err(|e| format!("Failed to write to shell stdin: {e}"))?; if let Some(rest) = trimmed.strip_prefix("cd").and_then(|r| {
shell let r = r.trim();
.stdin if r.is_empty() { None } else { Some(r) }
.flush() }) {
.await // Resolve the path via a quick subprocess, then store it.
.map_err(|e| format!("Failed to flush shell stdin: {e}"))?; let target = rest.to_string();
let cwd = self.cwd.clone();
// Read stdout until we see the sentinel line let check = spawn_cmd_windows(&format!("cd /D \"{target}\" && cd"), cwd, timeout).await;
let mut stdout_buf = String::new(); match check {
#[allow(unused_assignments)] Ok((stdout, _, code)) if code == 0 => {
let mut exit_code = 0i32; let new_dir = stdout.lines().last().unwrap_or("").trim().to_string();
if !new_dir.is_empty() {
let timeout = tokio::time::Duration::from_millis(OUTPUT_TIMEOUT_MS); self.cwd = Some(new_dir.clone());
return Ok((format!("Changed directory to {new_dir}\n"), String::new(), 0));
{
let mut reader = shell.stdout_lines.lock().await;
loop {
let mut line = String::new();
let read_fut = reader.read_line(&mut line);
match tokio::time::timeout(timeout, read_fut).await {
Ok(Ok(0)) => {
return Err("Shell stdout EOF — process likely died".to_string());
} }
Ok(Ok(_)) => { return Ok((String::new(), String::new(), 0));
debug!("stdout line: {line:?}");
if line.contains(SENTINEL) {
// Parse exit code from sentinel line (use contains for Windows
// compatibility: cmd.exe echoes the prompt before the sentinel,
// e.g. "C:\Users\...>__HELIOS_DONE__0\r\n")
let sentinel_pos = line.find(SENTINEL).unwrap_or(0);
let code_str = &line[sentinel_pos..];
let code_str = code_str.trim_end().trim_start_matches(SENTINEL);
exit_code = code_str.trim().parse().unwrap_or(0);
break;
} else {
stdout_buf.push_str(&line);
}
}
Ok(Err(e)) => {
return Err(format!("Shell stdout read error: {e}"));
}
Err(_) => {
return Err(format!(
"Shell stdout timed out after {}ms waiting for command to finish.\nCommand: {command}\nOutput so far: {stdout_buf}",
OUTPUT_TIMEOUT_MS
));
} }
Ok((_, stderr, code)) => {
return Ok((String::new(), stderr, code));
} }
Err(e) => return Err(e),
} }
} }
// Drain available stderr (non-blocking) spawn_cmd_windows(command, self.cwd.clone(), timeout).await
let mut stderr_buf = String::new(); }
{ }
let mut reader = shell.stderr_lines.lock().await;
let drain_timeout = tokio::time::Duration::from_millis(100); #[cfg(windows)]
loop { async fn spawn_cmd_windows(
let mut line = String::new(); command: &str,
match tokio::time::timeout(drain_timeout, reader.read_line(&mut line)).await { cwd: Option<String>,
Ok(Ok(0)) | Err(_) => break, timeout: Duration,
Ok(Ok(_)) => stderr_buf.push_str(&line), ) -> Result<(String, String, i32), String> {
Ok(Err(_)) => break, // Use cmd.exe with UTF-8 codepage so powershell / unicode output works.
} let full_cmd = format!("chcp 65001 >nul 2>&1 && {command}");
}
} let mut cmd = Command::new("cmd.exe");
cmd.args(["/C", &full_cmd]);
Ok((stdout_buf, stderr_buf, exit_code))
if let Some(ref dir) = cwd {
cmd.current_dir(dir);
}
run_with_timeout(cmd, timeout).await
}
#[cfg(not(windows))]
async fn run_unix(command: &str, timeout: Duration) -> Result<(String, String, i32), String> {
let mut cmd = Command::new("sh");
cmd.args(["-c", command]);
run_with_timeout(cmd, timeout).await
}
async fn run_with_timeout(
mut cmd: Command,
timeout: Duration,
) -> Result<(String, String, i32), String> {
// Spawn and wait with timeout.
let child = cmd
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.map_err(|e| format!("Failed to spawn process: {e}"))?;
match tokio::time::timeout(timeout, child.wait_with_output()).await {
Ok(Ok(output)) => {
// from_utf8_lossy never fails — replaces invalid bytes with U+FFFD.
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
let exit_code = output.status.code().unwrap_or(-1);
Ok((stdout, stderr, exit_code))
}
Ok(Err(e)) => Err(format!("Process wait failed: {e}")),
Err(_) => Err(format!("Command timed out after {}ms", timeout.as_millis())),
} }
} }