Coverage Report

Created: 2026-04-05 07:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
crates/flotilla-core/src/providers/issue_tracker/github.rs
Line
Count
Source
1
use std::{path::Path, sync::Arc};
2
3
use async_trait::async_trait;
4
5
use crate::providers::{
6
    gh_api_get, gh_api_get_with_headers,
7
    github_api::{clamp_per_page, GhApi},
8
    run,
9
    types::*,
10
    CommandRunner,
11
};
12
13
pub struct GitHubIssueTracker {
14
    provider_name: String,
15
    repo_slug: String,
16
    api: Arc<dyn GhApi>,
17
    runner: Arc<dyn CommandRunner>,
18
}
19
20
impl GitHubIssueTracker {
21
6
    pub fn new(provider_name: String, repo_slug: String, api: Arc<dyn GhApi>, runner: Arc<dyn CommandRunner>) -> Self {
22
6
        Self { provider_name, repo_slug, api, runner }
23
6
    }
24
}
25
26
46
pub(crate) fn parse_issue(provider_name: &str, v: &serde_json::Value) -> Option<(String, Issue)> {
27
46
    let number = v["number"].as_i64()
?0
;
28
46
    let title = v["title"].as_str()
?0
.to_string();
29
46
    let labels: Vec<String> = v["labels"]
30
46
        .as_array()
31
46
        .map(|arr| arr.iter().filter_map(|l| 
l["name"]26
.
as_str26
().
map26
(|s|
s26
.
to_string26
())).collect())
32
46
        .unwrap_or_default();
33
46
    let id = number.to_string();
34
46
    let association_keys = vec![AssociationKey::IssueRef(provider_name.to_string(), id.clone())];
35
46
    Some((id, Issue { title, labels, association_keys, provider_name: provider_name.to_string(), provider_display_name: "GitHub".into() }))
36
46
}
37
38
#[async_trait]
39
impl super::IssueProvider for GitHubIssueTracker {
40
1
    async fn list_issues(&self, repo_root: &Path, limit: usize) -> Result<Vec<(String, Issue)>, String> {
41
        let page = self.list_issues_page(repo_root, 1, limit).await?;
42
        Ok(page.issues)
43
1
    }
44
45
1
    async fn list_issues_page(&self, repo_root: &Path, page: u32, per_page: usize) -> Result<IssuePage, String> {
46
        let per_page = clamp_per_page(per_page);
47
        let endpoint = format!("repos/{}/issues?state=open&per_page={}&page={}", self.repo_slug, per_page, page);
48
        let response = gh_api_get_with_headers!(self.api, &endpoint, repo_root)?;
49
0
        let items: Vec<serde_json::Value> = serde_json::from_str(&response.body).map_err(|e| e.to_string())?;
50
51
        let issues: Vec<(String, Issue)> = items
52
            .into_iter()
53
30
            .filter(|v| !v.as_object().map(|o| o.contains_key("pull_request")).unwrap_or(false))
54
30
            .filter_map(|v| parse_issue(&self.provider_name, &v))
55
            .collect();
56
57
        Ok(IssuePage { issues, total_count: None, has_more: response.has_next_page })
58
1
    }
59
60
0
    async fn fetch_issues_by_id(&self, repo_root: &Path, ids: &[String]) -> Result<Vec<(String, Issue)>, String> {
61
        use futures::stream::{
62
            StreamExt, {self},
63
        };
64
        let futs: Vec<_> = ids
65
            .iter()
66
0
            .map(|id| {
67
0
                let endpoint = format!("repos/{}/issues/{}", self.repo_slug, id);
68
0
                let api = Arc::clone(&self.api);
69
0
                let repo_root = repo_root.to_path_buf();
70
0
                let provider_name = self.provider_name.clone();
71
0
                let id = id.clone();
72
0
                async move {
73
0
                    let body = gh_api_get!(api, &endpoint, &repo_root)?;
74
0
                    let v: serde_json::Value = serde_json::from_str(&body).map_err(|e| e.to_string())?;
75
0
                    parse_issue(&provider_name, &v).ok_or_else(|| format!("failed to parse issue {}", id))
76
0
                }
77
0
            })
78
            .collect();
79
80
        let results: Vec<_> = stream::iter(futs).buffer_unordered(10).collect().await;
81
        let mut issues = Vec::new();
82
        for result in results {
83
            match result {
84
                Ok(issue) => issues.push(issue),
85
                Err(e) => tracing::warn!(provider = "github", err = %e, "failed to fetch issue"),
86
            }
87
        }
88
        Ok(issues)
89
0
    }
90
91
0
    async fn search_issues(&self, repo_root: &Path, query: &str, limit: usize) -> Result<Vec<(String, Issue)>, String> {
92
        let per_page = clamp_per_page(limit);
93
        let raw_query = format!("repo:{} is:issue is:open {}", self.repo_slug, query);
94
        let encoded_query = urlencoding::encode(&raw_query);
95
        let endpoint = format!("search/issues?q={}&per_page={}", encoded_query, per_page);
96
        let body = gh_api_get!(self.api, &endpoint, repo_root)?;
97
0
        let response: serde_json::Value = serde_json::from_str(&body).map_err(|e| e.to_string())?;
98
99
        let items = response["items"].as_array().ok_or("no items array in search response")?;
100
0
        Ok(items.iter().filter_map(|v| parse_issue(&self.provider_name, v)).collect())
101
0
    }
102
103
4
    async fn list_issues_changed_since(&self, repo_root: &Path, since: &str, per_page: usize) -> Result<IssueChangeset, String> {
104
        let per_page = clamp_per_page(per_page);
105
        let encoded_since = urlencoding::encode(since);
106
        let endpoint =
107
            format!("repos/{}/issues?state=all&since={}&sort=updated&direction=desc&per_page={}", self.repo_slug, encoded_since, per_page);
108
        let response = gh_api_get_with_headers!(self.api, &endpoint, repo_root)?;
109
0
        let items: Vec<serde_json::Value> = serde_json::from_str(&response.body).map_err(|e| e.to_string())?;
110
111
        let mut updated = Vec::new();
112
        let mut closed_ids = Vec::new();
113
114
        for v in &items {
115
9
            if v.as_object().map(|o| o.contains_key("pull_request")).unwrap_or(false) {
116
                continue;
117
            }
118
            let state = v["state"].as_str().unwrap_or("open");
119
            if state == "open" {
120
                if let Some(issue) = parse_issue(&self.provider_name, v) {
121
                    updated.push(issue);
122
                }
123
            } else if let Some(number) = v["number"].as_i64() {
124
                closed_ids.push(number.to_string());
125
            }
126
        }
127
128
        // Escalate when there are more pages AND at least one real issue
129
        // on this page — remaining pages likely contain more issues too.
130
        // When issue_count == 0 (all PRs), don't escalate: there are no
131
        // issue changes to miss.
132
        let issue_count = updated.len() + closed_ids.len();
133
        Ok(IssueChangeset { updated, closed_ids, has_more: response.has_next_page && issue_count > 0 })
134
4
    }
135
136
0
    async fn open_in_browser(&self, repo_root: &Path, id: &str) -> Result<(), String> {
137
        run!(self.runner, "gh", &["issue", "view", id, "--web"], repo_root)?;
138
        Ok(())
139
0
    }
140
}
141
142
#[cfg(test)]
143
mod tests {
144
    use std::{collections::VecDeque, sync::Mutex};
145
146
    use super::*;
147
    use crate::providers::{
148
        github_api::{GhApi, GhApiResponse},
149
        github_test_support::{build_api_and_runner, repo_root_for_recording},
150
        issue_tracker::IssueProvider,
151
        testing::MockRunner,
152
        ChannelLabel,
153
    };
154
155
    struct MockGhApi {
156
        responses: Mutex<VecDeque<Result<GhApiResponse, String>>>,
157
    }
158
159
    impl MockGhApi {
160
4
        fn new(responses: Vec<Result<GhApiResponse, String>>) -> Self {
161
4
            Self { responses: Mutex::new(responses.into()) }
162
4
        }
163
    }
164
165
    #[async_trait]
166
    impl GhApi for MockGhApi {
167
0
        async fn get(&self, endpoint: &str, repo_root: &Path, label: &ChannelLabel) -> Result<String, String> {
168
            self.get_with_headers(endpoint, repo_root, label).await.map(|r| r.body)
169
0
        }
170
4
        async fn get_with_headers(&self, _endpoint: &str, _repo_root: &Path, _label: &ChannelLabel) -> Result<GhApiResponse, String> {
171
            self.responses.lock().unwrap().pop_front().expect("MockGhApi: no more responses")
172
4
        }
173
    }
174
175
4
    fn mock_tracker(responses: Vec<Result<GhApiResponse, String>>) -> GitHubIssueTracker {
176
4
        let api = Arc::new(MockGhApi::new(responses));
177
4
        let runner = Arc::new(MockRunner::new(vec![]));
178
4
        GitHubIssueTracker::new("github".into(), "owner/repo".into(), api, runner)
179
4
    }
180
181
    use std::path::PathBuf;
182
183
    use crate::providers::replay::{
184
        Masks, {self},
185
    };
186
187
1
    fn fixture(name: &str) -> String {
188
1
        crate::providers::testing::fixture_path("issue_tracker", name)
189
1
    }
190
191
    #[tokio::test]
192
1
    async fn record_replay_list_issues() {
193
1
        let repo_slug = "rjwittams/flotilla".to_string();
194
195
1
        let session = replay::test_session(&fixture("github_issues.yaml"), Masks::new());
196
1
        let repo_root = if session.is_live() { 
repo_root_for_recording0
() } else { PathBuf::from("/test/repo") };
197
1
        let (api, runner) = build_api_and_runner(&session);
198
199
1
        let tracker = GitHubIssueTracker::new("github".into(), repo_slug, api, runner);
200
1
        let issues = tracker.list_issues(&repo_root, 30).await.unwrap();
201
202
        // The repo has open issues, so we expect a non-empty result
203
1
        assert!(!issues.is_empty(), "expected at least one open issue in rjwittams/flotilla");
204
30
        
for (1
id, issue) in
&issues1
{
205
30
            assert!(!id.is_empty());
206
30
            assert!(!issue.title.is_empty());
207
1
            // Each issue should have an association key matching its id
208
30
            assert!(
209
30
                issue.association_keys.contains(&AssociationKey::IssueRef("github".into(), id.clone())),
210
1
                "issue {} missing expected association key",
211
1
                id
212
1
            );
213
1
        }
214
1
215
1
        session.finish();
216
1
    }
217
218
    #[test]
219
1
    fn parse_rest_api_issues_filters_pull_requests() {
220
1
        let json = r#"[
221
1
            {"number": 1, "title": "Bug report", "labels": [{"name": "bug"}]},
222
1
            {"number": 2, "title": "Feature PR", "labels": [], "pull_request": {"url": "..."}},
223
1
            {"number": 3, "title": "Enhancement", "labels": [{"name": "enhancement"}]}
224
1
        ]"#;
225
1
        let items: Vec<serde_json::Value> = serde_json::from_str(json).unwrap();
226
3
        let 
filtered1
:
Vec<&serde_json::Value>1
=
items.iter()1
.
filter1
(|v| !v.as_object().unwrap().contains_key("pull_request")).
collect1
();
227
1
        assert_eq!(filtered.len(), 2);
228
1
        assert_eq!(filtered[0]["number"], 1);
229
1
        assert_eq!(filtered[1]["number"], 3);
230
1
    }
