Coverage Report

Created: 2026-04-05 07:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
crates/flotilla-tui/src/cli.rs
Line
Count
Source
1
use std::{collections::HashMap, path::Path};
2
3
use comfy_table::{presets::UTF8_FULL_CONDENSED, Cell, Table};
4
use flotilla_core::daemon::DaemonHandle;
5
use flotilla_protocol::{
6
    output::OutputFormat, Command, CommandValue, DaemonEvent, EnvironmentInfo, EnvironmentStatus, HostProvidersResponse,
7
    HostStatusResponse, PeerConnectionState, RepoDetailResponse, RepoProvidersResponse, RepoWorkResponse, StatusResponse, StreamKey,
8
    TopologyResponse,
9
};
10
11
use crate::socket::SocketDaemon;
12
13
6
fn format_work_items_table(items: &[flotilla_protocol::snapshot::WorkItem]) -> Table {
14
6
    let mut table = Table::new();
15
6
    table.load_preset(UTF8_FULL_CONDENSED);
16
6
    table.set_header(vec!["Kind", "Branch", "Description", "PR", "Session", "Issues"]);
17
7
    for item in 
items6
{
18
7
        table.add_row(vec![
19
7
            Cell::new(format!("{:?}", item.kind)),
20
7
            Cell::new(item.branch.as_deref().unwrap_or("-")),
21
7
            Cell::new(&item.description),
22
7
            Cell::new(item.change_request_key.as_deref().unwrap_or("-")),
23
7
            Cell::new(item.session_key.as_deref().unwrap_or("-")),
24
7
            Cell::new(if item.issue_keys.is_empty() { 
"-"6
.
into6
() } else {
item.issue_keys1
.join(", ") }),
25
        ]);
26
    }
27
6
    table
28
6
}
29
30
2
fn format_status_response_human(status: &StatusResponse) -> String {
31
2
    if status.repos.is_empty() {
32
1
        return "No repos tracked.\n".into();
33
1
    }
34
1
    let mut table = Table::new();
35
1
    table.load_preset(UTF8_FULL_CONDENSED);
36
1
    table.set_header(vec!["Repo", "Path", "Work Items", "Errors", "Health"]);
37
1
    for repo in &status.repos {
38
1
        let name = repo.path.file_name().map(|n| n.to_string_lossy().to_string()).unwrap_or_default();
39
1
        let mut health: Vec<String> = repo
40
1
            .provider_health
41
1
            .iter()
42
1
            .flat_map(|(cat, providers)| {
43
1
                providers.iter().map(move |(name, ok)| format!("{cat}/{name}: {}", if *ok { "ok" } else { 
"error"0
}))
44
1
            })
45
1
            .collect();
46
1
        health.sort();
47
1
        let health_str = if health.is_empty() { 
"-"0
.
into0
() } else { health.join(", ") };
48
1
        table.add_row(vec![
49
1
            Cell::new(&name),
50
1
            Cell::new(repo.path.display()),
51
1
            Cell::new(repo.work_item_count),
52
1
            Cell::new(repo.error_count),
53
1
            Cell::new(&health_str),
54
        ]);
55
    }
56
1
    format!("{table}\n")
57
2
}
58
59
4
fn format_connection_status(status: &PeerConnectionState) -> &'static str {
60
4
    match status {
61
3
        PeerConnectionState::Connected => "connected",
62
1
        PeerConnectionState::Disconnected => "disconnected",
63
0
        PeerConnectionState::Connecting => "connecting",
64
0
        PeerConnectionState::Reconnecting => "reconnecting",
65
0
        PeerConnectionState::Rejected { .. } => "rejected",
66
    }
