Coverage Report

Created: 2026-04-05 07:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
crates/flotilla-core/src/providers/ssh_runner.rs
Line
Count
Source
1
use std::{path::Path, sync::Arc};
2
3
use async_trait::async_trait;
4
use uuid::Uuid;
5
6
use crate::providers::{
7
    helper_exec_script, install_managed_helper_script, ChannelLabel, CommandOutput, CommandRunner, FLOTILLA_HELPER_NAME,
8
    FLOTILLA_HELPER_SCRIPT,
9
};
10
11
/// Command runner that executes commands on a remote host over SSH.
12
///
13
/// This is intentionally narrow: it shells out to `ssh` for direct-environment
14
/// execution and discovery, but it does not model daemon-to-daemon transport.
15
pub struct SshCommandRunner {
16
    destination: String,
17
    multiplex: bool,
18
    runner: Arc<dyn CommandRunner>,
19
}
20
21
impl SshCommandRunner {
22
18
    pub fn new(destination: impl Into<String>, multiplex: bool, runner: Arc<dyn CommandRunner>) -> Self {
23
18
        Self { destination: destination.into(), multiplex, runner }
24
18
    }
25
26
63
    fn ssh_prefix_args(&self) -> Vec<&str> {
27
63
        let mut args = vec!["-T", "-o", "BatchMode=yes"];
28
63
        if self.multiplex {
29
58
            args.extend(["-o", "ControlMaster=auto", "-o", "ControlPath=/tmp/flotilla-ssh-%C", "-o", "ControlPersist=60"]);
30
58
        
}5
31
63
        args.push(self.destination.as_str());
32
63
        args
33
63
    }
34
35
61
    fn ssh_shell_args<'a>(&'a self, script: &'a str) -> Vec<&'a str> {
36
61
        let mut args = self.ssh_prefix_args();
37
61
        args.push("sh");
38
61
        args.push("-lc");
39
61
        args.push(script);
40
61
        args
41
61
    }
42
43
61
    fn remote_exec_script(&self, cmd: &str, args: &[&str], cwd: &Path) -> String {
44
61
        let mut parts = Vec::with_capacity(args.len() + 4);
45
61
        parts.push(format!("cd {}", flotilla_protocol::arg::shell_quote(&cwd.to_string_lossy())));
46
61
        parts.push("&&".to_string());
47
61
        parts.push("exec".to_string());
48
61
        parts.push(flotilla_protocol::arg::shell_quote(cmd));
49
61
        parts.extend(args.iter().map(|arg| 
flotilla_protocol::arg::shell_quote56
(
arg56
)));
50
61
        parts.join(" ")
51
61
    }
52
53
60
    async fn execute(&self, cmd: &str, args: &[&str], cwd: &Path, label: &ChannelLabel) -> Result<String, String> {
54
60
        let script = self.remote_exec_script(cmd, args, cwd);
55
60
        let ssh_args = self.ssh_shell_args(&script);
56
60
        self.runner.run("ssh", &ssh_args, Path::new("/"), label).await
57
59
    }
58
}
59
60
#[async_trait]
61
impl CommandRunner for SshCommandRunner {
62
59
    async fn run(&self, cmd: &str, args: &[&str], cwd: &Path, label: &ChannelLabel) -> Result<String, String> {
63
        self.execute(cmd, args, cwd, label).await
64
59
    }
65
66
1
    async fn run_output(&self, cmd: &str, args: &[&str], cwd: &Path, label: &ChannelLabel) -> Result<CommandOutput, String> {
67
        let script = self.remote_exec_script(cmd, args, cwd);
68
        let ssh_args = self.ssh_shell_args(&script);
69
        self.runner.run_output("ssh", &ssh_args, Path::new("/"), label).await
70
1
    }
71
72
1
    async fn exists(&self, cmd: &str, args: &[&str]) -> bool {
73
        self.execute(cmd, args, Path::new("/"), &ChannelLabel::Noop).await.is_ok()
74
1
    }
75
76
1
    async fn ensure_file(&self, path: &Path, content: &str) -> Result<String, String> {
77
        let temp_suffix = Uuid::new_v4().to_string();
78
        let path_str = path.to_string_lossy().into_owned();
79
        let helper_path =
80
            install_managed_helper_script(&*self.runner, "ssh", &self.ssh_prefix_args(), FLOTILLA_HELPER_NAME, FLOTILLA_HELPER_SCRIPT)
81
                .await?;
82
        let helper_script = helper_exec_script(&helper_path, "ensure-file-if-absent", &[&path_str, content, &temp_suffix])?;
83
        let mut owned_args: Vec<String> = self.ssh_prefix_args().into_iter().map(str::to_string).collect();
84
        owned_args.extend(["sh".to_string(), "-lc".to_string(), helper_script]);
85
        let arg_refs: Vec<&str> = owned_args.iter().map(String::as_str).collect();
86
        self.runner.run("ssh", &arg_refs, Path::new("/"), &ChannelLabel::Noop).await
87
1
    }
88
}
89
90
#[cfg(test)]
mod tests {
    use std::{
        collections::VecDeque,
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
    };

