Coverage Report

Created: 2026-05-19 10:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
crates/flotilla-resources/src/controller/mod.rs
Line
Count
Source
1
use std::{
2
    collections::{BTreeMap, VecDeque},
3
    future::Future,
4
    marker::PhantomData,
5
    pin::Pin,
6
    time::Duration,
7
};
8
9
use chrono::{DateTime, Utc};
10
use futures::StreamExt;
11
use tokio::{sync::mpsc, task::JoinHandle};
12
13
use crate::{
14
    apply_status_patch,
15
    backend::{ResourceBackend, TypedResolver},
16
    checkout::CheckoutSpec,
17
    clone::CloneSpec,
18
    environment::EnvironmentSpec,
19
    error::ResourceError,
20
    presentation::PresentationSpec,
21
    resource::{InputMeta, Resource, ResourceObject},
22
    task_workspace::TaskWorkspaceSpec,
23
    terminal_session::TerminalSessionSpec,
24
    watch::{WatchEvent, WatchStart},
25
};
26
27
pub type Event = String;
28
29
#[allow(async_fn_in_trait)]
30
pub trait Reconciler: Send + Sync + 'static {
31
    type Resource: Resource;
32
    type Dependencies;
33
34
    /// Gather or prepare the dependency state needed for `reconcile()`.
35
    ///
36
    /// Reconcilers may use this hook for idempotent preparation work when the
37
    /// dependency result depends on performing that step first.
38
    async fn fetch_dependencies(&self, obj: &ResourceObject<Self::Resource>) -> Result<Self::Dependencies, ResourceError>;
39
40
    fn reconcile(
41
        &self,
42
        obj: &ResourceObject<Self::Resource>,
43
        deps: &Self::Dependencies,
44
        now: DateTime<Utc>,
45
    ) -> ReconcileOutcome<Self::Resource>;
46
47
    async fn run_finalizer(&self, obj: &ResourceObject<Self::Resource>) -> Result<(), ResourceError>;
48
49
    fn finalizer_name(&self) -> Option<&'static str>;