67
4
}
68
69
1
fn inventory_is_empty(inventory: &flotilla_protocol::ToolInventory) -> bool {
70
1
    inventory.binaries.is_empty() && inventory.sockets.is_empty() && inventory.auth.is_empty() && inventory.env_vars.is_empty()
71
1
}
72
73
4
fn environment_status_label(status: &EnvironmentStatus) -> String {
74
4
    match status {
75
0
        EnvironmentStatus::Building => "building".to_string(),
76
0
        EnvironmentStatus::Starting => "starting".to_string(),
77
4
        EnvironmentStatus::Running => "running".to_string(),
78
0
        EnvironmentStatus::Stopped => "stopped".to_string(),
79
0
        EnvironmentStatus::Failed(message) => format!("failed: {message}"),
80
    }
81
4
}
82
83
2
fn format_visible_environments_human(environments: &[EnvironmentInfo]) -> String {
84
2
    if environments.is_empty() {
85
0
        return String::new();
86
2
    }
87
88
2
    let mut table = Table::new();
89
2
    table.load_preset(UTF8_FULL_CONDENSED);
90
2
    table.set_header(vec!["Kind", "Id", "Display Name", "Status", "Image"]);
91
4
    for environment in 
environments2
{
92
4
        match environment {
93
2
            EnvironmentInfo::Direct { id, display_name, status, .. } => {
94
2
                table.add_row(vec![
95
2
                    Cell::new("direct"),
96
2
                    Cell::new(id.as_str()),
97
2
                    Cell::new(display_name.as_deref().unwrap_or("-")),
98
2
                    Cell::new(environment_status_label(status)),
99
2
                    Cell::new("-"),
100
2
                ]);
101
2
            }
102
2
            EnvironmentInfo::Provisioned { id, display_name, image, status } => {
103
2
                table.add_row(vec![
104
2
                    Cell::new("provisioned"),
105
2
                    Cell::new(id.as_str()),
106
2
                    Cell::new(display_name.as_deref().unwrap_or("-")),
107
2
                    Cell::new(environment_status_label(status)),
108
2
                    Cell::new(image.as_str()),
109
2
                ]);
110
2
            }
111
        }
112
    }
113
2
    format!("Visible Environments:\n{table}\n")
114
2
}
115
116
2
fn format_host_list_human(response: &flotilla_protocol::HostListResponse) -> String {
117
2
    if response.hosts.is_empty() {
118
1
        return "No hosts known.\n".into();
119
1
    }
120
121
1
    let mut table = Table::new();
122
1
    table.load_preset(UTF8_FULL_CONDENSED);
123
1
    table.set_header(vec!["Host", "Local", "Configured", "Status", "Summary", "Repos", "Work"]);
124
2
    for host in 
&response.hosts1
{
125
2
        table.add_row(vec![
126
2
            Cell::new(host.host.as_str()),
127
2
            Cell::new(if host.is_local { 
"yes"1
} else {
"no"1
}),
128
2
            Cell::new(if host.configured { 
"yes"1
} else {
"no"1
}),
129
2
            Cell::new(format_connection_status(&host.connection_status)),
130
2
            Cell::new(if host.has_summary { 
"yes"1
} else {
"no"1
}),
131
2
            Cell::new(host.repo_count),
132
2
            Cell::new(host.work_item_count),
133
        ]);
134
    }
135
1
    format!("{table}\n")
136
2
}
137
138
1
fn format_host_status_human(response: &HostStatusResponse) -> String {
139
1
    let mut out = String::new();
140
1
    out.push_str(&format!("Host: {}\n", response.host));
141
1
    out.push_str(&format!("Status: {}\n", format_connection_status(&response.connection_status)));
142
1
    out.push_str(&format!("Configured: {}\n", if response.configured { 
"yes"0
} else { "no" }));
143
1
    out.push_str(&format!("Repositories: {}\n", response.repo_count));
144
1
    out.push_str(&format!("Work Items: {}\n", response.work_item_count));
145
146
1
    if let Some(summary) = &response.summary {
147
1
        out.push_str("\nSystem:\n");
148
1
        if let Some(os) = &summary.system.os {
149
1
            out.push_str(&format!("  OS: {os}\n"));
150
1
        
}0
151
1
        if let Some(arch) = &summary.system.arch {
152
1
            out.push_str(&format!("  Arch: {arch}\n"));
153
1
        
}0
154
1
        if let Some(cpus) = summary.system.cpu_count {
155
1
            out.push_str(&format!("  CPUs: {cpus}\n"));
156
1
        
}0
157
1
        if let Some(memory) = summary.system.memory_total_mb {
158
1
            out.push_str(&format!("  Memory: {} MB\n", memory));
159
1
        
}0
160
0
    }
161
162
1
    out.push_str(&format_visible_environments_human(&response.visible_environments));
163
164
1
    out
165
1
}
166
167
1
fn format_host_providers_human(response: &HostProvidersResponse) -> String {
168
1
    let mut out = String::new();
169
1
    out.push_str(&format!("Host: {}\n", response.host));
170
1
    out.push_str(&format!("Status: {}\n", format_connection_status(&response.connection_status)));
171
1
    out.push_str(&format!("Configured: {}\n", if response.configured { 
"yes"0
} else { "no" }));
172
173
1
    out.push_str("\nInventory:\n");
174
1
    if inventory_is_empty(&response.summary.inventory) {
175
1
        out.push_str("  No inventory facts.\n");
176
1
    } else {
177
0
        for fact in &response.summary.inventory.binaries {
178
0
            out.push_str(&format!("  binary: {}\n", fact.name));
179
0
        }
180
0
        for fact in &response.summary.inventory.sockets {
181
0
            out.push_str(&format!("  socket: {}\n", fact.name));
182
0
        }
183
0
        for fact in &response.summary.inventory.auth {
184
0
            out.push_str(&format!("  auth: {}\n", fact.name));
185
0
        }
186
0
        for fact in &response.summary.inventory.env_vars {
187
0
            out.push_str(&format!("  env: {}\n", fact.name));
188
0
        }
189
    }
190
191
1
    out.push_str("\nProviders:\n");
192
1
    let mut table = Table::new();
193
1
    table.load_preset(UTF8_FULL_CONDENSED);
194
1
    table.set_header(vec!["Category", "Name", "Health"]);
195
1
    for provider in &response.summary.providers {
196
1
        table.add_row(vec![
197
1
            Cell::new(&provider.category),
198
1
            Cell::new(&provider.name),
199
1
            Cell::new(if provider.healthy { "ok" } else { 
"error"0
}),
200
        ]);
201
    }
202
1
    out.push_str(&table.to_string());
203
1
    out.push('\n');
204
1
    out.push_str(&format_visible_environments_human(&response.visible_environments));
205
1
    out
206
1
}
207
208
1
fn format_topology_human(response: &TopologyResponse) -> String {
209
1
    let mut out = String::new();
210
1
    out.push_str(&format!("Local Host: {}\n", response.local_host));
211
1
    if response.routes.is_empty() {
212
0
        out.push_str("No routes.\n");
213
0
        return out;
214
1
    }
215
216
1
    let mut table = Table::new();
217
1
    table.load_preset(UTF8_FULL_CONDENSED);
218
1
    table.set_header(vec!["Target", "Via", "Direct", "Connected", "Fallbacks"]);
219
1
    for route in &response.routes {
220
1
        let fallbacks = if route.fallbacks.is_empty() {
221
0
            "-".to_string()
222
        } else {
223
1
            route.fallbacks.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
224
        };
225
1
        table.add_row(vec![
226
1
            Cell::new(route.target.as_str()),
227
1
            Cell::new(route.next_hop.as_str()),
228
1
            Cell::new(if route.direct { 
"yes"0
} else { "no" }),
229
1
            Cell::new(if route.connected { "yes" } else { 
"no"0
}),
230
1
            Cell::new(fallbacks),
231
        ]);
232
    }
233
1
    out.push_str(&table.to_string());
234
1
    out.push('\n');
235
1
    out
236
1
}
237
238
/// Extract a short display name from a repo path (last path component).
239
/// Falls back to the full path display for root or non-UTF-8 paths,
240
/// matching `flotilla_core::model::repo_name`.
241
11
fn repo_name(path: &std::path::Path) -> String {
242
11
    path.file_name().map(|n| 
n.to_string_lossy()10
.
to_string10
()).unwrap_or_else(||
path.to_string_lossy()1
.
to_string1
())
243
11
}
244
245
8
fn repo_label(path: Option<&std::path::Path>, identity: &flotilla_protocol::RepoIdentity) -> String {
246
8
    path.map(repo_name).unwrap_or_else(|| 
identity.path0
.
clone0
())
247
8
}
248
249
/// Format a `CommandValue` as a short human-readable string.
250
20
fn format_command_result(result: &flotilla_protocol::commands::CommandValue) -> String {
251
    use flotilla_protocol::commands::CommandValue;
252
20
    match result {
253
3
        CommandValue::Ok => "ok".to_string(),
254
2
        CommandValue::RepoTracked { path, resolved_from } => match resolved_from {
255
1
            Some(original) => format!("repo tracked: {} (resolved from {})", path.display(), original.display()),
256
1
            None => format!("repo tracked: {}", path.display()),
257
        },
258
1
        CommandValue::RepoUntracked { path } => format!("repo untracked: {}", path.display()),
259
2
        CommandValue::Refreshed { repos } => format!("refreshed {} repo(s)", repos.len()),
260
1
        CommandValue::CheckoutCreated { branch, .. } => format!("checkout created: {branch}"),
261
1
        CommandValue::CheckoutRemoved { branch } => format!("checkout removed: {branch}"),
262
0
        CommandValue::TerminalPrepared { branch, target_host, .. } => format!("terminal prepared: {branch} on {target_host}"),
263
1
        CommandValue::BranchNameGenerated { name, .. } => format!("branch name: {name}"),
264
3
        CommandValue::CheckoutStatus(status) => {
265
3
            let mut parts = vec![format!("checkout status: {}", status.branch)];
266
3
            if let Some(
cr2
) = &status.change_request_status {
267
2
                parts.push(format!("PR: {cr}"));
268
2
            
}1
269
3
            if let Some(
sha1
) = &status.merge_commit_sha {
270
1
                parts.push(format!("merged via {}", &sha[..sha.len().min(7)]));
271
2
            }
272
3
            if !status.unpushed_commits.is_empty() {
273
1
                parts.push(format!("{} unpushed", status.unpushed_commits.len()));
274
2
            }
275
3
            if status.has_uncommitted {
276
1
                parts.push("uncommitted changes".to_string());
277
2
            }
278
3
            if let Some(
warning0
) = &status.base_detection_warning {
279
0
                parts.push(format!("warning: {warning}"));
280
3
            }
281
3
            parts.join(", ")
282
        }
283
3
        CommandValue::Error { message } => format!("error: {message}"),
284
1
        CommandValue::Cancelled => "cancelled".to_string(),
285
        CommandValue::PreparedWorkspace(_) | CommandValue::AttachCommandResolved { .. } | CommandValue::CheckoutPathResolved { .. } => {
286
1
            "internal step result".to_string()
287
        }
288
0
        CommandValue::RepoDetail(detail) => format_repo_detail_human(detail),
289
0
        CommandValue::RepoProviders(providers) => format_repo_providers_human(providers),
290
0
        CommandValue::RepoWork(work) => format_repo_work_human(work),
291
1
        CommandValue::HostList(hosts) => format_host_list_human(hosts),
292
0
        CommandValue::HostStatus(status) => format_host_status_human(status),
293
0
        CommandValue::HostProviders(providers) => format_host_providers_human(providers),
294
0
        CommandValue::ImageEnsured { image } => format!("image ensured: {image}"),
295
0
        CommandValue::EnvironmentCreated { env_id } => format!("environment created: {env_id}"),
296
0
        CommandValue::EnvironmentSpecRead { .. } => "environment spec read".to_string(),
297
0
        CommandValue::IssuePage(page) => format!("issue page: {} items, has_more={}", page.items.len(), page.has_more),
298
0
        CommandValue::IssuesByIds { items } => format!("issues by ids: {} items", items.len()),
299
    }
300
20
}
301
302
17
pub(crate) fn format_event_human(event: &flotilla_protocol::DaemonEvent) -> String {
303
    use flotilla_protocol::{DaemonEvent, PeerConnectionState};
304
17
    match event {
305
1
        DaemonEvent::RepoSnapshot(snap) => {
306
1
            format!(
307
                "[snapshot] {}: full snapshot (seq {}, {} work items)",
308
1
                repo_label(snap.repo.as_deref(), &snap.repo_identity),
309
                snap.seq,
310
1
                snap.work_items.len()
311
            )
312
        }
313
1
        DaemonEvent::RepoDelta(delta) => {
314
1
            format!(
315
                "[delta]    {}: delta seq {}\u{2192}{} ({} changes)",
316
1
                repo_label(delta.repo.as_deref(), &delta.repo_identity),
317
                delta.prev_seq,
318
                delta.seq,
319
1
                delta.changes.len()
320
            )
321
        }
322
1
        DaemonEvent::RepoTracked(info) => {
323
1
            format!("[repo]     {}: tracked", info.name)
324
        }
325
1
        DaemonEvent::RepoUntracked { repo_identity, path } => {
326
1
            format!("[repo]     {}: untracked", repo_label(path.as_deref(), repo_identity))
327
        }
328
3
        DaemonEvent::CommandStarted { repo_identity, repo, description, .. } => {
329
3
            if repo.is_none() && 
repo_identity.authority1
.
is_empty1
() &&
repo_identity.path1
.
is_empty1
() {
330
                // Query commands have no repo context — show description only
331
1
                format!("[query]    {description}")
332
            } else {
333
2
                format!("[command]  {}: started \"{}\"", repo_label(repo.as_deref(), repo_identity), description)
334
            }
335
        }
336
5
        DaemonEvent::CommandFinished { repo_identity, repo, result, .. } => {
337
5
            if repo.is_none() && 
repo_identity.authority2
.
is_empty2
() &&
repo_identity.path2
.
is_empty2
() {
338
                // Query commands have no repo context — show result directly
339
2
                format_command_result(result)
340
            } else {
341
3
                format!("[command]  {}: finished \u{2192} {}", repo_label(repo.as_deref(), repo_identity), format_command_result(result))
342
            }
343
        }
344
0
        DaemonEvent::CommandStepUpdate { repo_identity, repo, description, step_index, step_count, .. } => {
345
0
            format!("[step]     {}: {} ({}/{})", repo_label(repo.as_deref(), repo_identity), description, step_index + 1, step_count)
346
        }
347
5
        DaemonEvent::PeerStatusChanged { host, status } => {
348
5
            let state = match status {
349
1
                PeerConnectionState::Connected => "connected".to_string(),
350
1
                PeerConnectionState::Disconnected => "disconnected".to_string(),
351
1
                PeerConnectionState::Connecting => "connecting".to_string(),
352
1
                PeerConnectionState::Reconnecting => "reconnecting".to_string(),
353
1
                PeerConnectionState::Rejected { reason } => format!("rejected: {reason}"),
354
            };
355
5
            format!("[peer]     {host}: {state}")
356
        }
357
0
        DaemonEvent::HostSnapshot(snap) => {
358
0
            let state = match &snap.connection_status {
359
0
                PeerConnectionState::Connected => "connected",
360
0
                PeerConnectionState::Disconnected => "disconnected",
361
0
                PeerConnectionState::Connecting => "connecting",
362
0
                PeerConnectionState::Reconnecting => "reconnecting",
363
0
                PeerConnectionState::Rejected { .. } => "rejected",
364
            };
365
0
            format!("[host]     {}: {} (seq {})", snap.host_name, state, snap.seq)
366
        }
367
0
        DaemonEvent::HostRemoved { host, seq } => {
368
0
            format!("[host]     {host}: removed (seq {seq})")
369
        }
370
    }
371
17
}
372
373
/// Extract the (stream_key, seq) from a snapshot/delta event, if present.
374
0
fn event_stream_seq(event: &DaemonEvent) -> Option<(StreamKey, u64)> {
375
0
    match event {
376
0
        DaemonEvent::RepoSnapshot(snap) => Some((StreamKey::Repo { identity: snap.repo_identity.clone() }, snap.seq)),
377
0
        DaemonEvent::RepoDelta(delta) => Some((StreamKey::Repo { identity: delta.repo_identity.clone() }, delta.seq)),
378
0
        DaemonEvent::HostSnapshot(snap) => Some((StreamKey::Host { host_name: snap.host_name.clone() }, snap.seq)),
379
0
        DaemonEvent::HostRemoved { host, seq } => Some((StreamKey::Host { host_name: host.clone() }, *seq)),
380
0
        _ => None,
381
    }
382
0
}
383
384
0
pub async fn run_status(socket_path: &Path, format: OutputFormat) -> Result<(), String> {
385
0
    let daemon = SocketDaemon::connect(socket_path).await.map_err(|e| format!("cannot connect to daemon: {e}"))?;
386
0
    let status = daemon.get_status().await?;
387
0
    let output = match format {
388
0
        OutputFormat::Human => format_status_response_human(&status),
389
0
        OutputFormat::Json => flotilla_protocol::output::json_pretty(&status),
390
    };
391
0
    print!("{output}");
392
0
    Ok(())
393
0
}
394
395
0
pub async fn run_topology(daemon: &dyn DaemonHandle, format: OutputFormat) -> Result<(), String> {
396
0
    let topology = daemon.get_topology().await?;
397
0
    let output = match format {
398
0
        OutputFormat::Human => format_topology_human(&topology),
399
0
        OutputFormat::Json => flotilla_protocol::output::json_pretty(&topology),
400
    };
401
0
    print!("{output}");
402
0
    Ok(())
403
0
}
404
405
4
fn format_repo_detail_human(detail: &RepoDetailResponse) -> String {
406
4
    let mut out = String::new();
407
4
    out.push_str(&format!("Repo: {}\n", detail.path.display()));
408
4
    if let Some(
slug1
) = &detail.slug {
409
1
        out.push_str(&format!("Slug: {slug}\n"));
410
3
    }
411
4
    out.push('\n');
412
413
4
    if !detail.work_items.is_empty() {
414
1
        let table = format_work_items_table(&detail.work_items);
415
1
        out.push_str(&table.to_string());
416
1
        out.push('\n');
417
3
    }
418
419
4
    if !detail.errors.is_empty() {
420
1
        out.push_str("\nErrors:\n");
421
1
        for err in &detail.errors {
422
1
            out.push_str(&format!("  [{}/{}] {}\n", err.category, err.provider, err.message));
423
1
        }
424
3
    }
425
4
    out
426
4
}
427
428
6
fn format_repo_providers_human(resp: &RepoProvidersResponse) -> String {
429
6
    let mut out = String::new();
430
6
    out.push_str(&format!("Repo: {}\n", resp.path.display()));
431
6
    if let Some(
slug1
) = &resp.slug {
432
1
        out.push_str(&format!("Slug: {slug}\n"));
433
5
    }
434
435
6
    if !resp.host_discovery.is_empty() {
436
1
        out.push_str("\nHost Discovery:\n");
437
1
        for entry in &resp.host_discovery {
438
1
            let mut details: Vec<String> = entry.detail.iter().map(|(k, v)| format!("{k}={v}")).collect();
439
1
            details.sort();
440
1
            out.push_str(&format!("  {} ({})\n", entry.kind, details.join(", ")));
441
        }
442
5
    }
443
444
6
    if !resp.repo_discovery.is_empty() {
445
1
        out.push_str("\nRepo Discovery:\n");
446
1
        for entry in &resp.repo_discovery {
447
1
            let mut details: Vec<String> = entry.detail.iter().map(|(k, v)| format!("{k}={v}")).collect();
448
1
            details.sort();
449
1
            out.push_str(&format!("  {} ({})\n", entry.kind, details.join(", ")));
450
        }
451
5
    }
452
453
6
    if !resp.providers.is_empty() {
454
1
        out.push_str("\nProviders:\n");
455
1
        let mut table = Table::new();
456
1
        table.load_preset(UTF8_FULL_CONDENSED);
457
1
        table.set_header(vec!["Category", "Name", "Health"]);
458
2
        for p in 
&resp.providers1
{
459
2
            table.add_row(vec![Cell::new(&p.category), Cell::new(&p.name), Cell::new(if p.healthy { 
"ok"1
} else {
"error"1
})]);
460
        }
461
1
        out.push_str(&table.to_string());
462
1
        out.push('\n');
463
5
    }
464
465
6
    if !resp.unmet_requirements.is_empty() {
466
1
        out.push_str("\nUnmet Requirements:\n");
467
2
        for ur in 
&resp.unmet_requirements1
{
468
2
            match &ur.value {
469
1
                Some(value) => out.push_str(&format!("  {}: {} ({value})\n", ur.factory, ur.kind)),
470
1
                None => out.push_str(&format!("  {}: {}\n", ur.factory, ur.kind)),
471
            }
472
        }
473
5
    }
474
6
    out
475
6
}
476
477
3
fn format_repo_work_human(resp: &RepoWorkResponse) -> String {
478
3
    let mut out = String::new();
479
3
    out.push_str(&format!("Repo: {}\n", resp.path.display()));
480
3
    if let Some(
slug1
) = &resp.slug {
481
1
        out.push_str(&format!("Slug: {slug}\n"));
482
2
    }
483
3
    out.push('\n');
484
485
3
    if resp.work_items.is_empty() {
486
2
        out.push_str("No work items.\n");
487
2
    } else {
488
1
        let table = format_work_items_table(&resp.work_items);
489
1
        out.push_str(&table.to_string());
490
1
        out.push('\n');
491
1
    }
492
3
    out
493
3
}
494
495
0
pub async fn run_watch(socket_path: &Path, format: OutputFormat) -> Result<(), String> {
496
0
    let daemon = SocketDaemon::connect(socket_path).await.map_err(|e| format!("cannot connect to daemon: {e}"))?;
497
498
    // Subscribe before replay so events emitted between replay and the loop
499
    // are buffered rather than silently dropped.
500
0
    let mut rx = daemon.subscribe();
501
502
    // Replay current state so the user sees an initial snapshot for every
503
    // tracked repo, matching how the TUI bootstraps.  Track the seq per repo
504
    // so we can skip duplicate events that the broadcast buffer may also deliver.
505
0
    let mut replay_seqs: HashMap<StreamKey, u64> = HashMap::new();
506
0
    match daemon.replay_since(&HashMap::new()).await {
507
0
        Ok(events) => {
508
0
            for event in &events {
509
0
                if let Some((stream_key, seq)) = event_stream_seq(event) {
510
0
                    replay_seqs.entry(stream_key).and_modify(|s| *s = (*s).max(seq)).or_insert(seq);
511
0
                }
512
0
                let line = match format {
513
0
                    OutputFormat::Human => format_event_human(event),
514
0
                    OutputFormat::Json => flotilla_protocol::output::json_line(event),
515
                };
516
0
                println!("{line}");
517
            }
518
        }
519
0
        Err(e) => {
520
0
            eprintln!("warning: failed to replay initial state: {e}");
521
0
        }
522
    }
523
524
0
    if matches!(format, OutputFormat::Human) {
525
0
        eprintln!("watching events (Ctrl-C to stop)...");
526
0
    }
527
528
    loop {
529
0
        match rx.recv().await {
530
0
            Ok(event) => {
531
                // Skip events already covered by replay to avoid duplicates.
532
0
                if let Some((stream_key, seq)) = event_stream_seq(&event) {
533
0
                    if let Some(&replay_seq) = replay_seqs.get(&stream_key) {
534
0
                        if seq <= replay_seq {
535
0
                            continue;
536
0
                        }
537
0
                    }
538
0
                }
539
0
                let line = match format {
540
0
                    OutputFormat::Human => format_event_human(&event),
541
0
                    OutputFormat::Json => flotilla_protocol::output::json_line(&event),
542
                };
543
0
                println!("{line}");
544
            }
545
0
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
546
0
                eprintln!("warning: skipped {n} events");
547
0
            }
548
            Err(_) => {
549
0
                eprintln!("daemon disconnected");
550
0
                break;
551
            }
552
        }
553
    }
