crates/flotilla-resources/src/controller/mod.rs
Line | Count | Source |
1 | | use std::{ |
2 | | collections::{BTreeMap, VecDeque}, |
3 | | future::Future, |
4 | | marker::PhantomData, |
5 | | pin::Pin, |
6 | | time::Duration, |
7 | | }; |
8 | | |
9 | | use chrono::{DateTime, Utc}; |
10 | | use futures::StreamExt; |
11 | | use tokio::{sync::mpsc, task::JoinHandle}; |
12 | | |
13 | | use crate::{ |
14 | | apply_status_patch, |
15 | | backend::{ResourceBackend, TypedResolver}, |
16 | | checkout::CheckoutSpec, |
17 | | clone::CloneSpec, |
18 | | environment::EnvironmentSpec, |
19 | | error::ResourceError, |
20 | | presentation::PresentationSpec, |
21 | | resource::{InputMeta, Resource, ResourceObject}, |
22 | | task_workspace::TaskWorkspaceSpec, |
23 | | terminal_session::TerminalSessionSpec, |
24 | | watch::{WatchEvent, WatchStart}, |
25 | | }; |
26 | | |
/// Event record reported by a reconcile pass through `ReconcileOutcome::events`.
///
/// Currently just a plain string.
pub type Event = String;
28 | | |
29 | | #[allow(async_fn_in_trait)] |
30 | | pub trait Reconciler: Send + Sync + 'static { |
31 | | type Resource: Resource; |
32 | | type Dependencies; |
33 | | |
34 | | /// Gather or prepare the dependency state needed for `reconcile()`. |
35 | | /// |
36 | | /// Reconcilers may use this hook for idempotent preparation work when the |
37 | | /// dependency result depends on performing that step first. |
38 | | async fn fetch_dependencies(&self, obj: &ResourceObject<Self::Resource>) -> Result<Self::Dependencies, ResourceError>; |
39 | | |
40 | | fn reconcile( |
41 | | &self, |
42 | | obj: &ResourceObject<Self::Resource>, |
43 | | deps: &Self::Dependencies, |
44 | | now: DateTime<Utc>, |
45 | | ) -> ReconcileOutcome<Self::Resource>; |
46 | | |
47 | | async fn run_finalizer(&self, obj: &ResourceObject<Self::Resource>) -> Result<(), ResourceError>; |
48 | | |
49 | | fn finalizer_name(&self) -> Option<&'static str>; |
50 | | } |
51 | | |
52 | | pub struct ReconcileOutcome<T: Resource> { |
53 | | pub patch: Option<T::StatusPatch>, |
54 | | pub actuations: Vec<Actuation>, |
55 | | pub events: Vec<Event>, |
56 | | pub requeue_after: Option<Duration>, |
57 | | } |
58 | | |
59 | | impl<T: Resource> ReconcileOutcome<T> { |
60 | 73 | pub fn new(patch: Option<T::StatusPatch>) -> Self { |
61 | 73 | Self { patch, actuations: Vec::new(), events: Vec::new(), requeue_after: None } |
62 | 73 | } |
63 | | |
64 | 195 | pub fn with_actuations(patch: Option<T::StatusPatch>, actuations: Vec<Actuation>) -> Self { |
65 | 195 | Self { patch, actuations, events: Vec::new(), requeue_after: None } |
66 | 195 | } |
67 | | } |
68 | | |
69 | | #[derive(Debug, Clone)] |
70 | | pub enum Actuation { |
71 | | CreateEnvironment { meta: InputMeta, spec: EnvironmentSpec }, |
72 | | CreateClone { meta: InputMeta, spec: CloneSpec }, |
73 | | CreateCheckout { meta: InputMeta, spec: CheckoutSpec }, |
74 | | CreateTerminalSession { meta: InputMeta, spec: TerminalSessionSpec }, |
75 | | CreateTaskWorkspace { meta: InputMeta, spec: TaskWorkspaceSpec }, |
76 | | CreatePresentation { meta: InputMeta, spec: PresentationSpec }, |
77 | | DeletePresentation { name: String }, |
78 | | DeleteTaskWorkspace { name: String }, |
79 | | } |
80 | | |
81 | | pub trait SecondaryWatch: Send + Sync { |
82 | | type Primary: Resource; |
83 | | |
84 | | fn clone_box(&self) -> Box<dyn SecondaryWatch<Primary = Self::Primary>>; |
85 | | |
86 | | fn spawn( |
87 | | self: Box<Self>, |
88 | | backend: ResourceBackend, |
89 | | namespace: String, |
90 | | sender: mpsc::Sender<String>, |
91 | | ) -> Pin<Box<dyn Future<Output = Result<(), ResourceError>> + Send>>; |
92 | | } |
93 | | |
94 | | impl<P: Resource> Clone for Box<dyn SecondaryWatch<Primary = P>> { |
95 | 56 | fn clone(&self) -> Self { |
96 | 56 | self.clone_box() |
97 | 56 | } |
98 | | } |
99 | | |
100 | | #[derive(Clone)] |
101 | | pub struct LabelMappedWatch<W: Resource, P: Resource> { |
102 | | pub label_key: &'static str, |
103 | | pub _marker: PhantomData<(W, P)>, |
104 | | } |
105 | | |
106 | | impl<W: Resource, P: Resource> LabelMappedWatch<W, P> { |
107 | 72 | async fn enqueue_from_object( |
108 | 72 | label_key: &'static str, |
109 | 72 | sender: &mpsc::Sender<String>, |
110 | 72 | object: &ResourceObject<W>, |
111 | 72 | ) -> Result<(), ResourceError> { |
112 | 72 | if let Some(primary57 ) = object.metadata.labels.get(label_key) { |
113 | 57 | sender |
114 | 57 | .send(primary.clone()) |
115 | 57 | .await |
116 | 57 | .map_err(|_| ResourceError::other0 ("controller queue closed while forwarding secondary event"))?0 ; |
117 | 15 | } |
118 | 72 | Ok(()) |
119 | 72 | } |
120 | | } |
121 | | |
122 | | impl<W: Resource, P: Resource> SecondaryWatch for LabelMappedWatch<W, P> { |
123 | | type Primary = P; |
124 | | |
125 | 39 | fn clone_box(&self) -> Box<dyn SecondaryWatch<Primary = Self::Primary>> { |
126 | 39 | Box::new(Self { label_key: self.label_key, _marker: PhantomData }) |
127 | 39 | } |
128 | | |
129 | 39 | fn spawn( |
130 | 39 | self: Box<Self>, |
131 | 39 | backend: ResourceBackend, |
132 | 39 | namespace: String, |
133 | 39 | sender: mpsc::Sender<String>, |
134 | 39 | ) -> Pin<Box<dyn Future<Output = Result<(), ResourceError>> + Send>> { |
135 | 39 | Box::pin(async move { |
136 | 39 | let resolver = backend.using::<W>(&namespace); |
137 | 39 | let listed = resolver.list().await?0 ; |
138 | 39 | for object11 in &listed.items { |
139 | 11 | Self::enqueue_from_object(self.label_key, &sender, object).await?0 ; |
140 | | } |
141 | | |
142 | 39 | let mut watch = resolver.watch(WatchStart::FromVersion(listed.resource_version)).await?0 ; |
143 | 100 | while let Some(event61 ) = watch.next().await { |
144 | 61 | match event?0 { |
145 | 38 | WatchEvent::Added(object15 ) | WatchEvent::Modified(object) | WatchEvent::Deleted(object8 ) => { |
146 | 61 | Self::enqueue_from_object(self.label_key, &sender, &object).await?0 ; |
147 | | } |
148 | | } |
149 | | } |
150 | 0 | Ok(()) |
151 | 0 | }) |
152 | 39 | } |
153 | | } |
154 | | |
155 | | #[derive(Clone)] |
156 | | pub struct LabelJoinWatch<W: Resource, P: Resource> { |
157 | | pub label_key: &'static str, |
158 | | pub _marker: PhantomData<(W, P)>, |
159 | | } |
160 | | |
161 | | impl<W: Resource, P: Resource> LabelJoinWatch<W, P> { |
162 | 11 | async fn enqueue_matching_primaries( |
163 | 11 | label_key: &'static str, |
164 | 11 | sender: &mpsc::Sender<String>, |
165 | 11 | watched: &ResourceObject<W>, |
166 | 11 | primaries: &TypedResolver<P>, |
167 | 11 | ) -> Result<(), ResourceError> { |
168 | 11 | let Some(value) = watched.metadata.labels.get(label_key) else { |
169 | 0 | return Ok(()); |
170 | | }; |
171 | 11 | let selector = BTreeMap::from([(label_key.to_string(), value.clone())]); |
172 | 11 | let listed = primaries.list_matching_labels(&selector).await?0 ; |
173 | 11 | for object9 in listed.items { |
174 | 9 | sender |
175 | 9 | .send(object.metadata.name) |
176 | 9 | .await |
177 | 9 | .map_err(|_| ResourceError::other0 ("controller queue closed while forwarding joined secondary event"))?0 ; |
178 | | } |
179 | 11 | Ok(()) |
180 | 11 | } |
181 | | } |
182 | | |
183 | | impl<W: Resource, P: Resource> SecondaryWatch for LabelJoinWatch<W, P> { |
184 | | type Primary = P; |
185 | | |
186 | 14 | fn clone_box(&self) -> Box<dyn SecondaryWatch<Primary = Self::Primary>> { |
187 | 14 | Box::new(Self { label_key: self.label_key, _marker: PhantomData }) |
188 | 14 | } |
189 | | |
190 | 14 | fn spawn( |
191 | 14 | self: Box<Self>, |
192 | 14 | backend: ResourceBackend, |
193 | 14 | namespace: String, |
194 | 14 | sender: mpsc::Sender<String>, |
195 | 14 | ) -> Pin<Box<dyn Future<Output = Result<(), ResourceError>> + Send>> { |
196 | 14 | Box::pin(async move { |
197 | 14 | let watched = backend.clone().using::<W>(&namespace); |
198 | 14 | let primaries = backend.using::<P>(&namespace); |
199 | 14 | let listed = watched.list().await?0 ; |
200 | 14 | for object2 in &listed.items { |
201 | 2 | Self::enqueue_matching_primaries(self.label_key, &sender, object, &primaries).await?0 ; |
202 | | } |
203 | | |
204 | 14 | let mut watch = watched.watch(WatchStart::FromVersion(listed.resource_version)).await?0 ; |
205 | 23 | while let Some(event9 ) = watch.next().await { |
206 | 9 | match event?0 { |
207 | 6 | WatchEvent::Added(object3 ) | WatchEvent::Modified(object) | WatchEvent::Deleted(object0 ) => { |
208 | 9 | Self::enqueue_matching_primaries(self.label_key, &sender, &object, &primaries).await?0 ; |
209 | | } |
210 | | } |
211 | | } |
212 | 0 | Ok(()) |
213 | 0 | }) |
214 | 14 | } |
215 | | } |
216 | | |
217 | | enum WatchExited { |
218 | | Primary(Result<(), ResourceError>), |
219 | | Secondary { index: usize, result: Result<(), ResourceError> }, |
220 | | } |
221 | | |
222 | | pub struct ControllerLoop<R: Reconciler> { |
223 | | pub primary: TypedResolver<R::Resource>, |
224 | | pub secondaries: Vec<Box<dyn SecondaryWatch<Primary = R::Resource>>>, |
225 | | pub reconciler: R, |
226 | | pub resync_interval: Duration, |
227 | | pub backend: ResourceBackend, |
228 | | } |
229 | | |
230 | | impl<R: Reconciler> ControllerLoop<R> { |
231 | | const WATCH_RESTART_BACKOFF: Duration = Duration::from_millis(100); |
232 | | |
233 | 31 | async fn apply_actuation(backend: &ResourceBackend, namespace: &str, actuation: Actuation) -> Result<(), ResourceError> { |
234 | 31 | match actuation { |
235 | 0 | Actuation::CreateEnvironment { meta, spec } => { |
236 | 0 | let resolver = backend.using::<crate::Environment>(namespace); |
237 | 0 | Self::create_if_missing(&resolver, meta, spec).await |
238 | | } |
239 | 1 | Actuation::CreateClone { meta, spec } => { |
240 | 1 | let resolver = backend.using::<crate::Clone>(namespace); |
241 | 1 | Self::create_if_missing(&resolver, meta, spec).await |
242 | | } |
243 | 2 | Actuation::CreateCheckout { meta, spec } => { |
244 | 2 | let resolver = backend.using::<crate::Checkout>(namespace); |
245 | 2 | Self::create_if_missing(&resolver, meta, spec).await |
246 | | } |
247 | 2 | Actuation::CreateTerminalSession { meta, spec } => { |
248 | 2 | let resolver = backend.using::<crate::TerminalSession>(namespace); |
249 | 2 | Self::create_if_missing(&resolver, meta, spec).await |
250 | | } |
251 | 3 | Actuation::CreateTaskWorkspace { meta, spec } => { |
252 | 3 | let resolver = backend.using::<crate::TaskWorkspace>(namespace); |
253 | 3 | Self::create_if_missing(&resolver, meta, spec).await |
254 | | } |
255 | 16 | Actuation::CreatePresentation { meta, spec } => { |
256 | 16 | let resolver = backend.using::<crate::Presentation>(namespace); |
257 | 16 | Self::create_if_missing(&resolver, meta, spec).await |
258 | | } |
259 | 4 | Actuation::DeletePresentation { name } => { |
260 | 4 | let resolver = backend.using::<crate::Presentation>(namespace); |
261 | 4 | match resolver.delete(&name).await { |
262 | 4 | Ok(()) | Err(ResourceError::NotFound { .. }) => Ok(()), |
263 | 0 | Err(err) => Err(err), |
264 | | } |
265 | | } |
266 | 3 | Actuation::DeleteTaskWorkspace { name } => { |
267 | 3 | let resolver = backend.using::<crate::TaskWorkspace>(namespace); |
268 | 3 | match resolver.delete(&name).await { |
269 | 3 | Ok(()) | Err(ResourceError::NotFound { .. }) => Ok(()), |
270 | 0 | Err(err) => Err(err), |
271 | | } |
272 | | } |
273 | | } |
274 | 31 | } |
275 | | |
276 | 24 | async fn create_if_missing<T: Resource>(resolver: &TypedResolver<T>, meta: InputMeta, spec: T::Spec) -> Result<(), ResourceError> { |
277 | 24 | match resolver.create(&meta, &spec).await { |
278 | 24 | Ok(_) | Err(ResourceError::Conflict { .. }) => Ok(()), |
279 | 0 | Err(err) => Err(err), |
280 | | } |
281 | 24 | } |
282 | | |
283 | 60 | fn spawn_primary_watch( |
284 | 60 | primary: TypedResolver<R::Resource>, |
285 | 60 | sender: mpsc::Sender<String>, |
286 | 60 | watch_exited: mpsc::UnboundedSender<WatchExited>, |
287 | 60 | restart_backoff: Option<Duration>, |
288 | 60 | ) -> JoinHandle<()> { |
289 | 60 | tokio::spawn(async move { |
290 | 60 | if let Some(backoff0 ) = restart_backoff { |
291 | 0 | tokio::time::sleep(backoff).await; |
292 | 60 | } |
293 | 60 | let result0 = async { |
294 | 60 | let listed = primary.list().await?0 ; |
295 | 60 | for object34 in &listed.items { |
296 | 34 | sender |
297 | 34 | .send(object.metadata.name.clone()) |
298 | 34 | .await |
299 | 34 | .map_err(|_| ResourceError::other0 ("controller queue closed while forwarding initial primary list"))?0 ; |
300 | | } |
301 | | |
302 | 60 | let mut watch = primary.watch(WatchStart::FromVersion(listed.resource_version)).await?0 ; |
303 | 327 | while let Some(event267 ) = watch.next().await { |
304 | 267 | match event?0 { |
305 | 252 | WatchEvent::Added(object9 ) | WatchEvent::Modified(object) | WatchEvent::Deleted(object6 ) => { |
306 | 267 | sender |
307 | 267 | .send(object.metadata.name) |
308 | 267 | .await |
309 | 267 | .map_err(|_| ResourceError::other0 ("controller queue closed while forwarding primary watch event"))?0 ; |
310 | | } |
311 | | } |
312 | | } |
313 | 0 | Ok(()) |
314 | 0 | } |
315 | 60 | .await; |
316 | | |
317 | 0 | let _ = watch_exited.send(WatchExited::Primary(result)); |
318 | 0 | }) |
319 | 60 | } |
320 | | |
321 | 56 | fn spawn_secondary_watch( |
322 | 56 | index: usize, |
323 | 56 | watch: Box<dyn SecondaryWatch<Primary = R::Resource>>, |
324 | 56 | backend: ResourceBackend, |
325 | 56 | namespace: String, |
326 | 56 | sender: mpsc::Sender<String>, |
327 | 56 | watch_exited: mpsc::UnboundedSender<WatchExited>, |
328 | 56 | restart_backoff: Option<Duration>, |
329 | 56 | ) -> JoinHandle<()> { |
330 | 56 | tokio::spawn(async move { |
331 | 56 | if let Some(backoff2 ) = restart_backoff { |
332 | 2 | tokio::time::sleep(backoff).await; |
333 | 54 | } |
334 | 55 | let result2 = watch.spawn(backend, namespace, sender).await; |
335 | 2 | let _ = watch_exited.send(WatchExited::Secondary { index, result }); |
336 | 2 | }) |
337 | 56 | } |
338 | | |
339 | 23 | async fn resync_all(primary: &TypedResolver<R::Resource>, sender: &mpsc::Sender<String>) -> Result<(), ResourceError> { |
340 | 23 | let listed = primary.list().await?0 ; |
341 | 23 | for object19 in listed.items { |
342 | 19 | sender |
343 | 19 | .send(object.metadata.name) |
344 | 19 | .await |
345 | 19 | .map_err(|_| ResourceError::other0 ("controller queue closed while forwarding resync item"))?0 ; |
346 | | } |
347 | 23 | Ok(()) |
348 | 23 | } |
349 | | |
350 | 60 | pub async fn run(self) -> Result<(), ResourceError> |
351 | 60 | where |
352 | 60 | <R::Resource as Resource>::Status: Default, |
353 | 60 | { |
354 | 60 | let ControllerLoop { primary, secondaries, reconciler, resync_interval, backend } = self; |
355 | 60 | let (sender, mut receiver) = mpsc::channel::<String>(128); |
356 | 60 | let (watch_exited_tx, mut watch_exited_rx) = mpsc::unbounded_channel(); |
357 | 60 | let _primary_watch = Self::spawn_primary_watch(primary.clone(), sender.clone(), watch_exited_tx.clone(), None); |
358 | 60 | let secondary_templates = secondaries; |
359 | 60 | let _secondary_watches: Vec<JoinHandle<()>> = secondary_templates |
360 | 60 | .iter() |
361 | 60 | .enumerate() |
362 | 60 | .map(|(index, watch)| {54 |
363 | 54 | Self::spawn_secondary_watch( |
364 | 54 | index, |
365 | 54 | watch.clone(), |
366 | 54 | backend.clone(), |
367 | 54 | primary.namespace.clone(), |
368 | 54 | sender.clone(), |
369 | 54 | watch_exited_tx.clone(), |
370 | 54 | None, |
371 | | ) |
372 | 54 | }) |
373 | 60 | .collect(); |
374 | 60 | let mut resync = tokio::time::interval(resync_interval); |
375 | 60 | resync.tick().await; |
376 | 53 | let mut pending: VecDeque<String> = VecDeque::new(); |
377 | | |
378 | | loop { |
379 | 747 | if let Some(name337 ) = pending.pop_front() { |
380 | 337 | let object330 = match primary.get(&name).await { |
381 | 330 | Ok(object) => object, |
382 | 7 | Err(ResourceError::NotFound { .. }) => continue, |
383 | 0 | Err(err) => return Err(err), |
384 | | }; |
385 | 330 | if let Some(finalizer_name317 ) = reconciler.finalizer_name() { |
386 | 317 | if object.metadata.deletion_timestamp.is_none() |
387 | 313 | && object.metadata.finalizers.iter().all(|finalizer| finalizer282 != finalizer_name282 ) |
388 | | { |
389 | 31 | let meta = InputMeta::from(&object.metadata).with_added_finalizer(finalizer_name); |
390 | | // A racing writer may win between get() and update(); rely on the resulting |
391 | | // watch event to requeue the object and retry finalizer attachment. |
392 | 31 | primary.update(&meta, &object.metadata.resource_version, &object.spec).await?0 ; |
393 | 31 | continue; |
394 | 286 | } |
395 | 286 | if object.metadata.deletion_timestamp.is_some() |
396 | 4 | && object.metadata.finalizers.iter().any(|finalizer| finalizer == finalizer_name) |
397 | | { |
398 | 4 | reconciler.run_finalizer(&object).await?0 ; |
399 | 4 | let meta = InputMeta::from(&object.metadata).without_finalizer(finalizer_name); |
400 | 4 | primary.update(&meta, &object.metadata.resource_version, &object.spec).await?0 ; |
401 | 4 | continue; |
402 | 282 | } |
403 | 13 | } |
404 | 295 | if object.metadata.deletion_timestamp.is_some() { |
405 | 0 | continue; |
406 | 295 | } |
407 | 295 | let deps = reconciler.fetch_dependencies(&object).await?0 ; |
408 | 295 | let outcome = reconciler.reconcile(&object, &deps, Utc::now()); |
409 | 295 | for actuation31 in outcome.actuations { |
410 | 31 | Self::apply_actuation(&primary.backend, &primary.namespace, actuation).await?0 ; |
411 | | } |
412 | 295 | if let Some(patch212 ) = outcome.patch { |
413 | 212 | apply_status_patch(&primary, &name, &patch).await?0 ; |
414 | 83 | } |
415 | 295 | continue; |
416 | 410 | } |
417 | | |
418 | 410 | tokio::select! { |
419 | 410 | maybe_name332 = receiver.recv() => { |
420 | 332 | let Some(name) = maybe_name else { |
421 | 0 | return Ok(()); |
422 | | }; |
423 | 383 | while let Ok(next51 ) = receiver.try_recv() { |
424 | 51 | if next != name { |
425 | 5 | pending.push_back(next); |
426 | 46 | } |
427 | | } |
428 | 332 | pending.push_front(name); |
429 | | } |
430 | 410 | _ = resync.tick() => { |
431 | 23 | Self::resync_all(&primary, &sender).await?0 ; |
432 | | } |
433 | 410 | Some(exited2 ) = watch_exited_rx.recv() => { |
434 | 2 | match exited { |
435 | 0 | WatchExited::Primary(result) => { |
436 | 0 | result?; |
437 | 0 | let _respawn = Self::spawn_primary_watch( |
438 | 0 | primary.clone(), |
439 | 0 | sender.clone(), |
440 | 0 | watch_exited_tx.clone(), |
441 | 0 | Some(Self::WATCH_RESTART_BACKOFF), |
442 | | ); |
443 | | } |
444 | 2 | WatchExited::Secondary { index, result } => { |
445 | 2 | result?0 ; |
446 | 2 | let _respawn = Self::spawn_secondary_watch( |
447 | 2 | index, |
448 | 2 | secondary_templates[index].clone(), |
449 | 2 | backend.clone(), |
450 | 2 | primary.namespace.clone(), |
451 | 2 | sender.clone(), |
452 | 2 | watch_exited_tx.clone(), |
453 | 2 | Some(Self::WATCH_RESTART_BACKOFF), |
454 | | ); |
455 | | } |
456 | | } |
457 | | } |
458 | | } |
459 | | } |
460 | 0 | } |
461 | | } |
462 | | |
463 | | /// List every resource matching `selector` and delete it, swallowing `NotFound` from |
464 | | /// races with concurrent deletes. Used by cascading-teardown reconcilers (TaskWorkspace, |
465 | | /// Convoy) to garbage-collect their owned children before the finalizer is removed. |
466 | 11 | pub async fn delete_matching<T: Resource>(resolver: &TypedResolver<T>, selector: &BTreeMap<String, String>) -> Result<(), ResourceError> { |
467 | 11 | let listed = resolver.list_matching_labels(selector).await?0 ; |
468 | 11 | for object9 in listed.items { |
469 | 9 | match resolver.delete(&object.metadata.name).await { |
470 | 9 | Ok(()) | Err(ResourceError::NotFound { .. }) => {} |
471 | 0 | Err(err) => return Err(err), |
472 | | } |
473 | | } |
474 | 11 | Ok(()) |
475 | 11 | } |