crates/flotilla-core/src/step.rs
Line | Count | Source |
1 | | use std::{ |
2 | | sync::{Arc, Mutex}, |
3 | | time::Duration, |
4 | | }; |
5 | | |
6 | | use flotilla_protocol::{CommandValue, DaemonEvent, HostName, RepoIdentity, StepStatus}; |
7 | | pub use flotilla_protocol::{Step, StepAction, StepExecutionContext, StepOutcome}; |
8 | | use tokio::sync::broadcast; |
9 | | use tokio_util::sync::CancellationToken; |
10 | | use tracing::{debug, info}; |
11 | | |
12 | | use crate::path_context::ExecutionEnvironmentPath; |
13 | | |
/// Resolves symbolic step actions into outcomes.
#[async_trait::async_trait]
pub trait StepResolver: Send + Sync {
    /// Resolve one step's symbolic `action` into a concrete [`StepOutcome`].
    ///
    /// * `description` — human-readable label for the step (logging/UI).
    /// * `context` — the execution context the step runs under (the caller
    ///   passes the step's own host context).
    /// * `action` — the symbolic action to perform.
    /// * `prior` — outcomes of all previously completed steps in the plan,
    ///   allowing later steps to react to earlier results.
    ///
    /// Returns an error `String` on failure; the error message is surfaced
    /// to the UI via a `StepStatus::Failed` update by the caller.
    async fn resolve(
        &self,
        description: &str,
        context: &StepExecutionContext,
        action: StepAction,
        prior: &[StepOutcome],
    ) -> Result<StepOutcome, String>;
}
25 | | |
/// A contiguous run of steps to execute on a single remote host.
pub struct RemoteStepBatchRequest {
    /// Identifier of the command this batch belongs to; also used to target
    /// cancellation via `RemoteStepExecutor::cancel_active_batch`.
    pub command_id: u64,
    /// Host that should execute this batch.
    pub target_host: HostName,
    /// Identity the executing host uses to resolve its local repo root.
    pub repo_identity: RepoIdentity,
    /// Requester-local repo path used when remapping remote progress into
    /// `DaemonEvent::CommandStepUpdate` for the UI. Remote execution itself
    /// resolves the actual repo root from `repo_identity`. This may be absent
    /// when the requester only has identity metadata for the repo.
    pub repo: Option<ExecutionEnvironmentPath>,
    /// Global step index of the first step in this batch on the requester.
    ///
    /// The executing host emits batch-relative progress indices. The requester
    /// uses this offset when remapping those updates into the global command
    /// timeline that the UI already understands.
    pub step_offset: usize,
    /// The steps to run, in order; all target `target_host`.
    pub steps: Vec<Step>,
}
43 | | |
/// A single progress report from a remotely executing batch.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemoteStepProgressUpdate {
    /// Index of the step *within the batch* (not the global command timeline).
    pub batch_step_index: usize,
    /// Total number of steps in the batch.
    pub batch_step_count: usize,
    /// Human-readable description of the step being reported on.
    pub description: String,
    /// Current status of that step.
    pub status: StepStatus,
}
51 | | |
/// Receives progress updates streamed back from a remote batch execution.
#[async_trait::async_trait]
pub trait RemoteStepProgressSink: Send + Sync {
    /// Deliver one progress update; implementations decide how to surface it.
    async fn emit(&self, update: RemoteStepProgressUpdate);
}
56 | | |
/// Executes batches of steps on remote hosts on behalf of the step runner.
#[async_trait::async_trait]
pub trait RemoteStepExecutor: Send + Sync {
    /// Run `request` to completion, streaming progress through `progress_sink`.
    ///
    /// On success returns one [`StepOutcome`] per step in the batch; on
    /// failure returns an error message for the batch as a whole.
    async fn execute_batch(
        &self,
        request: RemoteStepBatchRequest,
        progress_sink: Arc<dyn RemoteStepProgressSink>,
    ) -> Result<Vec<StepOutcome>, String>;