231
232
    #[tokio::test]
233
1
    async fn changed_since_partitions_open_and_closed() {
234
1
        let body = r#"[
235
1
            {"number": 1, "title": "Open issue", "state": "open", "labels": []},
236
1
            {"number": 2, "title": "Closed issue", "state": "closed", "labels": []},
237
1
            {"number": 3, "title": "Another open", "state": "open", "labels": []}
238
1
        ]"#;
239
1
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
240
1
            status: 200,
241
1
            etag: None,
242
1
            body: body.to_string(),
243
1
            has_next_page: false,
244
1
            total_count: None,
245
1
        })]);
246
247
1
        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 50).await.unwrap();
248
249
1
        assert_eq!(changeset.updated.len(), 2);
250
1
        assert_eq!(changeset.updated[0].0, "1");
251
1
        assert_eq!(changeset.updated[1].0, "3");
252
1
        assert_eq!(changeset.closed_ids, vec!["2"]);
253
1
        assert!(!changeset.has_more);
254
1
    }
255
256
    #[tokio::test]
257
1
    async fn changed_since_filters_pull_requests() {
258
1
        let body = r#"[
259
1
            {"number": 1, "title": "Issue", "state": "open", "labels": []},
260
1
            {"number": 2, "title": "PR", "state": "open", "labels": [], "pull_request": {"url": "..."}}
261
1
        ]"#;