    use async_trait::async_trait;

    use super::SshCommandRunner;
    use crate::providers::{ChannelLabel, CommandOutput, CommandRunner};

    /// Destination used by every test.
    const HOST: &str = "alice@feta.local";

    /// Leading `ssh` arguments expected without multiplexing, up to and including `sh -lc`.
    const PLAIN_SHELL_PREFIX: [&str; 6] = ["-T", "-o", "BatchMode=yes", HOST, "sh", "-lc"];

    /// One recorded invocation: command, arguments, working directory.
    type RecordedCall = (String, Vec<String>, PathBuf);

    /// Test double that records every invocation and replays preconfigured results.
    struct RecordingRunner {
        calls: Mutex<Vec<RecordedCall>>,
        run_results: Mutex<VecDeque<Result<String, String>>>,
        run_output_result: Mutex<Option<Result<CommandOutput, String>>>,
    }

    impl RecordingRunner {
        /// Runner whose single `run` call yields `result`.
        fn with_run_result(result: Result<String, String>) -> Self {
            Self::with_run_results(vec![result])
        }

        /// Runner whose successive `run` calls yield `results` in order.
        fn with_run_results(results: Vec<Result<String, String>>) -> Self {
            Self { calls: Mutex::default(), run_results: Mutex::new(VecDeque::from(results)), run_output_result: Mutex::default() }
        }

        /// Runner whose single `run_output` call yields `result`.
        fn with_run_output_result(result: Result<CommandOutput, String>) -> Self {
            Self { calls: Mutex::default(), run_results: Mutex::default(), run_output_result: Mutex::new(Some(result)) }
        }

        /// Snapshot of everything recorded so far.
        fn calls(&self) -> Vec<RecordedCall> {
            self.calls.lock().expect("calls mutex").clone()
        }

        /// Appends one invocation to the call log.
        fn record(&self, cmd: &str, args: &[&str], cwd: &Path) {
            let owned_args: Vec<String> = args.iter().map(|arg| (*arg).to_string()).collect();
            self.calls.lock().expect("calls mutex").push((cmd.to_string(), owned_args, cwd.to_path_buf()));
        }
    }

    #[async_trait]
    impl CommandRunner for RecordingRunner {
        async fn run(&self, cmd: &str, args: &[&str], cwd: &Path, _label: &ChannelLabel) -> Result<String, String> {
            self.record(cmd, args, cwd);
            self.run_results.lock().expect("run_results mutex").pop_front().expect("run result not configured")
        }

        async fn run_output(&self, cmd: &str, args: &[&str], cwd: &Path, _label: &ChannelLabel) -> Result<CommandOutput, String> {
            self.record(cmd, args, cwd);
            self.run_output_result.lock().expect("run_output_result mutex").take().expect("run output result not configured")
        }

        async fn exists(&self, _cmd: &str, _args: &[&str]) -> bool {
            true
        }
    }

    /// Asserts that exactly one `ssh` call was recorded and returns its arguments.
    fn ssh_call_args(calls: &[RecordedCall]) -> &Vec<String> {
        assert_eq!(calls.len(), 1);
        let (program, argv, _cwd) = &calls[0];
        assert_eq!(program, "ssh");
        argv
    }

