crates/flotilla-core/src/providers/ssh_runner.rs
Line | Count | Source |
1 | | use std::{path::Path, sync::Arc}; |
2 | | |
3 | | use async_trait::async_trait; |
4 | | use uuid::Uuid; |
5 | | |
6 | | use crate::providers::{ |
7 | | helper_exec_script, install_managed_helper_script, ChannelLabel, CommandOutput, CommandRunner, FLOTILLA_HELPER_NAME, |
8 | | FLOTILLA_HELPER_SCRIPT, |
9 | | }; |
10 | | |
11 | | /// Command runner that executes commands on a remote host over SSH. |
12 | | /// |
13 | | /// This is intentionally narrow: it shells out to `ssh` for direct-environment |
14 | | /// execution and discovery, but it does not model daemon-to-daemon transport. |
15 | | pub struct SshCommandRunner { |
16 | | destination: String, |
17 | | multiplex: bool, |
18 | | runner: Arc<dyn CommandRunner>, |
19 | | } |
20 | | |
21 | | impl SshCommandRunner { |
22 | 18 | pub fn new(destination: impl Into<String>, multiplex: bool, runner: Arc<dyn CommandRunner>) -> Self { |
23 | 18 | Self { destination: destination.into(), multiplex, runner } |
24 | 18 | } |
25 | | |
26 | 63 | fn ssh_prefix_args(&self) -> Vec<&str> { |
27 | 63 | let mut args = vec!["-T", "-o", "BatchMode=yes"]; |
28 | 63 | if self.multiplex { |
29 | 58 | args.extend(["-o", "ControlMaster=auto", "-o", "ControlPath=/tmp/flotilla-ssh-%C", "-o", "ControlPersist=60"]); |
30 | 58 | }5 |
31 | 63 | args.push(self.destination.as_str()); |
32 | 63 | args |
33 | 63 | } |
34 | | |
35 | 61 | fn ssh_shell_args<'a>(&'a self, script: &'a str) -> Vec<&'a str> { |
36 | 61 | let mut args = self.ssh_prefix_args(); |
37 | 61 | args.push("sh"); |
38 | 61 | args.push("-lc"); |
39 | 61 | args.push(script); |
40 | 61 | args |
41 | 61 | } |
42 | | |
43 | 61 | fn remote_exec_script(&self, cmd: &str, args: &[&str], cwd: &Path) -> String { |
44 | 61 | let mut parts = Vec::with_capacity(args.len() + 4); |
45 | 61 | parts.push(format!("cd {}", flotilla_protocol::arg::shell_quote(&cwd.to_string_lossy()))); |
46 | 61 | parts.push("&&".to_string()); |
47 | 61 | parts.push("exec".to_string()); |
48 | 61 | parts.push(flotilla_protocol::arg::shell_quote(cmd)); |
49 | 61 | parts.extend(args.iter().map(|arg| flotilla_protocol::arg::shell_quote56 (arg56 ))); |
50 | 61 | parts.join(" ") |
51 | 61 | } |
52 | | |
53 | 60 | async fn execute(&self, cmd: &str, args: &[&str], cwd: &Path, label: &ChannelLabel) -> Result<String, String> { |
54 | 60 | let script = self.remote_exec_script(cmd, args, cwd); |
55 | 60 | let ssh_args = self.ssh_shell_args(&script); |
56 | 60 | self.runner.run("ssh", &ssh_args, Path::new("/"), label).await |
57 | 59 | } |
58 | | } |
59 | | |
60 | | #[async_trait] |
61 | | impl CommandRunner for SshCommandRunner { |
62 | 59 | async fn run(&self, cmd: &str, args: &[&str], cwd: &Path, label: &ChannelLabel) -> Result<String, String> { |
63 | | self.execute(cmd, args, cwd, label).await |
64 | 59 | } |
65 | | |
66 | 1 | async fn run_output(&self, cmd: &str, args: &[&str], cwd: &Path, label: &ChannelLabel) -> Result<CommandOutput, String> { |
67 | | let script = self.remote_exec_script(cmd, args, cwd); |
68 | | let ssh_args = self.ssh_shell_args(&script); |
69 | | self.runner.run_output("ssh", &ssh_args, Path::new("/"), label).await |
70 | 1 | } |
71 | | |
72 | 1 | async fn exists(&self, cmd: &str, args: &[&str]) -> bool { |
73 | | self.execute(cmd, args, Path::new("/"), &ChannelLabel::Noop).await.is_ok() |
74 | 1 | } |
75 | | |
76 | 1 | async fn ensure_file(&self, path: &Path, content: &str) -> Result<String, String> { |
77 | | let temp_suffix = Uuid::new_v4().to_string(); |
78 | | let path_str = path.to_string_lossy().into_owned(); |
79 | | let helper_path = |
80 | | install_managed_helper_script(&*self.runner, "ssh", &self.ssh_prefix_args(), FLOTILLA_HELPER_NAME, FLOTILLA_HELPER_SCRIPT) |
81 | | .await?; |
82 | | let helper_script = helper_exec_script(&helper_path, "ensure-file-if-absent", &[&path_str, content, &temp_suffix])?; |
83 | | let mut owned_args: Vec<String> = self.ssh_prefix_args().into_iter().map(str::to_string).collect(); |
84 | | owned_args.extend(["sh".to_string(), "-lc".to_string(), helper_script]); |
85 | | let arg_refs: Vec<&str> = owned_args.iter().map(String::as_str).collect(); |
86 | | self.runner.run("ssh", &arg_refs, Path::new("/"), &ChannelLabel::Noop).await |
87 | 1 | } |
88 | | } |
89 | | |
90 | | #[cfg(test)] |
91 | | mod tests { |
92 | | use std::{ |
93 | | collections::VecDeque, |
94 | | path::{Path, PathBuf}, |
95 | | sync::Mutex, |
96 | | }; |
97 | | |
98 | | use async_trait::async_trait; |
99 | | |
100 | | use super::SshCommandRunner; |
101 | | use crate::providers::{ChannelLabel, CommandOutput, CommandRunner}; |
102 | | |
103 | | struct RecordingRunner { |
104 | | calls: Mutex<Vec<(String, Vec<String>, PathBuf)>>, |
105 | | run_results: Mutex<VecDeque<Result<String, String>>>, |
106 | | run_output_result: Mutex<Option<Result<CommandOutput, String>>>, |
107 | | } |
108 | | |
109 | | impl RecordingRunner { |
110 | 3 | fn with_run_result(result: Result<String, String>) -> Self { |
111 | 3 | Self::with_run_results(vec![result]) |
112 | 3 | } |
113 | | |
114 | 4 | fn with_run_results(results: Vec<Result<String, String>>) -> Self { |
115 | 4 | Self { calls: Mutex::new(Vec::new()), run_results: Mutex::new(results.into()), run_output_result: Mutex::new(None) } |
116 | 4 | } |
117 | | |
118 | 1 | fn with_run_output_result(result: Result<CommandOutput, String>) -> Self { |
119 | 1 | Self { calls: Mutex::new(Vec::new()), run_results: Mutex::new(VecDeque::new()), run_output_result: Mutex::new(Some(result)) } |
120 | 1 | } |
121 | | |
122 | 4 | fn calls(&self) -> Vec<(String, Vec<String>, PathBuf)> { |
123 | 4 | self.calls.lock().expect("calls mutex").clone() |
124 | 4 | } |
125 | | } |
126 | | |
127 | | #[async_trait] |
128 | | impl CommandRunner for RecordingRunner { |
129 | 5 | async fn run(&self, cmd: &str, args: &[&str], cwd: &Path, _label: &ChannelLabel) -> Result<String, String> { |
130 | | self.calls.lock().expect("calls mutex").push(( |
131 | | cmd.to_string(), |
132 | 39 | args.iter().map(|arg| (*arg).to_string()).collect(), |
133 | | cwd.to_path_buf(), |
134 | | )); |
135 | | self.run_results.lock().expect("run_results mutex").pop_front().expect("run result not configured") |
136 | 5 | } |
137 | | |
138 | 1 | async fn run_output(&self, cmd: &str, args: &[&str], cwd: &Path, _label: &ChannelLabel) -> Result<CommandOutput, String> { |
139 | | self.calls.lock().expect("calls mutex").push(( |
140 | | cmd.to_string(), |
141 | 13 | args.iter().map(|arg| (*arg).to_string()).collect(), |
142 | | cwd.to_path_buf(), |
143 | | )); |
144 | | self.run_output_result.lock().expect("run_output_result mutex").take().expect("run output result not configured") |
145 | 1 | } |
146 | | |
147 | 0 | async fn exists(&self, _cmd: &str, _args: &[&str]) -> bool { |
148 | | true |
149 | 0 | } |
150 | | } |
151 | | |
/// Asserts that exactly one call was recorded and that it invoked `ssh`,
/// then returns that call's argument list.
fn ssh_call_args(calls: &[(String, Vec<String>, PathBuf)]) -> &Vec<String> {
    assert_eq!(calls.len(), 1);
    let (program, argv, _cwd) = &calls[0];
    assert_eq!(program, "ssh");
    argv
}
157 | | |
158 | | #[tokio::test] |
159 | 1 | async fn run_builds_ssh_command_with_working_directory() { |
160 | 1 | let inner = std::sync::Arc::new(RecordingRunner::with_run_result(Ok("stdout".into()))); |
161 | 1 | let runner = SshCommandRunner::new("alice@feta.local", false, inner.clone()); |
162 | | |
163 | 1 | let output = runner.run("git", &["status", "--short"], Path::new("/repo with space"), &ChannelLabel::Noop).await; |
164 | | |
165 | 1 | assert_eq!(output.unwrap(), "stdout"); |
166 | 1 | let calls = inner.calls(); |
167 | 1 | let args = ssh_call_args(&calls); |
168 | 1 | assert_eq!(args[0], "-T"); |
169 | 1 | assert_eq!(args[1], "-o"); |
170 | 1 | assert_eq!(args[2], "BatchMode=yes"); |
171 | 1 | assert_eq!(args[3], "alice@feta.local"); |
172 | 1 | assert_eq!(args[4], "sh"); |
173 | 1 | assert_eq!(args[5], "-lc"); |
174 | 1 | assert_eq!(args[6], "cd '/repo with space' && exec 'git' 'status' '--short'"); |
175 | 1 | } |
176 | | |
177 | | #[tokio::test] |
178 | 1 | async fn run_output_preserves_stdout_and_stderr() { |
179 | 1 | let inner = std::sync::Arc::new(RecordingRunner::with_run_output_result(Ok(CommandOutput { |
180 | 1 | stdout: "out".into(), |
181 | 1 | stderr: "err".into(), |
182 | 1 | success: false, |
183 | 1 | }))); |
184 | 1 | let runner = SshCommandRunner::new("alice@feta.local", true, inner.clone()); |
185 | | |
186 | 1 | let output = runner.run_output("git", &["status"], Path::new("/repo"), &ChannelLabel::Noop).await.unwrap(); |
187 | | |
188 | 1 | assert_eq!(output.stdout, "out"); |
189 | 1 | assert_eq!(output.stderr, "err"); |
190 | 1 | assert!(!output.success); |
191 | | |
192 | 1 | let calls = inner.calls(); |
193 | 1 | let args = ssh_call_args(&calls); |
194 | 1 | assert_eq!(args[0], "-T"); |
195 | 1 | assert_eq!(args[1], "-o"); |
196 | 1 | assert_eq!(args[2], "BatchMode=yes"); |
197 | 1 | assert_eq!(&args[3..9], ["-o", "ControlMaster=auto", "-o", "ControlPath=/tmp/flotilla-ssh-%C", "-o", "ControlPersist=60"]); |
198 | 1 | assert_eq!(args[9], "alice@feta.local"); |
199 | 1 | assert_eq!(args[10], "sh"); |
200 | 1 | assert_eq!(args[11], "-lc"); |
201 | 1 | assert_eq!(args[12], "cd '/repo' && exec 'git' 'status'"); |
202 | 1 | } |
203 | | |
204 | | #[tokio::test] |
205 | 1 | async fn exists_uses_remote_command_lookup() { |
206 | 1 | let inner = std::sync::Arc::new(RecordingRunner::with_run_result(Ok(String::new()))); |
207 | 1 | let runner = SshCommandRunner::new("alice@feta.local", false, inner.clone()); |
208 | | |
209 | 1 | assert!(runner.exists("cleat", &["--version"]).await); |
210 | | |
211 | 1 | let calls = inner.calls(); |
212 | 1 | let args = ssh_call_args(&calls); |
213 | 1 | assert_eq!(args[0], "-T"); |
214 | 1 | assert_eq!(args[1], "-o"); |
215 | 1 | assert_eq!(args[2], "BatchMode=yes"); |
216 | 1 | assert_eq!(args[3], "alice@feta.local"); |
217 | 1 | assert_eq!(args[4], "sh"); |
218 | 1 | assert_eq!(args[5], "-lc"); |
219 | 1 | assert_eq!(args[6], "cd '/' && exec 'cleat' '--version'"); |
220 | 1 | } |
221 | | |
222 | | #[tokio::test] |
223 | 1 | async fn ensure_file_writes_remote_file() { |
224 | 1 | let inner = std::sync::Arc::new(RecordingRunner::with_run_results(vec![ |
225 | 1 | Ok("/remote/state/flotilla/helpers/helper-hash/flotilla-helper\n".into()), |
226 | 1 | Ok(String::new()), |
227 | | ])); |
228 | 1 | let runner = SshCommandRunner::new("alice@feta.local", false, inner.clone()); |
229 | | |
230 | 1 | let content = runner.ensure_file(Path::new("/etc/flotilla/config.toml"), "key = true\n").await.expect("ensure_file"); |
231 | 1 | assert_eq!(content, String::new()); |
232 | | |
233 | 1 | let calls = inner.calls(); |
234 | 1 | assert_eq!(calls.len(), 2); |
235 | | |
236 | 1 | let install_args = &calls[0].1; |
237 | 1 | assert_eq!(install_args[0], "-T"); |
238 | 1 | assert_eq!(install_args[1], "-o"); |
239 | 1 | assert_eq!(install_args[2], "BatchMode=yes"); |
240 | 1 | assert_eq!(install_args[3], "alice@feta.local"); |
241 | 1 | assert_eq!(install_args[4], "sh"); |
242 | 1 | assert_eq!(install_args[5], "-lc"); |
243 | 1 | assert!(install_args[6].contains("helpers/$helper_hash")); |
244 | 1 | assert_eq!(install_args[7], "flotilla-bootstrap-install-managed-script"); |
245 | 1 | assert_eq!(install_args[8], "flotilla-helper"); |
246 | 1 | assert!(!install_args[9].is_empty()); |
247 | | |
248 | 1 | let args = &calls[1].1; |
249 | 1 | assert_eq!(args[0], "-T"); |
250 | 1 | assert_eq!(args[1], "-o"); |
251 | 1 | assert_eq!(args[2], "BatchMode=yes"); |
252 | 1 | assert_eq!(args[3], "alice@feta.local"); |
253 | 1 | assert_eq!(args[4], "sh"); |
254 | 1 | assert_eq!(args[5], "-lc"); |
255 | 1 | assert!(args[6].contains("PATH='/remote/state/flotilla/helpers/helper-hash':\"$PATH\"")); |
256 | 1 | assert!(args[6].contains("exec 'flotilla-helper' 'ensure-file-if-absent'")); |
257 | 1 | assert!(args[6].contains("'/etc/flotilla/config.toml'")); |
258 | 1 | assert!(args[6].contains("'key = true\n'")); |
259 | 1 | } |
260 | | |
261 | | #[tokio::test] |
262 | 1 | async fn run_propagates_runner_errors() { |
263 | 1 | let inner = std::sync::Arc::new(RecordingRunner::with_run_result(Err("ssh failed".into()))); |
264 | 1 | let runner = SshCommandRunner::new("alice@feta.local", false, inner.clone()); |
265 | | |
266 | 1 | let error = runner.run("git", &["status"], Path::new("/repo"), &ChannelLabel::Noop).await; |
267 | | |
268 | 1 | assert_eq!(error.unwrap_err(), "ssh failed"); |
269 | 1 | } |
270 | | } |