use std::collections::BTreeMap; use std::io; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use serde_json::Value as JsonRpcMessage; use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport}; #[derive(Debug)] #[allow(dead_code)] pub struct McpStdioProcess { child: Child, stdin: ChildStdin, stdout: BufReader, } #[allow(dead_code)] impl McpStdioProcess { pub fn spawn(transport: &McpStdioTransport) -> io::Result { let mut command = Command::new(&transport.command); command .args(&transport.args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); apply_env(&mut command, &transport.env); let mut child = command.spawn()?; let stdin = child .stdin .take() .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?; let stdout = child .stdout .take() .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?; Ok(Self { child, stdin, stdout: BufReader::new(stdout), }) } pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> { self.stdin.write_all(bytes).await } pub async fn flush(&mut self) -> io::Result<()> { self.stdin.flush().await } pub async fn read_available(&mut self) -> io::Result> { let mut buffer = vec![0_u8; 4096]; let read = self.stdout.read(&mut buffer).await?; buffer.truncate(read); Ok(buffer) } pub async fn write_jsonrpc_message(&mut self, message: &JsonRpcMessage) -> io::Result<()> { let encoded = encode_jsonrpc_message(message)?; self.write_all(&encoded).await?; self.flush().await } pub async fn read_jsonrpc_message(&mut self) -> io::Result { let payload = self.read_frame().await?; serde_json::from_slice(&payload) .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error)) } async fn read_frame(&mut self) -> io::Result> { let mut content_length = None; loop { let mut line = String::new(); let bytes_read = self.stdout.read_line(&mut line).await?; if bytes_read == 0 { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, "MCP stdio stream closed while reading headers", )); } if line == "\r\n" { break; } if let Some(value) = line.strip_prefix("Content-Length:") { let parsed = value .trim() .parse::() .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; content_length = Some(parsed); } } let content_length = content_length.ok_or_else(|| { io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header") })?; let mut payload = vec![0_u8; content_length]; self.stdout.read_exact(&mut payload).await?; Ok(payload) } pub async fn terminate(&mut self) -> io::Result<()> { self.child.kill().await } pub async fn wait(&mut self) -> io::Result { self.child.wait().await } } #[allow(dead_code)] pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result { match &bootstrap.transport { McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport), other => Err(io::Error::new( io::ErrorKind::InvalidInput, format!( "MCP bootstrap transport for {} is not stdio: {other:?}", bootstrap.server_name ), )), } } #[allow(dead_code)] fn apply_env(command: &mut Command, env: &BTreeMap) { for (key, value) in env { command.env(key, value); } } fn encode_jsonrpc_message(message: &JsonRpcMessage) -> io::Result> { let body = serde_json::to_vec(message) .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; let header = format!("Content-Length: {}\r\n\r\n", body.len()); let mut framed = header.into_bytes(); framed.extend(body); Ok(framed) } #[cfg(test)] mod tests { use std::collections::BTreeMap; use std::fs; use std::io::ErrorKind; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::runtime::Builder; use crate::config::{ ConfigSource, McpServerConfig, McpStdioServerConfig, ScopedMcpServerConfig, }; use crate::mcp_client::McpClientBootstrap; use super::{spawn_mcp_stdio_process, McpStdioProcess}; fn temp_dir() -> PathBuf { let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time should be after epoch") .as_nanos(); std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}")) } fn write_echo_script() -> PathBuf { let root = temp_dir(); fs::create_dir_all(&root).expect("temp dir"); let script_path = root.join("echo-mcp.sh"); fs::write( &script_path, "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n", ) .expect("write script"); let mut permissions = fs::metadata(&script_path).expect("metadata").permissions(); permissions.set_mode(0o755); fs::set_permissions(&script_path, permissions).expect("chmod"); script_path } fn write_jsonrpc_script() -> PathBuf { let root = temp_dir(); fs::create_dir_all(&root).expect("temp dir"); let script_path = root.join("jsonrpc-mcp.py"); let script = [ "#!/usr/bin/env python3", "import json, sys", "header = b''", r"while not header.endswith(b'\r\n\r\n'):", " chunk = sys.stdin.buffer.read(1)", " if not chunk:", " raise SystemExit(1)", " header += chunk", "length = 0", r"for line in header.decode().split('\r\n'):", r" if line.lower().startswith('content-length:'):", r" length = int(line.split(':', 1)[1].strip())", "payload = sys.stdin.buffer.read(length)", "json.loads(payload.decode())", r"response = json.dumps({'jsonrpc': '2.0', 'id': 1, 'result': {'echo': True}}).encode()", r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)", "sys.stdout.buffer.flush()", "", ] .join("\n"); fs::write(&script_path, script).expect("write script"); let mut permissions = fs::metadata(&script_path).expect("metadata").permissions(); permissions.set_mode(0o755); fs::set_permissions(&script_path, permissions).expect("chmod"); script_path } fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap { let config = ScopedMcpServerConfig { scope: ConfigSource::Local, config: McpServerConfig::Stdio(McpStdioServerConfig { command: script_path.to_string_lossy().into_owned(), args: Vec::new(), env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]), }), }; McpClientBootstrap::from_scoped_config("stdio server", &config) } #[test] fn spawns_stdio_process_and_round_trips_io() { let runtime = Builder::new_current_thread() .enable_all() .build() .expect("runtime"); runtime.block_on(async { let script_path = write_echo_script(); let bootstrap = sample_bootstrap(&script_path); let mut process = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process"); let ready = process.read_available().await.expect("read ready"); assert_eq!(String::from_utf8_lossy(&ready), "READY:secret-value\n"); process .write_all(b"ping from client\n") .await .expect("write input"); process.flush().await.expect("flush"); let echoed = process.read_available().await.expect("read echo"); assert_eq!(String::from_utf8_lossy(&echoed), "ECHO:ping from client\n"); let status = process.wait().await.expect("wait for exit"); assert!(status.success()); fs::remove_file(&script_path).expect("cleanup script"); fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); }); } #[test] fn rejects_non_stdio_bootstrap() { let config = ScopedMcpServerConfig { scope: ConfigSource::Local, config: McpServerConfig::Sdk(crate::config::McpSdkServerConfig { name: "sdk-server".to_string(), }), }; let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &config); let error = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail"); assert_eq!(error.kind(), ErrorKind::InvalidInput); } #[test] fn round_trips_jsonrpc_messages_over_stdio_frames() { let runtime = Builder::new_current_thread() .enable_all() .build() .expect("runtime"); runtime.block_on(async { let script_path = write_jsonrpc_script(); let transport = crate::mcp_client::McpStdioTransport { command: "python3".to_string(), args: vec![script_path.to_string_lossy().into_owned()], env: BTreeMap::new(), }; let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly"); process .write_jsonrpc_message(&serde_json::json!({ "jsonrpc": "2.0", "id": 1, "method": "initialize" })) .await .expect("write jsonrpc message"); let response = process .read_jsonrpc_message() .await .expect("read jsonrpc response"); assert_eq!(response["jsonrpc"], serde_json::json!("2.0")); assert_eq!(response["id"], serde_json::json!(1)); assert_eq!(response["result"]["echo"], serde_json::json!(true)); let status = process.wait().await.expect("wait for exit"); assert!(status.success()); fs::remove_file(&script_path).expect("cleanup script"); fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); }); } #[test] fn direct_spawn_uses_transport_env() { let runtime = Builder::new_current_thread() .enable_all() .build() .expect("runtime"); runtime.block_on(async { let script_path = write_echo_script(); let transport = crate::mcp_client::McpStdioTransport { command: script_path.to_string_lossy().into_owned(), args: Vec::new(), env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string())]), }; let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly"); let ready = process.read_available().await.expect("read ready"); assert_eq!(String::from_utf8_lossy(&ready), "READY:direct-secret\n"); process.terminate().await.expect("terminate child"); let _ = process.wait().await.expect("wait after kill"); fs::remove_file(&script_path).expect("cleanup script"); fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); }); } }