    #[tokio::test]
    async fn run_builds_ssh_command_with_working_directory() {
        let recorder = Arc::new(RecordingRunner::with_run_result(Ok("stdout".into())));
        let ssh = SshCommandRunner::new(HOST, false, recorder.clone());

        let result = ssh.run("git", &["status", "--short"], Path::new("/repo with space"), &ChannelLabel::Noop).await;

        assert_eq!(result.unwrap(), "stdout");
        let recorded = recorder.calls();
        let argv = ssh_call_args(&recorded);
        assert_eq!(&argv[..6], PLAIN_SHELL_PREFIX);
        assert_eq!(argv[6], "cd '/repo with space' && exec 'git' 'status' '--short'");
    }

    #[tokio::test]
    async fn run_output_preserves_stdout_and_stderr() {
        let canned = CommandOutput { stdout: "out".into(), stderr: "err".into(), success: false };
        let recorder = Arc::new(RecordingRunner::with_run_output_result(Ok(canned)));
        let ssh = SshCommandRunner::new(HOST, true, recorder.clone());

        let output = ssh.run_output("git", &["status"], Path::new("/repo"), &ChannelLabel::Noop).await.unwrap();

        assert_eq!(output.stdout, "out");
        assert_eq!(output.stderr, "err");
        assert!(!output.success);

        let recorded = recorder.calls();
        let argv = ssh_call_args(&recorded);
        assert_eq!(&argv[..3], ["-T", "-o", "BatchMode=yes"]);
        assert_eq!(&argv[3..9], ["-o", "ControlMaster=auto", "-o", "ControlPath=/tmp/flotilla-ssh-%C", "-o", "ControlPersist=60"]);
        assert_eq!(&argv[9..13], [HOST, "sh", "-lc", "cd '/repo' && exec 'git' 'status'"]);
    }

    #[tokio::test]
    async fn exists_uses_remote_command_lookup() {
        let recorder = Arc::new(RecordingRunner::with_run_result(Ok(String::new())));
        let ssh = SshCommandRunner::new(HOST, false, recorder.clone());

        assert!(ssh.exists("cleat", &["--version"]).await);

        let recorded = recorder.calls();
        let argv = ssh_call_args(&recorded);
        assert_eq!(&argv[..6], PLAIN_SHELL_PREFIX);
        assert_eq!(argv[6], "cd '/' && exec 'cleat' '--version'");
    }

    #[tokio::test]
    async fn ensure_file_writes_remote_file() {
        // First result answers the helper installation, second the helper invocation.
        let recorder = Arc::new(RecordingRunner::with_run_results(vec![
            Ok("/remote/state/flotilla/helpers/helper-hash/flotilla-helper\n".into()),
            Ok(String::new()),
        ]));
        let ssh = SshCommandRunner::new(HOST, false, recorder.clone());

        let returned = ssh.ensure_file(Path::new("/etc/flotilla/config.toml"), "key = true\n").await.expect("ensure_file");
        assert_eq!(returned, String::new());

        let recorded = recorder.calls();
        assert_eq!(recorded.len(), 2);

        let install_argv = &recorded[0].1;
        assert_eq!(&install_argv[..6], PLAIN_SHELL_PREFIX);
        assert!(install_argv[6].contains("helpers/$helper_hash"));
        assert_eq!(&install_argv[7..9], ["flotilla-bootstrap-install-managed-script", "flotilla-helper"]);
        assert!(!install_argv[9].is_empty());

        let helper_argv = &recorded[1].1;
        assert_eq!(&helper_argv[..6], PLAIN_SHELL_PREFIX);
        let helper_script = &helper_argv[6];
        assert!(helper_script.contains("PATH='/remote/state/flotilla/helpers/helper-hash':\"$PATH\""));
        assert!(helper_script.contains("exec 'flotilla-helper' 'ensure-file-if-absent'"));
        assert!(helper_script.contains("'/etc/flotilla/config.toml'"));
        assert!(helper_script.contains("'key = true\n'"));
    }

    #[tokio::test]
    async fn run_propagates_runner_errors() {
        let recorder = Arc::new(RecordingRunner::with_run_result(Err("ssh failed".into())));
        let ssh = SshCommandRunner::new(HOST, false, recorder.clone());

        let result = ssh.run("git", &["status"], Path::new("/repo"), &ChannelLabel::Noop).await;

        assert_eq!(result.unwrap_err(), "ssh failed");
    }
}