262
1
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
263
1
            status: 200,
264
1
            etag: None,
265
1
            body: body.to_string(),
266
1
            has_next_page: false,
267
1
            total_count: None,
268
1
        })]);
269
270
1
        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 50).await.unwrap();
271
272
1
        assert_eq!(changeset.updated.len(), 1);
273
1
        assert_eq!(changeset.updated[0].0, "1");
274
1
        assert!(changeset.closed_ids.is_empty());
275
1
    }
276
277
    #[tokio::test]
278
1
    async fn changed_since_has_more_ignores_pr_heavy_pages() {
279
        // Page is full (has_next_page) but all items are PRs — has_more should be false
280
        // because the filtered issue count is 0, not >= per_page.
281
1
        let body = r#"[
282
1
            {"number": 10, "title": "PR A", "state": "open", "labels": [], "pull_request": {"url": "..."}},
283
1
            {"number": 11, "title": "PR B", "state": "open", "labels": [], "pull_request": {"url": "..."}}
284
1
        ]"#;
285
1
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
286
1
            status: 200,
287
1
            etag: None,
288
1
            body: body.to_string(),
289
1
            has_next_page: true,
290
1
            total_count: None,
291
1
        })]);
292
293
1
        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 2).await.unwrap();
294
295
1
        assert!(changeset.updated.is_empty());
296
1
        assert!(changeset.closed_ids.is_empty());
297
1
        assert!(!changeset.has_more, "should not escalate when all items are PRs");
298
1
    }
299
300
    #[tokio::test]
301
1
    async fn changed_since_escalates_on_mixed_pr_issue_page() {
302
        // Page has both PRs and issues with has_next_page — should escalate
303
        // because remaining pages may contain more issues.
304
1
        let body = r#"[
305
1
            {"number": 1, "title": "Issue", "state": "open", "labels": []},
306
1
            {"number": 2, "title": "PR", "state": "open", "labels": [], "pull_request": {"url": "..."}}
307
1
        ]"#;
308
1
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
309
1
            status: 200,
310
1
            etag: None,
311
1
            body: body.to_string(),
312
1
            has_next_page: true,
313
1
            total_count: None,
314
1
        })]);
315
316
1
        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 2).await.unwrap();
317
318
1
        assert_eq!(changeset.updated.len(), 1);
319
1
        assert!(changeset.has_more, "should escalate when page has issues and more pages exist");
320
1
    }
321
}