crates/flotilla-tui/src/cli.rs
Line | Count | Source |
1 | | use std::{collections::HashMap, path::Path}; |
2 | | |
3 | | use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, Table}; |
4 | | use flotilla_core::daemon::DaemonHandle; |
5 | | use flotilla_protocol::{ |
6 | | output::OutputFormat, Command, CommandValue, DaemonEvent, EnvironmentInfo, EnvironmentStatus, HostProvidersResponse, |
7 | | HostStatusResponse, PeerConnectionState, RepoDetailResponse, RepoProvidersResponse, RepoWorkResponse, StatusResponse, StreamKey, |
8 | | TopologyResponse, |
9 | | }; |
10 | | |
11 | | use crate::socket::SocketDaemon; |
12 | | |
13 | 6 | fn format_work_items_table(items: &[flotilla_protocol::snapshot::WorkItem]) -> Table { |
14 | 6 | let mut table = Table::new(); |
15 | 6 | table.load_preset(UTF8_FULL_CONDENSED); |
16 | 6 | table.set_header(vec!["Kind", "Branch", "Description", "PR", "Session", "Issues"]); |
17 | 7 | for item in items6 { |
18 | 7 | table.add_row(vec![ |
19 | 7 | Cell::new(format!("{:?}", item.kind)), |
20 | 7 | Cell::new(item.branch.as_deref().unwrap_or("-")), |
21 | 7 | Cell::new(&item.description), |
22 | 7 | Cell::new(item.change_request_key.as_deref().unwrap_or("-")), |
23 | 7 | Cell::new(item.session_key.as_deref().unwrap_or("-")), |
24 | 7 | Cell::new(if item.issue_keys.is_empty() { "-"6 .into6 () } else { item.issue_keys1 .join(", ") }), |
25 | | ]); |
26 | | } |
27 | 6 | table |
28 | 6 | } |
29 | | |
30 | 2 | fn format_status_response_human(status: &StatusResponse) -> String { |
31 | 2 | if status.repos.is_empty() { |
32 | 1 | return "No repos tracked.\n".into(); |
33 | 1 | } |
34 | 1 | let mut table = Table::new(); |
35 | 1 | table.load_preset(UTF8_FULL_CONDENSED); |
36 | 1 | table.set_header(vec!["Repo", "Path", "Work Items", "Errors", "Health"]); |
37 | 1 | for repo in &status.repos { |
38 | 1 | let name = repo.path.file_name().map(|n| n.to_string_lossy().to_string()).unwrap_or_default(); |
39 | 1 | let mut health: Vec<String> = repo |
40 | 1 | .provider_health |
41 | 1 | .iter() |
42 | 1 | .flat_map(|(cat, providers)| { |
43 | 1 | providers.iter().map(move |(name, ok)| format!("{cat}/{name}: {}", if *ok { "ok" } else { "error"0 })) |
44 | 1 | }) |
45 | 1 | .collect(); |
46 | 1 | health.sort(); |
47 | 1 | let health_str = if health.is_empty() { "-"0 .into0 () } else { health.join(", ") }; |
48 | 1 | table.add_row(vec![ |
49 | 1 | Cell::new(&name), |
50 | 1 | Cell::new(repo.path.display()), |
51 | 1 | Cell::new(repo.work_item_count), |
52 | 1 | Cell::new(repo.error_count), |
53 | 1 | Cell::new(&health_str), |
54 | | ]); |
55 | | } |
56 | 1 | format!("{table}\n") |
57 | 2 | } |
58 | | |
59 | 4 | fn format_connection_status(status: &PeerConnectionState) -> &'static str { |
60 | 4 | match status { |
61 | 3 | PeerConnectionState::Connected => "connected", |
62 | 1 | PeerConnectionState::Disconnected => "disconnected", |
63 | 0 | PeerConnectionState::Connecting => "connecting", |
64 | 0 | PeerConnectionState::Reconnecting => "reconnecting", |
65 | 0 | PeerConnectionState::Rejected { .. } => "rejected", |
66 | | } |
67 | 4 | } |
68 | | |
69 | 1 | fn inventory_is_empty(inventory: &flotilla_protocol::ToolInventory) -> bool { |
70 | 1 | inventory.binaries.is_empty() && inventory.sockets.is_empty() && inventory.auth.is_empty() && inventory.env_vars.is_empty() |
71 | 1 | } |
72 | | |
73 | 4 | fn environment_status_label(status: &EnvironmentStatus) -> String { |
74 | 4 | match status { |
75 | 0 | EnvironmentStatus::Building => "building".to_string(), |
76 | 0 | EnvironmentStatus::Starting => "starting".to_string(), |
77 | 4 | EnvironmentStatus::Running => "running".to_string(), |
78 | 0 | EnvironmentStatus::Stopped => "stopped".to_string(), |
79 | 0 | EnvironmentStatus::Failed(message) => format!("failed: {message}"), |
80 | | } |
81 | 4 | } |
82 | | |
83 | 2 | fn format_visible_environments_human(environments: &[EnvironmentInfo]) -> String { |
84 | 2 | if environments.is_empty() { |
85 | 0 | return String::new(); |
86 | 2 | } |
87 | | |
88 | 2 | let mut table = Table::new(); |
89 | 2 | table.load_preset(UTF8_FULL_CONDENSED); |
90 | 2 | table.set_header(vec!["Kind", "Id", "Display Name", "Status", "Image"]); |
91 | 4 | for environment in environments2 { |
92 | 4 | match environment { |
93 | 2 | EnvironmentInfo::Direct { id, display_name, status, .. } => { |
94 | 2 | table.add_row(vec![ |
95 | 2 | Cell::new("direct"), |
96 | 2 | Cell::new(id.as_str()), |
97 | 2 | Cell::new(display_name.as_deref().unwrap_or("-")), |
98 | 2 | Cell::new(environment_status_label(status)), |
99 | 2 | Cell::new("-"), |
100 | 2 | ]); |
101 | 2 | } |
102 | 2 | EnvironmentInfo::Provisioned { id, display_name, image, status } => { |
103 | 2 | table.add_row(vec![ |
104 | 2 | Cell::new("provisioned"), |
105 | 2 | Cell::new(id.as_str()), |
106 | 2 | Cell::new(display_name.as_deref().unwrap_or("-")), |
107 | 2 | Cell::new(environment_status_label(status)), |
108 | 2 | Cell::new(image.as_str()), |
109 | 2 | ]); |
110 | 2 | } |
111 | | } |
112 | | } |
113 | 2 | format!("Visible Environments:\n{table}\n") |
114 | 2 | } |
115 | | |
116 | 2 | fn format_host_list_human(response: &flotilla_protocol::HostListResponse) -> String { |
117 | 2 | if response.hosts.is_empty() { |
118 | 1 | return "No hosts known.\n".into(); |
119 | 1 | } |
120 | | |
121 | 1 | let mut table = Table::new(); |
122 | 1 | table.load_preset(UTF8_FULL_CONDENSED); |
123 | 1 | table.set_header(vec!["Host", "Local", "Configured", "Status", "Summary", "Repos", "Work"]); |
124 | 2 | for host in &response.hosts1 { |
125 | 2 | table.add_row(vec![ |
126 | 2 | Cell::new(host.host.as_str()), |
127 | 2 | Cell::new(if host.is_local { "yes"1 } else { "no"1 }), |
128 | 2 | Cell::new(if host.configured { "yes"1 } else { "no"1 }), |
129 | 2 | Cell::new(format_connection_status(&host.connection_status)), |
130 | 2 | Cell::new(if host.has_summary { "yes"1 } else { "no"1 }), |
131 | 2 | Cell::new(host.repo_count), |
132 | 2 | Cell::new(host.work_item_count), |
133 | | ]); |
134 | | } |
135 | 1 | format!("{table}\n") |
136 | 2 | } |
137 | | |
138 | 1 | fn format_host_status_human(response: &HostStatusResponse) -> String { |
139 | 1 | let mut out = String::new(); |
140 | 1 | out.push_str(&format!("Host: {}\n", response.host)); |
141 | 1 | out.push_str(&format!("Status: {}\n", format_connection_status(&response.connection_status))); |
142 | 1 | out.push_str(&format!("Configured: {}\n", if response.configured { "yes"0 } else { "no" })); |
143 | 1 | out.push_str(&format!("Repositories: {}\n", response.repo_count)); |
144 | 1 | out.push_str(&format!("Work Items: {}\n", response.work_item_count)); |
145 | | |
146 | 1 | if let Some(summary) = &response.summary { |
147 | 1 | out.push_str("\nSystem:\n"); |
148 | 1 | if let Some(os) = &summary.system.os { |
149 | 1 | out.push_str(&format!(" OS: {os}\n")); |
150 | 1 | }0 |
151 | 1 | if let Some(arch) = &summary.system.arch { |
152 | 1 | out.push_str(&format!(" Arch: {arch}\n")); |
153 | 1 | }0 |
154 | 1 | if let Some(cpus) = summary.system.cpu_count { |
155 | 1 | out.push_str(&format!(" CPUs: {cpus}\n")); |
156 | 1 | }0 |
157 | 1 | if let Some(memory) = summary.system.memory_total_mb { |
158 | 1 | out.push_str(&format!(" Memory: {} MB\n", memory)); |
159 | 1 | }0 |
160 | 0 | } |
161 | | |
162 | 1 | out.push_str(&format_visible_environments_human(&response.visible_environments)); |
163 | | |
164 | 1 | out |
165 | 1 | } |
166 | | |
167 | 1 | fn format_host_providers_human(response: &HostProvidersResponse) -> String { |
168 | 1 | let mut out = String::new(); |
169 | 1 | out.push_str(&format!("Host: {}\n", response.host)); |
170 | 1 | out.push_str(&format!("Status: {}\n", format_connection_status(&response.connection_status))); |
171 | 1 | out.push_str(&format!("Configured: {}\n", if response.configured { "yes"0 } else { "no" })); |
172 | | |
173 | 1 | out.push_str("\nInventory:\n"); |
174 | 1 | if inventory_is_empty(&response.summary.inventory) { |
175 | 1 | out.push_str(" No inventory facts.\n"); |
176 | 1 | } else { |
177 | 0 | for fact in &response.summary.inventory.binaries { |
178 | 0 | out.push_str(&format!(" binary: {}\n", fact.name)); |
179 | 0 | } |
180 | 0 | for fact in &response.summary.inventory.sockets { |
181 | 0 | out.push_str(&format!(" socket: {}\n", fact.name)); |
182 | 0 | } |
183 | 0 | for fact in &response.summary.inventory.auth { |
184 | 0 | out.push_str(&format!(" auth: {}\n", fact.name)); |
185 | 0 | } |
186 | 0 | for fact in &response.summary.inventory.env_vars { |
187 | 0 | out.push_str(&format!(" env: {}\n", fact.name)); |
188 | 0 | } |
189 | | } |
190 | | |
191 | 1 | out.push_str("\nProviders:\n"); |
192 | 1 | let mut table = Table::new(); |
193 | 1 | table.load_preset(UTF8_FULL_CONDENSED); |
194 | 1 | table.set_header(vec!["Category", "Name", "Health"]); |
195 | 1 | for provider in &response.summary.providers { |
196 | 1 | table.add_row(vec![ |
197 | 1 | Cell::new(&provider.category), |
198 | 1 | Cell::new(&provider.name), |
199 | 1 | Cell::new(if provider.healthy { "ok" } else { "error"0 }), |
200 | | ]); |
201 | | } |
202 | 1 | out.push_str(&table.to_string()); |
203 | 1 | out.push('\n'); |
204 | 1 | out.push_str(&format_visible_environments_human(&response.visible_environments)); |
205 | 1 | out |
206 | 1 | } |
207 | | |
208 | 1 | fn format_topology_human(response: &TopologyResponse) -> String { |
209 | 1 | let mut out = String::new(); |
210 | 1 | out.push_str(&format!("Local Host: {}\n", response.local_host)); |
211 | 1 | if response.routes.is_empty() { |
212 | 0 | out.push_str("No routes.\n"); |
213 | 0 | return out; |
214 | 1 | } |
215 | | |
216 | 1 | let mut table = Table::new(); |
217 | 1 | table.load_preset(UTF8_FULL_CONDENSED); |
218 | 1 | table.set_header(vec!["Target", "Via", "Direct", "Connected", "Fallbacks"]); |
219 | 1 | for route in &response.routes { |
220 | 1 | let fallbacks = if route.fallbacks.is_empty() { |
221 | 0 | "-".to_string() |
222 | | } else { |
223 | 1 | route.fallbacks.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ") |
224 | | }; |
225 | 1 | table.add_row(vec![ |
226 | 1 | Cell::new(route.target.as_str()), |
227 | 1 | Cell::new(route.next_hop.as_str()), |
228 | 1 | Cell::new(if route.direct { "yes"0 } else { "no" }), |
229 | 1 | Cell::new(if route.connected { "yes" } else { "no"0 }), |
230 | 1 | Cell::new(fallbacks), |
231 | | ]); |
232 | | } |
233 | 1 | out.push_str(&table.to_string()); |
234 | 1 | out.push('\n'); |
235 | 1 | out |
236 | 1 | } |
237 | | |
/// Extract a short display name from a repo path (its last path component).
///
/// Falls back to the full path when the path has no final component, which
/// is the case for a root path or a path ending in `..`. Non-UTF-8 bytes are
/// converted lossily in both cases rather than triggering the fallback.
/// Intended to match `flotilla_core::model::repo_name`.
fn repo_name(path: &std::path::Path) -> String {
    match path.file_name() {
        // `into_owned` reuses the buffer when the lossy conversion already
        // had to allocate, instead of copying it a second time.
        Some(component) => component.to_string_lossy().into_owned(),
        None => path.to_string_lossy().into_owned(),
    }
}
244 | | |
245 | 8 | fn repo_label(path: Option<&std::path::Path>, identity: &flotilla_protocol::RepoIdentity) -> String { |
246 | 8 | path.map(repo_name).unwrap_or_else(|| identity.path0 .clone0 ()) |
247 | 8 | } |
248 | | |
249 | | /// Format a `CommandValue` as a short human-readable string. |
250 | 20 | fn format_command_result(result: &flotilla_protocol::commands::CommandValue) -> String { |
251 | | use flotilla_protocol::commands::CommandValue; |
252 | 20 | match result { |
253 | 3 | CommandValue::Ok => "ok".to_string(), |
254 | 2 | CommandValue::RepoTracked { path, resolved_from } => match resolved_from { |
255 | 1 | Some(original) => format!("repo tracked: {} (resolved from {})", path.display(), original.display()), |
256 | 1 | None => format!("repo tracked: {}", path.display()), |
257 | | }, |
258 | 1 | CommandValue::RepoUntracked { path } => format!("repo untracked: {}", path.display()), |
259 | 2 | CommandValue::Refreshed { repos } => format!("refreshed {} repo(s)", repos.len()), |
260 | 1 | CommandValue::CheckoutCreated { branch, .. } => format!("checkout created: {branch}"), |
261 | 1 | CommandValue::CheckoutRemoved { branch } => format!("checkout removed: {branch}"), |
262 | 0 | CommandValue::TerminalPrepared { branch, target_host, .. } => format!("terminal prepared: {branch} on {target_host}"), |
263 | 1 | CommandValue::BranchNameGenerated { name, .. } => format!("branch name: {name}"), |
264 | 3 | CommandValue::CheckoutStatus(status) => { |
265 | 3 | let mut parts = vec![format!("checkout status: {}", status.branch)]; |
266 | 3 | if let Some(cr2 ) = &status.change_request_status { |
267 | 2 | parts.push(format!("PR: {cr}")); |
268 | 2 | }1 |
269 | 3 | if let Some(sha1 ) = &status.merge_commit_sha { |
270 | 1 | parts.push(format!("merged via {}", &sha[..sha.len().min(7)])); |
271 | 2 | } |
272 | 3 | if !status.unpushed_commits.is_empty() { |
273 | 1 | parts.push(format!("{} unpushed", status.unpushed_commits.len())); |
274 | 2 | } |
275 | 3 | if status.has_uncommitted { |
276 | 1 | parts.push("uncommitted changes".to_string()); |
277 | 2 | } |
278 | 3 | if let Some(warning0 ) = &status.base_detection_warning { |
279 | 0 | parts.push(format!("warning: {warning}")); |
280 | 3 | } |
281 | 3 | parts.join(", ") |
282 | | } |
283 | 3 | CommandValue::Error { message } => format!("error: {message}"), |
284 | 1 | CommandValue::Cancelled => "cancelled".to_string(), |
285 | | CommandValue::PreparedWorkspace(_) | CommandValue::AttachCommandResolved { .. } | CommandValue::CheckoutPathResolved { .. } => { |
286 | 1 | "internal step result".to_string() |
287 | | } |
288 | 0 | CommandValue::RepoDetail(detail) => format_repo_detail_human(detail), |
289 | 0 | CommandValue::RepoProviders(providers) => format_repo_providers_human(providers), |
290 | 0 | CommandValue::RepoWork(work) => format_repo_work_human(work), |
291 | 1 | CommandValue::HostList(hosts) => format_host_list_human(hosts), |
292 | 0 | CommandValue::HostStatus(status) => format_host_status_human(status), |
293 | 0 | CommandValue::HostProviders(providers) => format_host_providers_human(providers), |
294 | 0 | CommandValue::ImageEnsured { image } => format!("image ensured: {image}"), |
295 | 0 | CommandValue::EnvironmentCreated { env_id } => format!("environment created: {env_id}"), |
296 | 0 | CommandValue::EnvironmentSpecRead { .. } => "environment spec read".to_string(), |
297 | 0 | CommandValue::IssuePage(page) => format!("issue page: {} items, has_more={}", page.items.len(), page.has_more), |
298 | 0 | CommandValue::IssuesByIds { items } => format!("issues by ids: {} items", items.len()), |
299 | | } |
300 | 20 | } |
301 | | |
302 | 17 | pub(crate) fn format_event_human(event: &flotilla_protocol::DaemonEvent) -> String { |
303 | | use flotilla_protocol::{DaemonEvent, PeerConnectionState}; |
304 | 17 | match event { |
305 | 1 | DaemonEvent::RepoSnapshot(snap) => { |
306 | 1 | format!( |
307 | | "[snapshot] {}: full snapshot (seq {}, {} work items)", |
308 | 1 | repo_label(snap.repo.as_deref(), &snap.repo_identity), |
309 | | snap.seq, |
310 | 1 | snap.work_items.len() |
311 | | ) |
312 | | } |
313 | 1 | DaemonEvent::RepoDelta(delta) => { |
314 | 1 | format!( |
315 | | "[delta] {}: delta seq {}\u{2192}{} ({} changes)", |
316 | 1 | repo_label(delta.repo.as_deref(), &delta.repo_identity), |
317 | | delta.prev_seq, |
318 | | delta.seq, |
319 | 1 | delta.changes.len() |
320 | | ) |
321 | | } |
322 | 1 | DaemonEvent::RepoTracked(info) => { |
323 | 1 | format!("[repo] {}: tracked", info.name) |
324 | | } |
325 | 1 | DaemonEvent::RepoUntracked { repo_identity, path } => { |
326 | 1 | format!("[repo] {}: untracked", repo_label(path.as_deref(), repo_identity)) |
327 | | } |
328 | 3 | DaemonEvent::CommandStarted { repo_identity, repo, description, .. } => { |
329 | 3 | if repo.is_none() && repo_identity.authority1 .is_empty1 () && repo_identity.path1 .is_empty1 () { |
330 | | // Query commands have no repo context — show description only |
331 | 1 | format!("[query] {description}") |
332 | | } else { |
333 | 2 | format!("[command] {}: started \"{}\"", repo_label(repo.as_deref(), repo_identity), description) |
334 | | } |
335 | | } |
336 | 5 | DaemonEvent::CommandFinished { repo_identity, repo, result, .. } => { |
337 | 5 | if repo.is_none() && repo_identity.authority2 .is_empty2 () && repo_identity.path2 .is_empty2 () { |
338 | | // Query commands have no repo context — show result directly |
339 | 2 | format_command_result(result) |
340 | | } else { |
341 | 3 | format!("[command] {}: finished \u{2192} {}", repo_label(repo.as_deref(), repo_identity), format_command_result(result)) |
342 | | } |
343 | | } |
344 | 0 | DaemonEvent::CommandStepUpdate { repo_identity, repo, description, step_index, step_count, .. } => { |
345 | 0 | format!("[step] {}: {} ({}/{})", repo_label(repo.as_deref(), repo_identity), description, step_index + 1, step_count) |
346 | | } |
347 | 5 | DaemonEvent::PeerStatusChanged { host, status } => { |
348 | 5 | let state = match status { |
349 | 1 | PeerConnectionState::Connected => "connected".to_string(), |
350 | 1 | PeerConnectionState::Disconnected => "disconnected".to_string(), |
351 | 1 | PeerConnectionState::Connecting => "connecting".to_string(), |
352 | 1 | PeerConnectionState::Reconnecting => "reconnecting".to_string(), |
353 | 1 | PeerConnectionState::Rejected { reason } => format!("rejected: {reason}"), |
354 | | }; |
355 | 5 | format!("[peer] {host}: {state}") |
356 | | } |
357 | 0 | DaemonEvent::HostSnapshot(snap) => { |
358 | 0 | let state = match &snap.connection_status { |
359 | 0 | PeerConnectionState::Connected => "connected", |
360 | 0 | PeerConnectionState::Disconnected => "disconnected", |
361 | 0 | PeerConnectionState::Connecting => "connecting", |
362 | 0 | PeerConnectionState::Reconnecting => "reconnecting", |
363 | 0 | PeerConnectionState::Rejected { .. } => "rejected", |
364 | | }; |
365 | 0 | format!("[host] {}: {} (seq {})", snap.host_name, state, snap.seq) |
366 | | } |
367 | 0 | DaemonEvent::HostRemoved { host, seq } => { |
368 | 0 | format!("[host] {host}: removed (seq {seq})") |
369 | | } |
370 | | } |
371 | 17 | } |
372 | | |
373 | | /// Extract the (stream_key, seq) from a snapshot/delta event, if present. |
374 | 0 | fn event_stream_seq(event: &DaemonEvent) -> Option<(StreamKey, u64)> { |
375 | 0 | match event { |
376 | 0 | DaemonEvent::RepoSnapshot(snap) => Some((StreamKey::Repo { identity: snap.repo_identity.clone() }, snap.seq)), |
377 | 0 | DaemonEvent::RepoDelta(delta) => Some((StreamKey::Repo { identity: delta.repo_identity.clone() }, delta.seq)), |
378 | 0 | DaemonEvent::HostSnapshot(snap) => Some((StreamKey::Host { host_name: snap.host_name.clone() }, snap.seq)), |
379 | 0 | DaemonEvent::HostRemoved { host, seq } => Some((StreamKey::Host { host_name: host.clone() }, *seq)), |
380 | 0 | _ => None, |
381 | | } |
382 | 0 | } |
383 | | |
384 | 0 | pub async fn run_status(socket_path: &Path, format: OutputFormat) -> Result<(), String> { |
385 | 0 | let daemon = SocketDaemon::connect(socket_path).await.map_err(|e| format!("cannot connect to daemon: {e}"))?; |
386 | 0 | let status = daemon.get_status().await?; |
387 | 0 | let output = match format { |
388 | 0 | OutputFormat::Human => format_status_response_human(&status), |
389 | 0 | OutputFormat::Json => flotilla_protocol::output::json_pretty(&status), |
390 | | }; |
391 | 0 | print!("{output}"); |
392 | 0 | Ok(()) |
393 | 0 | } |
394 | | |
395 | 0 | pub async fn run_topology(daemon: &dyn DaemonHandle, format: OutputFormat) -> Result<(), String> { |
396 | 0 | let topology = daemon.get_topology().await?; |
397 | 0 | let output = match format { |
398 | 0 | OutputFormat::Human => format_topology_human(&topology), |
399 | 0 | OutputFormat::Json => flotilla_protocol::output::json_pretty(&topology), |
400 | | }; |
401 | 0 | print!("{output}"); |
402 | 0 | Ok(()) |
403 | 0 | } |
404 | | |
405 | 4 | fn format_repo_detail_human(detail: &RepoDetailResponse) -> String { |
406 | 4 | let mut out = String::new(); |
407 | 4 | out.push_str(&format!("Repo: {}\n", detail.path.display())); |
408 | 4 | if let Some(slug1 ) = &detail.slug { |
409 | 1 | out.push_str(&format!("Slug: {slug}\n")); |
410 | 3 | } |
411 | 4 | out.push('\n'); |
412 | | |
413 | 4 | if !detail.work_items.is_empty() { |
414 | 1 | let table = format_work_items_table(&detail.work_items); |
415 | 1 | out.push_str(&table.to_string()); |
416 | 1 | out.push('\n'); |
417 | 3 | } |
418 | | |
419 | 4 | if !detail.errors.is_empty() { |
420 | 1 | out.push_str("\nErrors:\n"); |
421 | 1 | for err in &detail.errors { |
422 | 1 | out.push_str(&format!(" [{}/{}] {}\n", err.category, err.provider, err.message)); |
423 | 1 | } |
424 | 3 | } |
425 | 4 | out |
426 | 4 | } |
427 | | |
428 | 6 | fn format_repo_providers_human(resp: &RepoProvidersResponse) -> String { |
429 | 6 | let mut out = String::new(); |
430 | 6 | out.push_str(&format!("Repo: {}\n", resp.path.display())); |
431 | 6 | if let Some(slug1 ) = &resp.slug { |
432 | 1 | out.push_str(&format!("Slug: {slug}\n")); |
433 | 5 | } |
434 | | |
435 | 6 | if !resp.host_discovery.is_empty() { |
436 | 1 | out.push_str("\nHost Discovery:\n"); |
437 | 1 | for entry in &resp.host_discovery { |
438 | 1 | let mut details: Vec<String> = entry.detail.iter().map(|(k, v)| format!("{k}={v}")).collect(); |
439 | 1 | details.sort(); |
440 | 1 | out.push_str(&format!(" {} ({})\n", entry.kind, details.join(", "))); |
441 | | } |
442 | 5 | } |
443 | | |
444 | 6 | if !resp.repo_discovery.is_empty() { |
445 | 1 | out.push_str("\nRepo Discovery:\n"); |
446 | 1 | for entry in &resp.repo_discovery { |
447 | 1 | let mut details: Vec<String> = entry.detail.iter().map(|(k, v)| format!("{k}={v}")).collect(); |
448 | 1 | details.sort(); |
449 | 1 | out.push_str(&format!(" {} ({})\n", entry.kind, details.join(", "))); |
450 | | } |
451 | 5 | } |
452 | | |
453 | 6 | if !resp.providers.is_empty() { |
454 | 1 | out.push_str("\nProviders:\n"); |
455 | 1 | let mut table = Table::new(); |
456 | 1 | table.load_preset(UTF8_FULL_CONDENSED); |
457 | 1 | table.set_header(vec!["Category", "Name", "Health"]); |
458 | 2 | for p in &resp.providers1 { |
459 | 2 | table.add_row(vec![Cell::new(&p.category), Cell::new(&p.name), Cell::new(if p.healthy { "ok"1 } else { "error"1 })]); |
460 | | } |
461 | 1 | out.push_str(&table.to_string()); |
462 | 1 | out.push('\n'); |
463 | 5 | } |
464 | | |
465 | 6 | if !resp.unmet_requirements.is_empty() { |
466 | 1 | out.push_str("\nUnmet Requirements:\n"); |
467 | 2 | for ur in &resp.unmet_requirements1 { |
468 | 2 | match &ur.value { |
469 | 1 | Some(value) => out.push_str(&format!(" {}: {} ({value})\n", ur.factory, ur.kind)), |
470 | 1 | None => out.push_str(&format!(" {}: {}\n", ur.factory, ur.kind)), |
471 | | } |
472 | | } |
473 | 5 | } |
474 | 6 | out |
475 | 6 | } |
476 | | |
477 | 3 | fn format_repo_work_human(resp: &RepoWorkResponse) -> String { |
478 | 3 | let mut out = String::new(); |
479 | 3 | out.push_str(&format!("Repo: {}\n", resp.path.display())); |
480 | 3 | if let Some(slug1 ) = &resp.slug { |
481 | 1 | out.push_str(&format!("Slug: {slug}\n")); |
482 | 2 | } |
483 | 3 | out.push('\n'); |
484 | | |
485 | 3 | if resp.work_items.is_empty() { |
486 | 2 | out.push_str("No work items.\n"); |
487 | 2 | } else { |
488 | 1 | let table = format_work_items_table(&resp.work_items); |
489 | 1 | out.push_str(&table.to_string()); |
490 | 1 | out.push('\n'); |
491 | 1 | } |
492 | 3 | out |
493 | 3 | } |
494 | | |
495 | 0 | pub async fn run_watch(socket_path: &Path, format: OutputFormat) -> Result<(), String> { |
496 | 0 | let daemon = SocketDaemon::connect(socket_path).await.map_err(|e| format!("cannot connect to daemon: {e}"))?; |
497 | | |
498 | | // Subscribe before replay so events emitted between replay and the loop |
499 | | // are buffered rather than silently dropped. |
500 | 0 | let mut rx = daemon.subscribe(); |
501 | | |
502 | | // Replay current state so the user sees an initial snapshot for every |
503 | | // tracked repo, matching how the TUI bootstraps. Track the seq per repo |
504 | | // so we can skip duplicate events that the broadcast buffer may also deliver. |
505 | 0 | let mut replay_seqs: HashMap<StreamKey, u64> = HashMap::new(); |
506 | 0 | match daemon.replay_since(&HashMap::new()).await { |
507 | 0 | Ok(events) => { |
508 | 0 | for event in &events { |
509 | 0 | if let Some((stream_key, seq)) = event_stream_seq(event) { |
510 | 0 | replay_seqs.entry(stream_key).and_modify(|s| *s = (*s).max(seq)).or_insert(seq); |
511 | 0 | } |
512 | 0 | let line = match format { |
513 | 0 | OutputFormat::Human => format_event_human(event), |
514 | 0 | OutputFormat::Json => flotilla_protocol::output::json_line(event), |
515 | | }; |
516 | 0 | println!("{line}"); |
517 | | } |
518 | | } |
519 | 0 | Err(e) => { |
520 | 0 | eprintln!("warning: failed to replay initial state: {e}"); |
521 | 0 | } |
522 | | } |
523 | | |
524 | 0 | if matches!(format, OutputFormat::Human) { |
525 | 0 | eprintln!("watching events (Ctrl-C to stop)..."); |
526 | 0 | } |
527 | | |
528 | | loop { |
529 | 0 | match rx.recv().await { |
530 | 0 | Ok(event) => { |
531 | | // Skip events already covered by replay to avoid duplicates. |
532 | 0 | if let Some((stream_key, seq)) = event_stream_seq(&event) { |
533 | 0 | if let Some(&replay_seq) = replay_seqs.get(&stream_key) { |
534 | 0 | if seq <= replay_seq { |
535 | 0 | continue; |
536 | 0 | } |
537 | 0 | } |
538 | 0 | } |
539 | 0 | let line = match format { |
540 | 0 | OutputFormat::Human => format_event_human(&event), |
541 | 0 | OutputFormat::Json => flotilla_protocol::output::json_line(&event), |
542 | | }; |
543 | 0 | println!("{line}"); |
544 | | } |
545 | 0 | Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { |
546 | 0 | eprintln!("warning: skipped {n} events"); |
547 | 0 | } |
548 | | Err(_) => { |
549 | 0 | eprintln!("daemon disconnected"); |
550 | 0 | break; |
551 | | } |
552 | | } |
553 | | } |
554 | | |
555 | 0 | Ok(()) |
556 | 0 | } |
557 | | |
558 | 0 | pub async fn run_command(daemon: &dyn DaemonHandle, command: Command, format: OutputFormat) -> Result<(), String> { |
559 | 0 | if command.action.is_query() { |
560 | 0 | return run_query_command(daemon, command, format).await; |
561 | 0 | } |
562 | | |
563 | 0 | let mut rx = daemon.subscribe(); |
564 | 0 | let command_id = daemon.execute(command).await?; |
565 | | |
566 | | loop { |
567 | 0 | match rx.recv().await { |
568 | 0 | Ok(ref event @ DaemonEvent::CommandStarted { command_id: id, .. }) if id == command_id => { |
569 | 0 | if matches!(format, OutputFormat::Human) { |
570 | 0 | println!("{}", format_event_human(event)); |
571 | 0 | } |
572 | | } |
573 | 0 | Ok(event @ DaemonEvent::CommandStepUpdate { command_id: id, .. }) if id == command_id => { |
574 | 0 | if matches!(format, OutputFormat::Human) { |
575 | 0 | println!("{}", format_event_human(&event)); |
576 | 0 | } |
577 | | } |
578 | 0 | Ok(ref event @ DaemonEvent::CommandFinished { command_id: id, ref result, .. }) if id == command_id => { |
579 | 0 | match format { |
580 | 0 | OutputFormat::Human => { |
581 | 0 | println!("{}", format_event_human(event)); |
582 | 0 | } |
583 | 0 | OutputFormat::Json => { |
584 | 0 | println!("{}", flotilla_protocol::output::json_pretty(&result)); |
585 | 0 | } |
586 | | } |
587 | 0 | let result = result.clone(); |
588 | 0 | return match result { |
589 | 0 | CommandValue::Error { message } => Err(message), |
590 | 0 | CommandValue::Cancelled => Err("command cancelled".into()), |
591 | 0 | _ => Ok(()), |
592 | | }; |
593 | | } |
594 | 0 | Ok(_) => {} |
595 | 0 | Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { |
596 | 0 | if matches!(format, OutputFormat::Human) { |
597 | 0 | eprintln!("warning: skipped {n} events"); |
598 | 0 | } |
599 | | } |
600 | | Err(tokio::sync::broadcast::error::RecvError::Closed) => { |
601 | 0 | return Err("daemon disconnected".into()); |
602 | | } |
603 | | } |
604 | | } |
605 | 0 | } |
606 | | |
607 | 0 | async fn run_query_command(daemon: &dyn DaemonHandle, command: Command, format: OutputFormat) -> Result<(), String> { |
608 | 0 | let result = daemon.execute_query(command, uuid::Uuid::new_v4()).await?; |
609 | 0 | match format { |
610 | 0 | OutputFormat::Human => { |
611 | 0 | print!("{}", format_command_result(&result)); |
612 | 0 | } |
613 | 0 | OutputFormat::Json => { |
614 | 0 | println!("{}", flotilla_protocol::output::json_pretty(&result)); |
615 | 0 | } |
616 | | } |
617 | 0 | match result { |
618 | 0 | CommandValue::Error { message } => Err(message), |
619 | 0 | CommandValue::Cancelled => Err("command cancelled".into()), |
620 | 0 | _ => Ok(()), |
621 | | } |
622 | 0 | } |
623 | | |
624 | | #[cfg(test)] |
625 | | mod tests; |