554
555
0
    Ok(())
556
0
}
557
558
0
pub async fn run_command(daemon: &dyn DaemonHandle, command: Command, format: OutputFormat) -> Result<(), String> {
559
0
    if command.action.is_query() {
560
0
        return run_query_command(daemon, command, format).await;
561
0
    }
562
563
0
    let mut rx = daemon.subscribe();
564
0
    let command_id = daemon.execute(command).await?;
565
566
    loop {
567
0
        match rx.recv().await {
568
0
            Ok(ref event @ DaemonEvent::CommandStarted { command_id: id, .. }) if id == command_id => {
569
0
                if matches!(format, OutputFormat::Human) {
570
0
                    println!("{}", format_event_human(event));
571
0
                }
572
            }
573
0
            Ok(event @ DaemonEvent::CommandStepUpdate { command_id: id, .. }) if id == command_id => {
574
0
                if matches!(format, OutputFormat::Human) {
575
0
                    println!("{}", format_event_human(&event));
576
0
                }
577
            }
578
0
            Ok(ref event @ DaemonEvent::CommandFinished { command_id: id, ref result, .. }) if id == command_id => {
579
0
                match format {
580
0
                    OutputFormat::Human => {
581
0
                        println!("{}", format_event_human(event));
582
0
                    }
583
0
                    OutputFormat::Json => {
584
0
                        println!("{}", flotilla_protocol::output::json_pretty(&result));
585
0
                    }
586
                }
587
0
                let result = result.clone();
588
0
                return match result {
589
0
                    CommandValue::Error { message } => Err(message),
590
0
                    CommandValue::Cancelled => Err("command cancelled".into()),
591
0
                    _ => Ok(()),
592
                };
593
            }
594
0
            Ok(_) => {}
595
0
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
596
0
                if matches!(format, OutputFormat::Human) {
597
0
                    eprintln!("warning: skipped {n} events");
598
0
                }
599
            }
600
            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
601
0
                return Err("daemon disconnected".into());
602
            }
603
        }
604
    }
605
0
}
606
607
0
async fn run_query_command(daemon: &dyn DaemonHandle, command: Command, format: OutputFormat) -> Result<(), String> {
608
0
    let result = daemon.execute_query(command, uuid::Uuid::new_v4()).await?;
609
0
    match format {
610
0
        OutputFormat::Human => {
611
0
            print!("{}", format_command_result(&result));
612
0
        }
613
0
        OutputFormat::Json => {
614
0
            println!("{}", flotilla_protocol::output::json_pretty(&result));
615
0
        }
616
    }
617
0
    match result {
618
0
        CommandValue::Error { message } => Err(message),
619
0
        CommandValue::Cancelled => Err("command cancelled".into()),
620
0
        _ => Ok(()),
621
    }
622
0
}
623
624
#[cfg(test)]
625
mod tests;