    /// Ask the remote side to stop the batch running for `command_id`.
    async fn cancel_active_batch(&self, command_id: u64) -> Result<(), String>;
}
67 | | |
68 | | pub struct UnsupportedRemoteStepExecutor; |
69 | | |
70 | | #[async_trait::async_trait] |
71 | | impl RemoteStepExecutor for UnsupportedRemoteStepExecutor { |
72 | | async fn execute_batch( |
73 | | &self, |
74 | | request: RemoteStepBatchRequest, |
75 | | _progress_sink: Arc<dyn RemoteStepProgressSink>, |
76 | 0 | ) -> Result<Vec<StepOutcome>, String> { |
77 | | Err(format!("remote step execution is not wired for host {}", request.target_host)) |
78 | 0 | } |
79 | | |
80 | 0 | async fn cancel_active_batch(&self, _command_id: u64) -> Result<(), String> { |
81 | | Ok(()) |
82 | 0 | } |
83 | | } |
84 | | |
/// A plan of steps to execute for a command.
pub struct StepPlan {
    /// Ordered steps; each step carries the host it should run on.
    pub steps: Vec<Step>,
}

impl StepPlan {
    /// Wrap an ordered list of steps into a plan.
    pub fn new(steps: Vec<Step>) -> Self {
        Self { steps }
    }
}
95 | | |
96 | | /// Execute a step plan, emitting progress events and checking cancellation between steps. |
97 | | #[allow(clippy::too_many_arguments)] |
98 | 73 | pub async fn run_step_plan( |
99 | 73 | plan: StepPlan, |
100 | 73 | command_id: u64, |
101 | 73 | local_host: HostName, |
102 | 73 | repo_identity: RepoIdentity, |
103 | 73 | repo: ExecutionEnvironmentPath, |
104 | 73 | cancel: CancellationToken, |
105 | 73 | event_tx: broadcast::Sender<DaemonEvent>, |
106 | 73 | resolver: &dyn StepResolver, |
107 | 73 | ) -> CommandValue { |
108 | 73 | let remote_executor = UnsupportedRemoteStepExecutor; |
109 | 73 | run_step_plan_with_remote_executor(plan, command_id, local_host, repo_identity, repo, cancel, event_tx, resolver, &remote_executor) |
110 | 73 | .await |
111 | 73 | } |
112 | | |
/// Execute a step plan with explicit remote-step handling.
///
/// Local steps (targeting `local_host`) run one at a time through `resolver`,
/// with `Started`/`Succeeded`/`Skipped`/`Failed` updates broadcast on
/// `event_tx`. Consecutive steps targeting the same non-local host are
/// grouped into one batch and handed to `remote_executor`; the batch's
/// progress is forwarded back with indices remapped into the global timeline.
///
/// Cancellation is checked before each step, after each local step, and is
/// raced against any in-flight remote batch. Returns:
/// * `CommandValue::Cancelled` on cancellation,
/// * on step failure, the most recent `CompletedWith` result if any,
///   otherwise `CommandValue::Error`,
/// * otherwise the last `CompletedWith` result, or `CommandValue::Ok`.
#[allow(clippy::too_many_arguments)]
pub async fn run_step_plan_with_remote_executor(
    plan: StepPlan,
    command_id: u64,
    local_host: HostName,
    repo_identity: RepoIdentity,
    repo: ExecutionEnvironmentPath,
    cancel: CancellationToken,
    event_tx: broadcast::Sender<DaemonEvent>,
    resolver: &dyn StepResolver,
    remote_executor: &dyn RemoteStepExecutor,
) -> CommandValue {
    let step_count = plan.steps.len();
    info!(%command_id, %step_count, %local_host, "running step plan");
    let mut outcomes: Vec<StepOutcome> = Vec::new();
    let steps = plan.steps;
    // Manual index (not a `for` loop) because remote batching consumes a
    // variable-length run of steps per iteration.
    let mut i = 0usize;

    while i < step_count {
        // Check cancellation between steps so a cancel lands promptly.
        if cancel.is_cancelled() {
            return CommandValue::Cancelled;
        }

        let step = steps[i].clone();
        let step_target = step.host.host_name().clone();
        debug!(%command_id, %step_target, %local_host, step_index = i, desc = %step.description, "step dispatch");

        if step_target == local_host {
            // Local step: announce start, resolve, then report the result.
            emit_step_update(
                &event_tx,
                command_id,
                local_host.clone(),
                repo_identity.clone(),
                Some(repo.as_path().to_path_buf()),
                i,
                step_count,
                step.description.clone(),
                StepStatus::Started,
            );

            let outcome = resolver.resolve(&step.description, &step.host, step.action, &outcomes).await;

            // Cancellation wins over a successful in-flight step, but provider
            // errors still surface so we don't hide the underlying failure.
            if cancel.is_cancelled() && outcome.is_ok() {
                return CommandValue::Cancelled;
            }

            match outcome {
                Ok(step_outcome) => {
                    let status = match &step_outcome {
                        StepOutcome::Skipped => StepStatus::Skipped,
                        _ => StepStatus::Succeeded,
                    };
                    emit_step_update(
                        &event_tx,
                        command_id,
                        local_host.clone(),
                        repo_identity.clone(),
                        Some(repo.as_path().to_path_buf()),
                        i,
                        step_count,
                        step.description.clone(),
                        status,
                    );
                    outcomes.push(step_outcome);
                }
                Err(e) => {
                    emit_step_update(
                        &event_tx,
                        command_id,
                        local_host.clone(),
                        repo_identity.clone(),
                        Some(repo.as_path().to_path_buf()),
                        i,
                        step_count,
                        step.description.clone(),
                        StepStatus::Failed { message: e.clone() },
                    );
                    return prior_result_or_error(&outcomes, e);
                }
            }
            i += 1;
        } else {
            let target_host = step_target;
            {
                // Greedily collect the run of consecutive steps targeting the
                // same remote host into a single batch request.
                let segment_start = i;
                let mut segment_steps = vec![step];
                i += 1;
                while i < step_count {
                    if *steps[i].host.host_name() == target_host {
                        segment_steps.push(steps[i].clone());
                        i += 1;
                    } else {
                        break;
                    }
                }

                // The sink records the latest observed progress so a failure
                // can be synthesized if the batch errors without reporting one.
                let progress_sink = Arc::new(EventForwardingProgressSink {
                    command_id,
                    host: target_host.clone(),
                    repo_identity: repo_identity.clone(),
                    repo: Some(repo.clone()),
                    step_offset: segment_start,
                    step_count,
                    event_tx: event_tx.clone(),
                    state: Mutex::new(RemoteProgressState::default()),
                });
                let request = RemoteStepBatchRequest {
                    command_id,
                    target_host: target_host.clone(),
                    repo_identity: repo_identity.clone(),
                    repo: Some(repo.clone()),
                    step_offset: segment_start,
                    steps: segment_steps,
                };

                // Pin the batch future so select! can poll it by &mut and the
                // cancellation branch can keep driving it afterwards.
                let batch = remote_executor.execute_batch(request, progress_sink.clone());
                tokio::pin!(batch);

                // Race the batch against cancellation. On cancellation, ask the
                // remote side to stop, then give the batch up to 5s to wind
                // down before giving up on it.
                let (cancelled_during_batch, outcome) = tokio::select! {
                    outcome = &mut batch => (false, outcome),
                    _ = cancel.cancelled() => {
                        let outcome = match remote_executor.cancel_active_batch(command_id).await {
                            Ok(()) => tokio::time::timeout(Duration::from_secs(5), &mut batch)
                                .await
                                .unwrap_or_else(|_| Err("timed out waiting for remote batch cancellation".into())),
                            Err(message) => Err(message),
                        };
                        (true, outcome)
                    }
                };

                if cancelled_during_batch {
                    return CommandValue::Cancelled;
                }

                match outcome {
                    Ok(step_outcomes) => outcomes.extend(step_outcomes),
                    Err(e) => {
                        // If the remote side never emitted a Failed update,
                        // synthesize one so the UI shows where things stopped.
                        if let Some(failure) = progress_sink.synthesized_failure(e.clone()) {
                            emit_step_update(
                                &event_tx,
                                command_id,
                                target_host.clone(),
                                repo_identity.clone(),
                                Some(repo.as_path().to_path_buf()),
                                segment_start + failure.batch_step_index,
                                step_count,
                                failure.description,
                                StepStatus::Failed { message: e.clone() },
                            );
                        }
                        return prior_result_or_error(&outcomes, e);
                    }
                }
            }
        }
    }

    // Return the last meaningful result, or Ok if no step produced one
    outcomes
        .into_iter()
        .rev()
        .find_map(|o| match o {
            StepOutcome::CompletedWith(r) => Some(r),
            _ => None,
        })
        .unwrap_or(CommandValue::Ok)
}
284 | | |
285 | | #[allow(clippy::too_many_arguments)] |
286 | 304 | fn emit_step_update( |
287 | 304 | event_tx: &broadcast::Sender<DaemonEvent>, |
288 | 304 | command_id: u64, |
289 | 304 | host: HostName, |
290 | 304 | repo_identity: RepoIdentity, |
291 | 304 | repo: Option<std::path::PathBuf>, |
292 | 304 | step_index: usize, |
293 | 304 | step_count: usize, |
294 | 304 | description: String, |
295 | 304 | status: StepStatus, |
296 | 304 | ) { |
297 | 304 | debug!(%command_id, %host, step_index, step_count, %description, ?status, "emit_step_update"); |
298 | 304 | let _ = event_tx.send(DaemonEvent::CommandStepUpdate { |
299 | 304 | command_id, |
300 | 304 | host, |
301 | 304 | repo_identity, |
302 | 304 | repo, |
303 | 304 | step_index, |
304 | 304 | step_count, |
305 | 304 | description, |
306 | 304 | status, |
307 | 304 | }); |
308 | 304 | } |
309 | | |
310 | 28 | fn prior_result_or_error(outcomes: &[StepOutcome], error: String) -> CommandValue { |
311 | 28 | let prior_result = outcomes.iter().rev().find_map(|o| match o12 { |
312 | 3 | StepOutcome::CompletedWith(r) => Some(r.clone()), |
313 | 9 | _ => None, |
314 | 12 | }); |
315 | 28 | prior_result.unwrap_or(CommandValue::Error { message: error }) |
316 | 28 | } |
317 | | |
/// Forwards remote batch progress into the requester's daemon event stream,
/// remapping batch-relative step indices into the global command timeline.
struct EventForwardingProgressSink {
    // Command the forwarded updates are attributed to.
    command_id: u64,
    // Remote host executing the batch; stamped on every forwarded event.
    host: HostName,
    repo_identity: RepoIdentity,
    // Requester-local repo path, if known, for UI display.
    repo: Option<ExecutionEnvironmentPath>,
    // Global index of the batch's first step; added to batch-relative indices.
    step_offset: usize,
    // Total step count of the whole command (not just this batch).
    step_count: usize,
    event_tx: broadcast::Sender<DaemonEvent>,
    // Latest observed progress, used to synthesize a failure event if the
    // batch errors without ever reporting a failed step.
    state: Mutex<RemoteProgressState>,
}

