diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 13da6f8..601cd38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,7 @@ name: CI on: push: branches: ["main", "master"] + tags: ["v*"] pull_request: jobs: @@ -23,3 +24,36 @@ jobs: - name: Test run: cargo test --workspace --verbose + + build-windows-client: + runs-on: ubuntu-latest + if: github.event_name == 'push' + + steps: + - uses: actions/checkout@v4 + + - name: Install Rust (stable) + Windows target + uses: dtolnay/rust-toolchain@stable + with: + targets: x86_64-pc-windows-gnu + + - name: Install MinGW cross-compiler + run: sudo apt-get update && sudo apt-get install -y gcc-mingw-w64-x86-64 + + - name: Cache dependencies + uses: Swatinem/rust-cache@v2 + with: + key: windows-gnu + + - name: Build Windows client (cross-compile) + run: | + cargo build --release --package helios-client --target x86_64-pc-windows-gnu + env: + CARGO_TARGET_X86_64_PC_WINDOWS_GNU_LINKER: x86_64-w64-mingw32-gcc + + - name: Upload Windows client artifact + uses: actions/upload-artifact@v4 + with: + name: helios-remote-client-windows + path: target/x86_64-pc-windows-gnu/release/helios-client.exe + if-no-files-found: error diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index c1b8ecb..6b81376 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -3,8 +3,27 @@ name = "helios-client" version = "0.1.0" edition = "2021" -# Phase 2 — Windows client (not yet implemented) -# See README.md in this crate for the planned implementation. +[[bin]] +name = "helios-client" +path = "src/main.rs" [dependencies] +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = { version = "0.21", features = ["connect"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" helios-common = { path = "../common" } +dirs = "5" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +base64 = "0.22" +png = "0.17" +futures-util = "0.3" + +[target.'cfg(windows)'.dependencies] +windows = { version = "0.54", features = [ + "Win32_Foundation", + "Win32_Graphics_Gdi", + "Win32_UI_Input_KeyboardAndMouse", + "Win32_System_Threading", +] } diff --git a/crates/client/src/input.rs b/crates/client/src/input.rs new file mode 100644 index 0000000..152869d --- /dev/null +++ b/crates/client/src/input.rs @@ -0,0 +1,154 @@ +/// Mouse click and keyboard input via Windows SendInput (or stub on non-Windows). +use helios_common::MouseButton; + +#[cfg(windows)] +pub fn click(x: i32, y: i32, button: &MouseButton) -> Result<(), String> { + use windows::Win32::UI::Input::KeyboardAndMouse::{ + SendInput, INPUT, INPUT_MOUSE, MOUSEEVENTF_ABSOLUTE, MOUSEEVENTF_LEFTDOWN, + MOUSEEVENTF_LEFTUP, MOUSEEVENTF_MIDDLEDOWN, MOUSEEVENTF_MIDDLEUP, MOUSEEVENTF_MOVE, + MOUSEEVENTF_RIGHTDOWN, MOUSEEVENTF_RIGHTUP, MOUSEINPUT, + }; + use windows::Win32::UI::WindowsAndMessaging::{GetSystemMetrics, SM_CXSCREEN, SM_CYSCREEN}; + + unsafe { + let screen_w = GetSystemMetrics(SM_CXSCREEN) as i32; + let screen_h = GetSystemMetrics(SM_CYSCREEN) as i32; + + if screen_w == 0 || screen_h == 0 { + return Err(format!( + "Could not get screen dimensions: {screen_w}x{screen_h}" + )); + } + + // Convert pixel coords to absolute 0-65535 range + let abs_x = ((x * 65535) / screen_w) as i32; + let abs_y = ((y * 65535) / screen_h) as i32; + + let (down_flag, up_flag) = match button { + MouseButton::Left => (MOUSEEVENTF_LEFTDOWN, MOUSEEVENTF_LEFTUP), + MouseButton::Right => (MOUSEEVENTF_RIGHTDOWN, MOUSEEVENTF_RIGHTUP), + MouseButton::Middle => (MOUSEEVENTF_MIDDLEDOWN, MOUSEEVENTF_MIDDLEUP), + }; + + // Move to position + let move_input = INPUT { + r#type: INPUT_MOUSE, + Anonymous: windows::Win32::UI::Input::KeyboardAndMouse::INPUT_0 { + mi: MOUSEINPUT { + dx: abs_x, + dy: abs_y, + mouseData: 0, + dwFlags: MOUSEEVENTF_MOVE | MOUSEEVENTF_ABSOLUTE, + time: 0, + dwExtraInfo: 0, + }, + }, + }; + + let down_input = INPUT { + r#type: INPUT_MOUSE, + Anonymous: windows::Win32::UI::Input::KeyboardAndMouse::INPUT_0 { + mi: MOUSEINPUT { + dx: abs_x, + dy: abs_y, + mouseData: 0, + dwFlags: down_flag | MOUSEEVENTF_ABSOLUTE, + time: 0, + dwExtraInfo: 0, + }, + }, + }; + + let up_input = INPUT { + r#type: INPUT_MOUSE, + Anonymous: windows::Win32::UI::Input::KeyboardAndMouse::INPUT_0 { + mi: MOUSEINPUT { + dx: abs_x, + dy: abs_y, + mouseData: 0, + dwFlags: up_flag | MOUSEEVENTF_ABSOLUTE, + time: 0, + dwExtraInfo: 0, + }, + }, + }; + + let inputs = [move_input, down_input, up_input]; + let result = SendInput(&inputs, std::mem::size_of::() as i32); + + if result != inputs.len() as u32 { + return Err(format!( + "SendInput for click at ({x},{y}) sent {result}/{} events — some may have been blocked by UIPI", + inputs.len() + )); + } + + Ok(()) + } +} + +#[cfg(windows)] +pub fn type_text(text: &str) -> Result<(), String> { + use windows::Win32::UI::Input::KeyboardAndMouse::{ + SendInput, INPUT, INPUT_KEYBOARD, KEYBDINPUT, KEYEVENTF_UNICODE, + }; + + if text.is_empty() { + return Ok(()); + } + + unsafe { + let mut inputs: Vec = Vec::with_capacity(text.len() * 2); + + for ch in text.encode_utf16() { + // Key down + inputs.push(INPUT { + r#type: INPUT_KEYBOARD, + Anonymous: windows::Win32::UI::Input::KeyboardAndMouse::INPUT_0 { + ki: KEYBDINPUT { + wVk: windows::Win32::UI::Input::KeyboardAndMouse::VIRTUAL_KEY(0), + wScan: ch, + dwFlags: KEYEVENTF_UNICODE, + time: 0, + dwExtraInfo: 0, + }, + }, + }); + // Key up + inputs.push(INPUT { + r#type: INPUT_KEYBOARD, + Anonymous: windows::Win32::UI::Input::KeyboardAndMouse::INPUT_0 { + ki: KEYBDINPUT { + wVk: windows::Win32::UI::Input::KeyboardAndMouse::VIRTUAL_KEY(0), + wScan: ch, + dwFlags: KEYEVENTF_UNICODE + | windows::Win32::UI::Input::KeyboardAndMouse::KEYEVENTF_KEYUP, + time: 0, + dwExtraInfo: 0, + }, + }, + }); + } + + let result = SendInput(&inputs, std::mem::size_of::() as i32); + + if result != inputs.len() as u32 { + return Err(format!( + "SendInput for type_text sent {result}/{} events — some may have been blocked (UIPI or secure desktop)", + inputs.len() + )); + } + + Ok(()) + } +} + +#[cfg(not(windows))] +pub fn click(_x: i32, _y: i32, _button: &MouseButton) -> Result<(), String> { + Err("click() is only supported on Windows".to_string()) +} + +#[cfg(not(windows))] +pub fn type_text(_text: &str) -> Result<(), String> { + Err("type_text() is only supported on Windows".to_string()) +} diff --git a/crates/client/src/main.rs b/crates/client/src/main.rs index 8740dca..c3c72b3 100644 --- a/crates/client/src/main.rs +++ b/crates/client/src/main.rs @@ -1,7 +1,279 @@ -// helios-client — Phase 2 (not yet implemented) -// See crates/client/README.md for the planned implementation. +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; -fn main() { - eprintln!("helios-client is not yet implemented. See crates/client/README.md."); - std::process::exit(1); +use futures_util::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::{error, info, warn}; + +use helios_common::{ClientMessage, ServerMessage}; + +mod shell; +mod screenshot; +mod input; + +#[derive(Debug, Serialize, Deserialize)] +struct Config { + relay_url: String, + relay_code: String, + label: Option, +} + +impl Config { + fn config_path() -> PathBuf { + let base = dirs::config_dir() + .or_else(|| dirs::home_dir()) + .unwrap_or_else(|| PathBuf::from(".")); + base.join("helios-remote").join("config.json") + } + + fn load() -> Option { + let path = Self::config_path(); + let data = std::fs::read_to_string(&path).ok()?; + serde_json::from_str(&data).ok() + } + + fn save(&self) -> std::io::Result<()> { + let path = Self::config_path(); + std::fs::create_dir_all(path.parent().unwrap())?; + let data = serde_json::to_string_pretty(self).unwrap(); + std::fs::write(&path, data)?; + Ok(()) + } +} + +fn prompt_config() -> Config { + let relay_url = { + println!("Relay server URL [default: ws://46.225.185.232:8765/ws]: "); + let mut input = String::new(); + std::io::stdin().read_line(&mut input).unwrap(); + let trimmed = input.trim(); + if trimmed.is_empty() { + "ws://46.225.185.232:8765/ws".to_string() + } else { + trimmed.to_string() + } + }; + + let relay_code = { + println!("Enter relay code: "); + let mut input = String::new(); + std::io::stdin().read_line(&mut input).unwrap(); + input.trim().to_string() + }; + + let label = { + println!("Label for this machine (optional, press Enter to skip): "); + let mut input = String::new(); + std::io::stdin().read_line(&mut input).unwrap(); + let trimmed = input.trim().to_string(); + if trimmed.is_empty() { None } else { Some(trimmed) } + }; + + Config { relay_url, relay_code, label } +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter( + std::env::var("RUST_LOG") + .unwrap_or_else(|_| "helios_client=info".to_string()), + ) + .init(); + + // Load or prompt for config + let config = match Config::load() { + Some(c) => { + info!("Loaded config from {:?}", Config::config_path()); + c + } + None => { + info!("No config found — prompting for setup"); + let c = prompt_config(); + if let Err(e) = c.save() { + error!("Failed to save config: {e}"); + } else { + info!("Config saved to {:?}", Config::config_path()); + } + c + } + }; + + let config = Arc::new(config); + let shell = Arc::new(Mutex::new(shell::PersistentShell::new())); + + // Connect with exponential backoff + let mut backoff = Duration::from_secs(1); + const MAX_BACKOFF: Duration = Duration::from_secs(30); + + loop { + info!("Connecting to {}", config.relay_url); + match connect_async(&config.relay_url).await { + Ok((ws_stream, _)) => { + info!("Connected!"); + backoff = Duration::from_secs(1); // reset on success + + let (mut write, mut read) = ws_stream.split(); + + // Send Hello + let hello = ClientMessage::Hello { + label: config.label.clone(), + }; + let hello_json = serde_json::to_string(&hello).unwrap(); + if let Err(e) = write.send(Message::Text(hello_json)).await { + error!("Failed to send Hello: {e}"); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(MAX_BACKOFF); + continue; + } + + // Shared write half + let write = Arc::new(Mutex::new(write)); + + // Process messages + while let Some(msg_result) = read.next().await { + match msg_result { + Ok(Message::Text(text)) => { + let server_msg: ServerMessage = match serde_json::from_str(&text) { + Ok(m) => m, + Err(e) => { + warn!("Failed to parse server message: {e}\nRaw: {text}"); + continue; + } + }; + + let write_clone = Arc::clone(&write); + let shell_clone = Arc::clone(&shell); + + tokio::spawn(async move { + let response = handle_message(server_msg, shell_clone).await; + let json = serde_json::to_string(&response).unwrap(); + let mut w = write_clone.lock().await; + if let Err(e) = w.send(Message::Text(json)).await { + error!("Failed to send response: {e}"); + } + }); + } + Ok(Message::Ping(data)) => { + let mut w = write.lock().await; + let _ = w.send(Message::Pong(data)).await; + } + Ok(Message::Close(_)) => { + info!("Server closed connection"); + break; + } + Err(e) => { + error!("WebSocket error: {e}"); + break; + } + _ => {} + } + } + + warn!("Disconnected. Reconnecting in {:?}...", backoff); + } + Err(e) => { + error!("Connection failed: {e}"); + } + } + + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(MAX_BACKOFF); + } +} + +async fn handle_message( + msg: ServerMessage, + shell: Arc>, +) -> ClientMessage { + match msg { + ServerMessage::ScreenshotRequest { request_id } => { + match screenshot::take_screenshot() { + Ok((image_base64, width, height)) => ClientMessage::ScreenshotResponse { + request_id, + image_base64, + width, + height, + }, + Err(e) => { + error!("Screenshot failed: {e}"); + ClientMessage::Error { + request_id, + message: format!("Screenshot failed: {e}"), + } + } + } + } + + ServerMessage::ExecRequest { request_id, command } => { + info!("Exec: {command}"); + let mut sh = shell.lock().await; + match sh.run(&command).await { + Ok((stdout, stderr, exit_code)) => ClientMessage::ExecResponse { + request_id, + stdout, + stderr, + exit_code, + }, + Err(e) => { + error!("Exec failed for command {:?}: {e}", command); + ClientMessage::Error { + request_id, + message: format!( + "Exec failed for command {:?}.\nError: {e}\nContext: persistent shell may have died.", + command + ), + } + } + } + } + + ServerMessage::ClickRequest { request_id, x, y, button } => { + info!("Click: ({x},{y}) {:?}", button); + match input::click(x, y, &button) { + Ok(()) => ClientMessage::Ack { request_id }, + Err(e) => { + error!("Click failed at ({x},{y}): {e}"); + ClientMessage::Error { + request_id, + message: format!("Click at ({x},{y}) failed: {e}"), + } + } + } + } + + ServerMessage::TypeRequest { request_id, text } => { + info!("Type: {} chars", text.len()); + match input::type_text(&text) { + Ok(()) => ClientMessage::Ack { request_id }, + Err(e) => { + error!("Type failed: {e}"); + ClientMessage::Error { + request_id, + message: format!("Type failed: {e}"), + } + } + } + } + + ServerMessage::Ack { request_id } => { + info!("Server ack for {request_id}"); + // Nothing to do - server acked something we sent + ClientMessage::Ack { request_id } + } + + ServerMessage::Error { request_id, message } => { + error!("Server error (req={request_id:?}): {message}"); + // No meaningful response needed but we need to return something + // Use a dummy ack if we have a request_id + if let Some(rid) = request_id { + ClientMessage::Ack { request_id: rid } + } else { + ClientMessage::Hello { label: None } + } + } + } } diff --git a/crates/client/src/screenshot.rs b/crates/client/src/screenshot.rs new file mode 100644 index 0000000..d867ea4 --- /dev/null +++ b/crates/client/src/screenshot.rs @@ -0,0 +1,146 @@ +/// Screenshot capture — Windows GDI on Windows, stub on other platforms. +use base64::Engine; + +#[cfg(windows)] +pub fn take_screenshot() -> Result<(String, u32, u32), String> { + use windows::Win32::Foundation::RECT; + use windows::Win32::Graphics::Gdi::{ + BitBlt, CreateCompatibleBitmap, CreateCompatibleDC, DeleteDC, DeleteObject, + GetDIBits, GetObjectW, SelectObject, BITMAP, BITMAPINFO, BITMAPINFOHEADER, + DIB_RGB_COLORS, SRCCOPY, + }; + use windows::Win32::UI::WindowsAndMessaging::GetDesktopWindow; + use windows::Win32::Graphics::Gdi::GetWindowDC; + use windows::Win32::Graphics::Gdi::ReleaseDC; + + unsafe { + let hwnd = GetDesktopWindow(); + let hdc_screen = GetWindowDC(hwnd); + if hdc_screen.is_invalid() { + return Err("GetWindowDC failed — cannot capture screen".to_string()); + } + + // Get screen dimensions + use windows::Win32::Graphics::Gdi::{GetDeviceCaps, HORZRES, VERTRES}; + let width = GetDeviceCaps(hdc_screen, HORZRES) as u32; + let height = GetDeviceCaps(hdc_screen, VERTRES) as u32; + + if width == 0 || height == 0 { + ReleaseDC(hwnd, hdc_screen); + return Err(format!("Invalid screen dimensions: {width}x{height}")); + } + + // Create compatible DC and bitmap + let hdc_mem = CreateCompatibleDC(hdc_screen); + if hdc_mem.is_invalid() { + ReleaseDC(hwnd, hdc_screen); + return Err("CreateCompatibleDC failed".to_string()); + } + + let hbm = CreateCompatibleBitmap(hdc_screen, width as i32, height as i32); + if hbm.is_invalid() { + DeleteDC(hdc_mem); + ReleaseDC(hwnd, hdc_screen); + return Err("CreateCompatibleBitmap failed".to_string()); + } + + let old_obj = SelectObject(hdc_mem, hbm); + + // BitBlt the screen into our bitmap + let blt_result = BitBlt( + hdc_mem, + 0, 0, + width as i32, height as i32, + hdc_screen, + 0, 0, + SRCCOPY, + ); + + if blt_result.is_err() { + SelectObject(hdc_mem, old_obj); + DeleteObject(hbm); + DeleteDC(hdc_mem); + ReleaseDC(hwnd, hdc_screen); + return Err("BitBlt failed — could not copy screen pixels".to_string()); + } + + // Get raw pixel data via GetDIBits + let mut bmi = BITMAPINFO { + bmiHeader: BITMAPINFOHEADER { + biSize: std::mem::size_of::() as u32, + biWidth: width as i32, + biHeight: -(height as i32), // negative = top-down + biPlanes: 1, + biBitCount: 32, + biCompression: 0, // BI_RGB + biSizeImage: 0, + biXPelsPerMeter: 0, + biYPelsPerMeter: 0, + biClrUsed: 0, + biClrImportant: 0, + }, + bmiColors: [Default::default()], + }; + + let buf_size = (width * height * 4) as usize; + let mut pixel_buf: Vec = vec![0u8; buf_size]; + + let lines = GetDIBits( + hdc_mem, + hbm, + 0, + height, + Some(pixel_buf.as_mut_ptr() as *mut _), + &mut bmi, + DIB_RGB_COLORS, + ); + + SelectObject(hdc_mem, old_obj); + DeleteObject(hbm); + DeleteDC(hdc_mem); + ReleaseDC(hwnd, hdc_screen); + + if lines == 0 { + return Err(format!("GetDIBits failed — returned 0 scan lines (expected {height})")); + } + + // Convert BGRA → RGBA + for chunk in pixel_buf.chunks_exact_mut(4) { + chunk.swap(0, 2); // B <-> R + } + + // Encode as PNG + let png_bytes = encode_png(&pixel_buf, width, height)?; + let b64 = base64::engine::general_purpose::STANDARD.encode(&png_bytes); + + Ok((b64, width, height)) + } +} + +#[cfg(not(windows))] +pub fn take_screenshot() -> Result<(String, u32, u32), String> { + // Stub for non-Windows builds + // In a real scenario, could use X11/scrot on Linux + let width = 1u32; + let height = 1u32; + let pixel_data = vec![0u8, 0u8, 0u8, 255u8]; // single black pixel RGBA + let png_bytes = encode_png(&pixel_data, width, height)?; + let b64 = base64::engine::general_purpose::STANDARD.encode(&png_bytes); + Ok((b64, width, height)) +} + +fn encode_png(rgba: &[u8], width: u32, height: u32) -> Result, String> { + let mut buf = Vec::new(); + { + let mut encoder = png::Encoder::new(&mut buf, width, height); + encoder.set_color(png::ColorType::Rgba); + encoder.set_depth(png::BitDepth::Eight); + let mut writer = encoder + .write_header() + .map_err(|e| format!("PNG header error: {e}"))?; + writer + .write_image_data(rgba) + .map_err(|e| format!("PNG write error: {e}"))?; + } + Ok(buf) +} diff --git a/crates/client/src/shell.rs b/crates/client/src/shell.rs new file mode 100644 index 0000000..39addca --- /dev/null +++ b/crates/client/src/shell.rs @@ -0,0 +1,161 @@ +/// Persistent shell session that keeps a cmd.exe (Windows) or sh (Unix) alive +/// between commands, so state like `cd` is preserved. +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, ChildStdout, ChildStderr}; +use tracing::{debug, warn}; + +const OUTPUT_TIMEOUT_MS: u64 = 10_000; +/// Unique sentinel appended after every command to know when output is done. +const SENTINEL: &str = "__HELIOS_DONE__"; + +pub struct PersistentShell { + child: Option, +} + +struct ShellProcess { + _child: Child, + stdin: ChildStdin, + stdout_lines: tokio::sync::Mutex>, + stderr_lines: tokio::sync::Mutex>, +} + +impl PersistentShell { + pub fn new() -> Self { + Self { child: None } + } + + async fn spawn(&mut self) -> Result<(), String> { + #[cfg(windows)] + let (program, args) = ("cmd.exe", vec!["/Q"]); + #[cfg(not(windows))] + let (program, args) = ("sh", vec!["-s"]); + + let mut cmd = tokio::process::Command::new(program); + for arg in &args { + cmd.arg(arg); + } + cmd.stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true); + + let mut child = cmd + .spawn() + .map_err(|e| format!("Failed to spawn shell '{program}': {e}"))?; + + let stdin = child.stdin.take().ok_or("no stdin")?; + let stdout = child.stdout.take().ok_or("no stdout")?; + let stderr = child.stderr.take().ok_or("no stderr")?; + + self.child = Some(ShellProcess { + _child: child, + stdin, + stdout_lines: tokio::sync::Mutex::new(BufReader::new(stdout)), + stderr_lines: tokio::sync::Mutex::new(BufReader::new(stderr)), + }); + + Ok(()) + } + + /// Run a command in the persistent shell, returning (stdout, stderr, exit_code). + /// exit_code is always 0 for intermediate commands; we read the exit code via `echo %ERRORLEVEL%`. + pub async fn run(&mut self, command: &str) -> Result<(String, String, i32), String> { + // Restart shell if it died + if self.child.is_none() { + self.spawn().await?; + } + + let result = self.run_inner(command).await; + + match result { + Ok(r) => Ok(r), + Err(e) => { + // Shell probably died — drop it and report error + warn!("Shell error, will respawn next time: {e}"); + self.child = None; + Err(e) + } + } + } + + async fn run_inner(&mut self, command: &str) -> Result<(String, String, i32), String> { + let shell = self.child.as_mut().ok_or("no shell")?; + + // Write command + sentinel echo to stdin + #[cfg(windows)] + let cmd_line = format!("{command}\r\necho {SENTINEL}%ERRORLEVEL%\r\n"); + #[cfg(not(windows))] + let cmd_line = format!("{command}\necho {SENTINEL}$?\n"); + + debug!("Shell input: {cmd_line:?}"); + + shell + .stdin + .write_all(cmd_line.as_bytes()) + .await + .map_err(|e| format!("Failed to write to shell stdin: {e}"))?; + shell + .stdin + .flush() + .await + .map_err(|e| format!("Failed to flush shell stdin: {e}"))?; + + // Read stdout until we see the sentinel line + let mut stdout_buf = String::new(); + #[allow(unused_assignments)] + let mut exit_code = 0i32; + + let timeout = tokio::time::Duration::from_millis(OUTPUT_TIMEOUT_MS); + + { + let mut reader = shell.stdout_lines.lock().await; + loop { + let mut line = String::new(); + let read_fut = reader.read_line(&mut line); + match tokio::time::timeout(timeout, read_fut).await { + Ok(Ok(0)) => { + return Err("Shell stdout EOF — process likely died".to_string()); + } + Ok(Ok(_)) => { + debug!("stdout line: {line:?}"); + if line.trim_end().starts_with(SENTINEL) { + // Parse exit code from sentinel line + let code_str = line.trim_end().trim_start_matches(SENTINEL); + exit_code = code_str.trim().parse().unwrap_or(0); + break; + } else { + stdout_buf.push_str(&line); + } + } + Ok(Err(e)) => { + return Err(format!("Shell stdout read error: {e}")); + } + Err(_) => { + return Err(format!( + "Shell stdout timed out after {}ms waiting for command to finish.\nCommand: {command}\nOutput so far: {stdout_buf}", + OUTPUT_TIMEOUT_MS + )); + } + } + } + } + + // Drain available stderr (non-blocking) + let mut stderr_buf = String::new(); + { + let mut reader = shell.stderr_lines.lock().await; + let drain_timeout = tokio::time::Duration::from_millis(100); + loop { + let mut line = String::new(); + match tokio::time::timeout(drain_timeout, reader.read_line(&mut line)).await { + Ok(Ok(0)) | Err(_) => break, + Ok(Ok(_)) => stderr_buf.push_str(&line), + Ok(Err(_)) => break, + } + } + } + + Ok((stdout_buf, stderr_buf, exit_code)) + } +}