Coverage Report

Created: 2026-04-05 07:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
crates/flotilla-core/src/step.rs
Line
Count
Source
1
use std::{
2
    sync::{Arc, Mutex},
3
    time::Duration,
4
};
5
6
use flotilla_protocol::{CommandValue, DaemonEvent, HostName, RepoIdentity, StepStatus};
7
pub use flotilla_protocol::{Step, StepAction, StepExecutionContext, StepOutcome};
8
use tokio::sync::broadcast;
9
use tokio_util::sync::CancellationToken;
10
use tracing::{debug, info};
11
12
use crate::path_context::ExecutionEnvironmentPath;
13
14
/// Resolves symbolic step actions into outcomes.
15
#[async_trait::async_trait]
16
pub trait StepResolver: Send + Sync {
17
    async fn resolve(
18
        &self,
19
        description: &str,
20
        context: &StepExecutionContext,
21
        action: StepAction,
22
        prior: &[StepOutcome],
23
    ) -> Result<StepOutcome, String>;
24
}
25
26
pub struct RemoteStepBatchRequest {
27
    pub command_id: u64,
28
    pub target_host: HostName,
29
    pub repo_identity: RepoIdentity,
30
    /// Requester-local repo path used when remapping remote progress into
31
    /// `DaemonEvent::CommandStepUpdate` for the UI. Remote execution itself
32
    /// resolves the actual repo root from `repo_identity`. This may be absent
33
    /// when the requester only has identity metadata for the repo.
34
    pub repo: Option<ExecutionEnvironmentPath>,
35
    /// Global step index of the first step in this batch on the requester.
36
    ///
37
    /// The executing host emits batch-relative progress indices. The requester
38
    /// uses this offset when remapping those updates into the global command
39
    /// timeline that the UI already understands.
40
    pub step_offset: usize,
41
    pub steps: Vec<Step>,
42
}
43
44
#[derive(Clone, Debug, PartialEq, Eq)]
45
pub struct RemoteStepProgressUpdate {
46
    pub batch_step_index: usize,
47
    pub batch_step_count: usize,
48
    pub description: String,
49
    pub status: StepStatus,
50
}
51
52
#[async_trait::async_trait]
53
pub trait RemoteStepProgressSink: Send + Sync {
54
    async fn emit(&self, update: RemoteStepProgressUpdate);
55
}
56
57
#[async_trait::async_trait]
58
pub trait RemoteStepExecutor: Send + Sync {
59
    async fn execute_batch(
60
        &self,
61
        request: RemoteStepBatchRequest,
62
        progress_sink: Arc<dyn RemoteStepProgressSink>,
63
    ) -> Result<Vec<StepOutcome>, String>;
64
65
    async fn cancel_active_batch(&self, command_id: u64) -> Result<(), String>;
66
}
67
68
pub struct UnsupportedRemoteStepExecutor;
69
70
#[async_trait::async_trait]
71
impl RemoteStepExecutor for UnsupportedRemoteStepExecutor {
72
    async fn execute_batch(
73
        &self,
74
        request: RemoteStepBatchRequest,
75
        _progress_sink: Arc<dyn RemoteStepProgressSink>,
76
0
    ) -> Result<Vec<StepOutcome>, String> {
77
        Err(format!("remote step execution is not wired for host {}", request.target_host))
78
0
    }
79
80
0
    async fn cancel_active_batch(&self, _command_id: u64) -> Result<(), String> {
81
        Ok(())
82
0
    }
83
}
84
85
/// A plan of steps to execute for a command.
86
pub struct StepPlan {
87
    pub steps: Vec<Step>,
88
}
89
90
impl StepPlan {
91
152
    pub fn new(steps: Vec<Step>) -> Self {
92
152
        Self { steps }
93
152
    }
94
}
95
96
/// Execute a step plan, emitting progress events and checking cancellation between steps.
97
#[allow(clippy::too_many_arguments)]
98
73
pub async fn run_step_plan(
99
73
    plan: StepPlan,
100
73
    command_id: u64,
101
73
    local_host: HostName,
102
73
    repo_identity: RepoIdentity,
103
73
    repo: ExecutionEnvironmentPath,
104
73
    cancel: CancellationToken,
105
73
    event_tx: broadcast::Sender<DaemonEvent>,
106
73
    resolver: &dyn StepResolver,
107
73
) -> CommandValue {
108
73
    let remote_executor = UnsupportedRemoteStepExecutor;
109
73
    run_step_plan_with_remote_executor(plan, command_id, local_host, repo_identity, repo, cancel, event_tx, resolver, &remote_executor)
110
73
        .await
111
73
}
112
113
/// Execute a step plan with explicit remote-step handling.
114
#[allow(clippy::too_many_arguments)]
115
104
pub async fn run_step_plan_with_remote_executor(
116
104
    plan: StepPlan,
117
104
    command_id: u64,
118
104
    local_host: HostName,
119
104
    repo_identity: RepoIdentity,
120
104
    repo: ExecutionEnvironmentPath,
121
104
    cancel: CancellationToken,
122
104
    event_tx: broadcast::Sender<DaemonEvent>,
123
104
    resolver: &dyn StepResolver,
124
104
    remote_executor: &dyn RemoteStepExecutor,
125
104
) -> CommandValue {
126
104
    let step_count = plan.steps.len();
127
104
    info!(%command_id, %step_count, %local_host, "running step plan");
128
104
    let mut outcomes: Vec<StepOutcome> = Vec::new();
129
104
    let steps = plan.steps;
130
104
    let mut i = 0usize;
131
132
222
    while i < step_count {
133
156
        if cancel.is_cancelled() {
134
1
            return CommandValue::Cancelled;
135
155
        }
136
137
155
        let step = steps[i].clone();
138
155
        let step_target = step.host.host_name().clone();
139
155
        debug!(%command_id, %step_target, %local_host, step_index = i, desc = %step.description, "step dispatch");
140
141
155
        if step_target == local_host {
142
139
            emit_step_update(
143
139
                &event_tx,
144
139
                command_id,
145
139
                local_host.clone(),
146
139
                repo_identity.clone(),
147
139
                Some(repo.as_path().to_path_buf()),
148
139
                i,
149
139
                step_count,
150
139
                step.description.clone(),
151
139
                StepStatus::Started,
152
            );
153
154
139
            let outcome = resolver.resolve(&step.description, &step.host, step.action, &outcomes).await;
155
156
            // Cancellation wins over a successful in-flight step, but provider
157
            // errors still surface so we don't hide the underlying failure.
158
139
            if cancel.is_cancelled() && 
outcome3
.
is_ok3
() {
159
3
                return CommandValue::Cancelled;
160
136
            }
161
162
136
            match outcome {
163
114
                Ok(step_outcome) => {
164
114
                    let status = match &step_outcome {
165
2
                        StepOutcome::Skipped => StepStatus::Skipped,
166
112
                        _ => StepStatus::Succeeded,
167
                    };
168
114
                    emit_step_update(
169
114
                        &event_tx,
170
114
                        command_id,
171
114
                        local_host.clone(),
172
114
                        repo_identity.clone(),
173
114
                        Some(repo.as_path().to_path_buf()),
174
114
                        i,
175
114
                        step_count,
176
114
                        step.description.clone(),
177
114
                        status,
178
                    );
179
114
                    outcomes.push(step_outcome);
180
                }
181
22
                Err(e) => {
182
22
                    emit_step_update(
183
22
                        &event_tx,
184
22
                        command_id,
185
22
                        local_host.clone(),
186
22
                        repo_identity.clone(),
187
22
                        Some(repo.as_path().to_path_buf()),
188
22
                        i,
189
22
                        step_count,
190
22
                        step.description.clone(),
191
22
                        StepStatus::Failed { message: e.clone() },
192
                    );
193
22
                    return prior_result_or_error(&outcomes, e);
194
                }
195
            }
196
114
            i += 1;
197
        } else {
198
16
            let target_host = step_target;
199
            {
200
16
                let segment_start = i;
201
16
                let mut segment_steps = vec![step];
202
16
                i += 1;
203
27
                while i < step_count {
204
21
                    if *steps[i].host.host_name() == target_host {
205
11
                        segment_steps.push(steps[i].clone());
206
11
                        i += 1;
207
11
                    } else {
208
10
                        break;
209
                    }
210
                }
211
212
16
                let progress_sink = Arc::new(EventForwardingProgressSink {
213
16
                    command_id,
214
16
                    host: target_host.clone(),
215
16
                    repo_identity: repo_identity.clone(),
216
16
                    repo: Some(repo.clone()),
217
16
                    step_offset: segment_start,
218
16
                    step_count,
219
16
                    event_tx: event_tx.clone(),
220
16
                    state: Mutex::new(RemoteProgressState::default()),
221
16
                });
222
16
                let request = RemoteStepBatchRequest {
223
16
                    command_id,
224
16
                    target_host: target_host.clone(),
225
16
                    repo_identity: repo_identity.clone(),
226
16
                    repo: Some(repo.clone()),
227
16
                    step_offset: segment_start,
228
16
                    steps: segment_steps,
229
16
                };
230
231
16
                let batch = remote_executor.execute_batch(request, progress_sink.clone());
232
16
                tokio::pin!(batch);
233
234
16
                let (
cancelled_during_batch14
,
outcome14
) = tokio::select! {
235
16
                    
outcome10
= &mut batch =>
(false, outcome)10
,
236
16
                    _ = cancel.cancelled() => {
237
4
                        let outcome = match remote_executor.cancel_active_batch(command_id).await {
238
3
                            Ok(()) => tokio::time::timeout(Duration::from_secs(5), &mut batch)
239
3
                                .await
240
3
                                .unwrap_or_else(|_| Err(
"timed out waiting for remote batch cancellation"0
.
into0
())),
241
1
                            Err(message) => Err(message),
242
                        };
243
4
                        (true, outcome)
244
                    }
245
                };
246
247
14
                if cancelled_during_batch {
248
4
                    return CommandValue::Cancelled;
249
10
                }
250
251
10
                match outcome {
252
4
                    Ok(step_outcomes) => outcomes.extend(step_outcomes),
253
6
                    Err(e) => {
254
6
                        if let Some(
failure3
) = progress_sink.synthesized_failure(e.clone()) {
255
3
                            emit_step_update(
256
3
                                &event_tx,
257
3
                                command_id,
258
3
                                target_host.clone(),
259
3
                                repo_identity.clone(),
260
3
                                Some(repo.as_path().to_path_buf()),
261
3
                                segment_start + failure.batch_step_index,
262
3
                                step_count,
263
3
                                failure.description,
264
3
                                StepStatus::Failed { message: e.clone() },
265
3
                            );
266
3
                        }
267
6
                        return prior_result_or_error(&outcomes, e);
268
                    }
269
                }
270
            }
271
        }
272
    }
273
274
    // Return the last meaningful result, or Ok if no step produced one
275
66
    outcomes
276
66
        .into_iter()
277
66
        .rev()
278
108
        .
find_map66
(|o| match o {
279
34
            StepOutcome::CompletedWith(r) => Some(r),
280
74
            _ => None,
281
108
        })
282
66
        .unwrap_or(CommandValue::Ok)
283
102
}
284
285
#[allow(clippy::too_many_arguments)]
286
304
fn emit_step_update(
287
304
    event_tx: &broadcast::Sender<DaemonEvent>,
288
304
    command_id: u64,
289
304
    host: HostName,
290
304
    repo_identity: RepoIdentity,
291
304
    repo: Option<std::path::PathBuf>,
292
304
    step_index: usize,
293
304
    step_count: usize,
294
304
    description: String,
295
304
    status: StepStatus,
296
304
) {
297
304
    debug!(%command_id, %host, step_index, step_count, %description, ?status, "emit_step_update");
298
304
    let _ = event_tx.send(DaemonEvent::CommandStepUpdate {
299
304
        command_id,
300
304
        host,
301
304
        repo_identity,
302
304
        repo,
303
304
        step_index,
304
304
        step_count,
305
304
        description,
306
304
        status,
307
304
    });
308
304
}
309
310
28
fn prior_result_or_error(outcomes: &[StepOutcome], error: String) -> CommandValue {
311
28
    let prior_result = outcomes.iter().rev().find_map(|o| match 
o12
{
312
3
        StepOutcome::CompletedWith(r) => Some(r.clone()),
313
9
        _ => None,
314
12
    });
315
28
    prior_result.unwrap_or(CommandValue::Error { message: error })
316
28
}
317
318
struct EventForwardingProgressSink {
319
    command_id: u64,
320
    host: HostName,
321
    repo_identity: RepoIdentity,
322
    repo: Option<ExecutionEnvironmentPath>,
323
    step_offset: usize,
324
    step_count: usize,
325
    event_tx: broadcast::Sender<DaemonEvent>,
326
    state: Mutex<RemoteProgressState>,
327
}
328
329
/// What the requester last heard from the remote host about a batch.
#[derive(Default)]
struct RemoteProgressState {
    /// Batch-relative index from the most recent update (0 before any update).
    latest_batch_step_index: usize,
    /// Description from the most recent update, if one has arrived.
    latest_description: Option<String>,
    /// Whether the remote host already reported a `Failed` status. If so, no
    /// synthetic failure update is needed.
    failed_emitted: bool,
}
335
336
/// Where to attribute a remote batch error that the remote host did not
/// itself report as a step failure.
struct SynthesizedRemoteFailure {
    /// Batch-relative index of the step to mark as failed.
    batch_step_index: usize,
    /// Description to show for the failed step.
    description: String,
}
340
341
impl EventForwardingProgressSink {
342
6
    fn synthesized_failure(&self, message: String) -> Option<SynthesizedRemoteFailure> {
343
6
        let state = self.state.lock().expect("progress state mutex poisoned");
344
6
        if state.failed_emitted {
345
3
            return None;
346
3
        }
347
348
3
        Some(SynthesizedRemoteFailure {
349
3
            batch_step_index: state.latest_batch_step_index,
350
3
            description: state.latest_description.clone().unwrap_or(message),
351
3
        })
352
6
    }
353
}
354
355
#[async_trait::async_trait]
356
impl RemoteStepProgressSink for EventForwardingProgressSink {
357
26
    async fn emit(&self, update: RemoteStepProgressUpdate) {
358
        debug!(command_id = self.command_id, %self.host, batch_step_index = update.batch_step_index, ?update.status, %update.description, "remote progress received");
359
        {
360
            let mut state = self.state.lock().expect("progress state mutex poisoned");
361
            state.latest_batch_step_index = update.batch_step_index;
362
            state.latest_description = Some(update.description.clone());
363
            if matches!(update.status, StepStatus::Failed { .. }) {
364
                state.failed_emitted = true;
365
            }
366
        }
367
        emit_step_update(
368
            &self.event_tx,
369
            self.command_id,
370
            self.host.clone(),
371
            self.repo_identity.clone(),
372
25
            self.repo.as_ref().map(|repo| repo.as_path().to_path_buf()),
373
            self.step_offset + update.batch_step_index,
374
            self.step_count,
375
            update.description,
376
            update.status,
377
        );
378
26
    }
379
}
380
381
#[cfg(test)]
382
mod tests;