50
}
51
52
pub struct ReconcileOutcome<T: Resource> {
53
    pub patch: Option<T::StatusPatch>,
54
    pub actuations: Vec<Actuation>,
55
    pub events: Vec<Event>,
56
    pub requeue_after: Option<Duration>,
57
}
58
59
impl<T: Resource> ReconcileOutcome<T> {
60
73
    pub fn new(patch: Option<T::StatusPatch>) -> Self {
61
73
        Self { patch, actuations: Vec::new(), events: Vec::new(), requeue_after: None }
62
73
    }
63
64
195
    pub fn with_actuations(patch: Option<T::StatusPatch>, actuations: Vec<Actuation>) -> Self {
65
195
        Self { patch, actuations, events: Vec::new(), requeue_after: None }
66
195
    }
67
}
68
69
#[derive(Debug, Clone)]
70
pub enum Actuation {
71
    CreateEnvironment { meta: InputMeta, spec: EnvironmentSpec },
72
    CreateClone { meta: InputMeta, spec: CloneSpec },
73
    CreateCheckout { meta: InputMeta, spec: CheckoutSpec },
74
    CreateTerminalSession { meta: InputMeta, spec: TerminalSessionSpec },
75
    CreateTaskWorkspace { meta: InputMeta, spec: TaskWorkspaceSpec },
76
    CreatePresentation { meta: InputMeta, spec: PresentationSpec },
77
    DeletePresentation { name: String },
78
    DeleteTaskWorkspace { name: String },
79
}
80
81
pub trait SecondaryWatch: Send + Sync {
82
    type Primary: Resource;
83
84
    fn clone_box(&self) -> Box<dyn SecondaryWatch<Primary = Self::Primary>>;
85
86
    fn spawn(
87
        self: Box<Self>,
88
        backend: ResourceBackend,
89
        namespace: String,
90
        sender: mpsc::Sender<String>,
91
    ) -> Pin<Box<dyn Future<Output = Result<(), ResourceError>> + Send>>;
92
}
93
94
impl<P: Resource> Clone for Box<dyn SecondaryWatch<Primary = P>> {
95
56
    fn clone(&self) -> Self {
96
56
        self.clone_box()
97
56
    }
98
}
99
100
#[derive(Clone)]
101
pub struct LabelMappedWatch<W: Resource, P: Resource> {
102
    pub label_key: &'static str,
103
    pub _marker: PhantomData<(W, P)>,
104
}
105
106
impl<W: Resource, P: Resource> LabelMappedWatch<W, P> {
107
72
    async fn enqueue_from_object(
108
72
        label_key: &'static str,
109
72
        sender: &mpsc::Sender<String>,
110
72
        object: &ResourceObject<W>,
111
72
    ) -> Result<(), ResourceError> {
112
72
        if let Some(
primary57
) = object.metadata.labels.get(label_key) {
113
57
            sender
114
57
                .send(primary.clone())
115
57
                .await
116
57
                .map_err(|_| 
ResourceError::other0
("controller queue closed while forwarding secondary event"))
?0
;
117
15
        }
118
72
        Ok(())
119
72
    }
120
}
121
122
impl<W: Resource, P: Resource> SecondaryWatch for LabelMappedWatch<W, P> {
123
    type Primary = P;
124
125
39
    fn clone_box(&self) -> Box<dyn SecondaryWatch<Primary = Self::Primary>> {
126
39
        Box::new(Self { label_key: self.label_key, _marker: PhantomData })
127
39
    }
128
129
39
    fn spawn(
130
39
        self: Box<Self>,
131
39
        backend: ResourceBackend,
132
39
        namespace: String,
133
39
        sender: mpsc::Sender<String>,
134
39
    ) -> Pin<Box<dyn Future<Output = Result<(), ResourceError>> + Send>> {
135
39
        Box::pin(async move {
136
39
            let resolver = backend.using::<W>(&namespace);
137
39
            let listed = resolver.list().await
?0
;
138
39
            for 
object11
in &listed.items {
139
11
                Self::enqueue_from_object(self.label_key, &sender, object).await
?0
;
140
            }
141
142
39
            let mut watch = resolver.watch(WatchStart::FromVersion(listed.resource_version)).await
?0
;
143
100
            while let Some(
event61
) = watch.next().await {
144
61
                match event
?0
{
145
38
                    WatchEvent::Added(
object15
) | WatchEvent::Modified(object) | WatchEvent::Deleted(
object8
) => {
146
61
                        Self::enqueue_from_object(self.label_key, &sender, &object).await
?0
;
147
                    }
148
                }
149
            }
150
0
            Ok(())
151
0
        })
152
39
    }
153
}
154
155
#[derive(Clone)]
156
pub struct LabelJoinWatch<W: Resource, P: Resource> {
157
    pub label_key: &'static str,
158
    pub _marker: PhantomData<(W, P)>,
159
}
160
161
impl<W: Resource, P: Resource> LabelJoinWatch<W, P> {
162
11
    async fn enqueue_matching_primaries(
163
11
        label_key: &'static str,
164
11
        sender: &mpsc::Sender<String>,
165
11
        watched: &ResourceObject<W>,
166
11
        primaries: &TypedResolver<P>,
167
11
    ) -> Result<(), ResourceError> {
168
11
        let Some(value) = watched.metadata.labels.get(label_key) else {
169
0
            return Ok(());
170
        };
171
11
        let selector = BTreeMap::from([(label_key.to_string(), value.clone())]);
172
11
        let listed = primaries.list_matching_labels(&selector).await
?0
;
173
11
        for 
object9
in listed.items {
174
9
            sender
175
9
                .send(object.metadata.name)
176
9
                .await
177
9
                .map_err(|_| 
ResourceError::other0
("controller queue closed while forwarding joined secondary event"))
?0
;
178
        }
179
11
        Ok(())
180
11
    }
181
}
182
183
impl<W: Resource, P: Resource> SecondaryWatch for LabelJoinWatch<W, P> {
184
    type Primary = P;
185
186
14
    fn clone_box(&self) -> Box<dyn SecondaryWatch<Primary = Self::Primary>> {
187
14
        Box::new(Self { label_key: self.label_key, _marker: PhantomData })
188
14
    }
189
190
14
    fn spawn(
191
14
        self: Box<Self>,
192
14
        backend: ResourceBackend,
193
14
        namespace: String,
194
14
        sender: mpsc::Sender<String>,
195
14
    ) -> Pin<Box<dyn Future<Output = Result<(), ResourceError>> + Send>> {
196
14
        Box::pin(async move {
197
14
            let watched = backend.clone().using::<W>(&namespace);
198
14
            let primaries = backend.using::<P>(&namespace);
199
14
            let listed = watched.list().await
?0
;
200
14
            for 
object2
in &listed.items {
201
2
                Self::enqueue_matching_primaries(self.label_key, &sender, object, &primaries).await
?0
;
202
            }
203
204
14
            let mut watch = watched.watch(WatchStart::FromVersion(listed.resource_version)).await
?0
;
205
23
            while let Some(
event9
) = watch.next().await {
206
9
                match event
?0
{
207
6
                    WatchEvent::Added(
object3
) | WatchEvent::Modified(object) | WatchEvent::Deleted(
object0
) => {
208
9
                        Self::enqueue_matching_primaries(self.label_key, &sender, &object, &primaries).await
?0
;
209
                    }
210
                }
211
            }
212
0
            Ok(())
213
0
        })
214
14
    }
215
}
216
217
enum WatchExited {
218
    Primary(Result<(), ResourceError>),
219
    Secondary { index: usize, result: Result<(), ResourceError> },
220
}
221
222
pub struct ControllerLoop<R: Reconciler> {
223
    pub primary: TypedResolver<R::Resource>,
224
    pub secondaries: Vec<Box<dyn SecondaryWatch<Primary = R::Resource>>>,
225
    pub reconciler: R,
226
    pub resync_interval: Duration,
227
    pub backend: ResourceBackend,
228
}
229
230
impl<R: Reconciler> ControllerLoop<R> {
231
    const WATCH_RESTART_BACKOFF: Duration = Duration::from_millis(100);
232
233
31
    async fn apply_actuation(backend: &ResourceBackend, namespace: &str, actuation: Actuation) -> Result<(), ResourceError> {
234
31
        match actuation {
235
0
            Actuation::CreateEnvironment { meta, spec } => {
236
0
                let resolver = backend.using::<crate::Environment>(namespace);
237
0
                Self::create_if_missing(&resolver, meta, spec).await
238
            }
239
1
            Actuation::CreateClone { meta, spec } => {
240
1
                let resolver = backend.using::<crate::Clone>(namespace);
241
1
                Self::create_if_missing(&resolver, meta, spec).await
242
            }
243
2
            Actuation::CreateCheckout { meta, spec } => {
244
2
                let resolver = backend.using::<crate::Checkout>(namespace);
245
2
                Self::create_if_missing(&resolver, meta, spec).await
246
            }
247
2
            Actuation::CreateTerminalSession { meta, spec } => {
248
2
                let resolver = backend.using::<crate::TerminalSession>(namespace);
249
2
                Self::create_if_missing(&resolver, meta, spec).await
250
            }
251
3
            Actuation::CreateTaskWorkspace { meta, spec } => {
252
3
                let resolver = backend.using::<crate::TaskWorkspace>(namespace);
253
3
                Self::create_if_missing(&resolver, meta, spec).await
254
            }
255
16
            Actuation::CreatePresentation { meta, spec } => {
256
16
                let resolver = backend.using::<crate::Presentation>(namespace);
257
16
                Self::create_if_missing(&resolver, meta, spec).await
258
            }
259
4
            Actuation::DeletePresentation { name } => {
260
4
                let resolver = backend.using::<crate::Presentation>(namespace);
261
4
                match resolver.delete(&name).await {
262
4
                    Ok(()) | Err(ResourceError::NotFound { .. }) => Ok(()),
263
0
                    Err(err) => Err(err),
264
                }
265
            }
266
3
            Actuation::DeleteTaskWorkspace { name } => {
267
3
                let resolver = backend.using::<crate::TaskWorkspace>(namespace);
268
3
                match resolver.delete(&name).await {
269
3
                    Ok(()) | Err(ResourceError::NotFound { .. }) => Ok(()),
270
0
                    Err(err) => Err(err),
271
                }
272
            }
273
        }
274
31
    }
275
276
24
    async fn create_if_missing<T: Resource>(resolver: &TypedResolver<T>, meta: InputMeta, spec: T::Spec) -> Result<(), ResourceError> {
277
24
        match resolver.create(&meta, &spec).await {
278
24
            Ok(_) | Err(ResourceError::Conflict { .. }) => Ok(()),
279
0
            Err(err) => Err(err),
280
        }
281
24
    }
282
283
60
    fn spawn_primary_watch(
284
60
        primary: TypedResolver<R::Resource>,
285
60
        sender: mpsc::Sender<String>,
286
60
        watch_exited: mpsc::UnboundedSender<WatchExited>,
287
60
        restart_backoff: Option<Duration>,
288
60
    ) -> JoinHandle<()> {
289
60
        tokio::spawn(async move {
290
60
            if let Some(
backoff0
) = restart_backoff {
291
0
                tokio::time::sleep(backoff).await;
292
60
            }
293
60
            let 
result0
= async {
294
60
                let listed = primary.list().await
?0
;
295
60
                for 
object34
in &listed.items {
296
34
                    sender
297
34
                        .send(object.metadata.name.clone())
298
34
                        .await
299
34
                        .map_err(|_| 
ResourceError::other0
("controller queue closed while forwarding initial primary list"))
?0
;
300
                }
301
302
60
                let mut watch = primary.watch(WatchStart::FromVersion(listed.resource_version)).await
?0
;
303
327
                while let Some(
event267
) = watch.next().await {
304
267
                    match event
?0
{
305
252
                        WatchEvent::Added(
object9
) | WatchEvent::Modified(object) | WatchEvent::Deleted(
object6
) => {
306
267
                            sender
307
267
                                .send(object.metadata.name)
308
267
                                .await
309
267
                                .map_err(|_| 
ResourceError::other0
("controller queue closed while forwarding primary watch event"))
?0
;
310
                        }
311
                    }
312
                }
313
0
                Ok(())
314
0
            }
315
60
            .await;
316
317
0
            let _ = watch_exited.send(WatchExited::Primary(result));
318
0
        })
319
60
    }
320
321
56
    fn spawn_secondary_watch(
322
56
        index: usize,
323
56
        watch: Box<dyn SecondaryWatch<Primary = R::Resource>>,
324
56
        backend: ResourceBackend,
325
56
        namespace: String,
326
56
        sender: mpsc::Sender<String>,
327
56
        watch_exited: mpsc::UnboundedSender<WatchExited>,
328
56
        restart_backoff: Option<Duration>,
329
56
    ) -> JoinHandle<()> {
330
56
        tokio::spawn(async move {
331
56
            if let Some(
backoff2
) = restart_backoff {
332
2
                tokio::time::sleep(backoff).await;
333
54
            }
334
55
            let 
result2
= watch.spawn(backend, namespace, sender).await;
335
2
            let _ = watch_exited.send(WatchExited::Secondary { index, result });
336
2
        })
337
56
    }
338
339
23
    async fn resync_all(primary: &TypedResolver<R::Resource>, sender: &mpsc::Sender<String>) -> Result<(), ResourceError> {
340
23
        let listed = primary.list().await
?0
;
341
23
        for 
object19
in listed.items {
342
19
            sender
343
19
                .send(object.metadata.name)
344
19
                .await
345
19
                .map_err(|_| 
ResourceError::other0
("controller queue closed while forwarding resync item"))
?0
;
346
        }
347
23
        Ok(())
348
23
    }
349
350
60
    pub async fn run(self) -> Result<(), ResourceError>
351
60
    where
352
60
        <R::Resource as Resource>::Status: Default,
353
60
    {
354
60
        let ControllerLoop { primary, secondaries, reconciler, resync_interval, backend } = self;
355
60
        let (sender, mut receiver) = mpsc::channel::<String>(128);
356
60
        let (watch_exited_tx, mut watch_exited_rx) = mpsc::unbounded_channel();
357
60
        let _primary_watch = Self::spawn_primary_watch(primary.clone(), sender.clone(), watch_exited_tx.clone(), None);
358
60
        let secondary_templates = secondaries;
359
60
        let _secondary_watches: Vec<JoinHandle<()>> = secondary_templates
360
60
            .iter()
361
60
            .enumerate()
362
60
            .map(|(index, watch)| 
{54
363
54
                Self::spawn_secondary_watch(
364
54
                    index,
365
54
                    watch.clone(),
366
54
                    backend.clone(),
367
54
                    primary.namespace.clone(),
368
54
                    sender.clone(),
369
54
                    watch_exited_tx.clone(),
370
54
                    None,
371
                )
372
54
            })
373
60
            .collect();
374
60
        let mut resync = tokio::time::interval(resync_interval);
375
60
        resync.tick().await;
376
53
        let mut pending: VecDeque<String> = VecDeque::new();
377
378
        loop {
379
747
            if let Some(
name337
) = pending.pop_front() {
380
337
                let 
object330
= match primary.get(&name).await {
381
330
                    Ok(object) => object,
382
7
                    Err(ResourceError::NotFound { .. }) => continue,
383
0
                    Err(err) => return Err(err),
384
                };
385
330
                if let Some(
finalizer_name317
) = reconciler.finalizer_name() {
386
317
                    if object.metadata.deletion_timestamp.is_none()
387
313
                        && object.metadata.finalizers.iter().all(|finalizer| 
finalizer282
!=
finalizer_name282
)
388
                    {
389
31
                        let meta = InputMeta::from(&object.metadata).with_added_finalizer(finalizer_name);
390
                        // A racing writer may win between get() and update(); rely on the resulting
391
                        // watch event to requeue the object and retry finalizer attachment.
392
31
                        primary.update(&meta, &object.metadata.resource_version, &object.spec).await
?0
;
393
31
                        continue;
394
286
                    }
395
286
                    if object.metadata.deletion_timestamp.is_some()
396
4
                        && object.metadata.finalizers.iter().any(|finalizer| finalizer == finalizer_name)
397
                    {
398
4
                        reconciler.run_finalizer(&object).await
?0
;
399
4
                        let meta = InputMeta::from(&object.metadata).without_finalizer(finalizer_name);
400
4
                        primary.update(&meta, &object.metadata.resource_version, &object.spec).await
?0
;
401
4
                        continue;
402
282
                    }
403
13
                }
404
295
                if object.metadata.deletion_timestamp.is_some() {
405
0
                    continue;
406
295
                }
407
295
                let deps = reconciler.fetch_dependencies(&object).await
?0
;
408
295
                let outcome = reconciler.reconcile(&object, &deps, Utc::now());
409
295
                for 
actuation31
in outcome.actuations {
410
31
                    Self::apply_actuation(&primary.backend, &primary.namespace, actuation).await
?0
;
411
                }
412
295
                if let Some(
patch212
) = outcome.patch {
413
212
                    apply_status_patch(&primary, &name, &patch).await
?0
;
414
83
                }
415
295
                continue;
416
410
            }
417
418
410
            tokio::select! {
419
410
                
maybe_name332
= receiver.recv() => {
420
332
                    let Some(name) = maybe_name else {
421
0
                        return Ok(());
422
                    };
423
383
                    while let Ok(
next51
) = receiver.try_recv() {
424
51
                        if next != name {
425
5
                            pending.push_back(next);
426
46
                        }
427
                    }
428
332
                    pending.push_front(name);
429
                }
430
410
                _ = resync.tick() => {
431
23
                    Self::resync_all(&primary, &sender).await
?0
;
432
                }
433
410
                Some(
exited2
) = watch_exited_rx.recv() => {
434
2
                    match exited {
435
0
                        WatchExited::Primary(result) => {
436
0
                            result?;
437
0
                            let _respawn = Self::spawn_primary_watch(
438
0
                                primary.clone(),
439
0
                                sender.clone(),
440
0
                                watch_exited_tx.clone(),
441
0
                                Some(Self::WATCH_RESTART_BACKOFF),
442
                            );
443
                        }
444
2
                        WatchExited::Secondary { index, result } => {
445
2
                            result
?0
;
446
2
                            let _respawn = Self::spawn_secondary_watch(
447
2
                                index,
448
2
                                secondary_templates[index].clone(),
449
2
                                backend.clone(),
450
2
                                primary.namespace.clone(),
451
2
                                sender.clone(),
452
2
                                watch_exited_tx.clone(),
453
2
                                Some(Self::WATCH_RESTART_BACKOFF),
454
                            );
455
                        }
456
                    }
457
                }
458
            }
459
        }
460
0
    }
461
}
462
463
/// List every resource matching `selector` and delete it, swallowing `NotFound` from
464
/// races with concurrent deletes. Used by cascading-teardown reconcilers (TaskWorkspace,
465
/// Convoy) to garbage-collect their owned children before the finalizer is removed.
466
11
pub async fn delete_matching<T: Resource>(resolver: &TypedResolver<T>, selector: &BTreeMap<String, String>) -> Result<(), ResourceError> {
467
11
    let listed = resolver.list_matching_labels(selector).await
?0
;
468
11
    for 
object9
in listed.items {
469
9
        match resolver.delete(&object.metadata.name).await {
470
9
            Ok(()) | Err(ResourceError::NotFound { .. }) => {}
471
0
            Err(err) => return Err(err),
472
        }
473
    }
474
11
    Ok(())
475
11
}