/// Rolling record of the most recent progress seen from the remote batch.
#[derive(Default)]
struct RemoteProgressState {
    // Batch-relative index of the most recent update received.
    latest_batch_step_index: usize,
    // Description carried by the most recent update, if any arrived.
    latest_description: Option<String>,
    // True once a `StepStatus::Failed` update has been forwarded.
    failed_emitted: bool,
}

/// A failure event fabricated when a remote batch errors without reporting one.
struct SynthesizedRemoteFailure {
    batch_step_index: usize,
    description: String,
}
340 | | |
341 | | impl EventForwardingProgressSink { |
342 | 6 | fn synthesized_failure(&self, message: String) -> Option<SynthesizedRemoteFailure> { |
343 | 6 | let state = self.state.lock().expect("progress state mutex poisoned"); |
344 | 6 | if state.failed_emitted { |
345 | 3 | return None; |
346 | 3 | } |
347 | | |
348 | 3 | Some(SynthesizedRemoteFailure { |
349 | 3 | batch_step_index: state.latest_batch_step_index, |
350 | 3 | description: state.latest_description.clone().unwrap_or(message), |
351 | 3 | }) |
352 | 6 | } |
353 | | } |
354 | | |
#[async_trait::async_trait]
impl RemoteStepProgressSink for EventForwardingProgressSink {
    /// Record the update in local state, then re-broadcast it as a
    /// `CommandStepUpdate` with the batch-relative index shifted by
    /// `step_offset` into the global command timeline.
    async fn emit(&self, update: RemoteStepProgressUpdate) {
        debug!(command_id = self.command_id, %self.host, batch_step_index = update.batch_step_index, ?update.status, %update.description, "remote progress received");
        {
            // Scoped block so the std mutex guard drops before any further
            // work; it must never be held across an await point.
            let mut state = self.state.lock().expect("progress state mutex poisoned");
            state.latest_batch_step_index = update.batch_step_index;
            state.latest_description = Some(update.description.clone());
            if matches!(update.status, StepStatus::Failed { .. }) {
                // Remember that a failure was reported so the caller does not
                // synthesize a duplicate failure event afterwards.
                state.failed_emitted = true;
            }
        }
        emit_step_update(
            &self.event_tx,
            self.command_id,
            self.host.clone(),
            self.repo_identity.clone(),
            self.repo.as_ref().map(|repo| repo.as_path().to_path_buf()),
            self.step_offset + update.batch_step_index,
            self.step_count,
            update.description,
            update.status,
        );
    }
}
380 | | |
381 | | #[cfg(test)] |
382 | | mod tests; |