crates/flotilla-core/src/providers/issue_tracker/github.rs
Line | Count | Source |
1 | | use std::{path::Path, sync::Arc}; |
2 | | |
3 | | use async_trait::async_trait; |
4 | | |
5 | | use crate::providers::{ |
6 | | gh_api_get, gh_api_get_with_headers, |
7 | | github_api::{clamp_per_page, GhApi}, |
8 | | run, |
9 | | types::*, |
10 | | CommandRunner, |
11 | | }; |
12 | | |
/// Issue-tracker provider backed by the GitHub REST API.
///
/// Wraps a [`GhApi`] client for HTTP calls and a [`CommandRunner`] for
/// shelling out to the `gh` CLI (used for browser opening).
pub struct GitHubIssueTracker {
    // Logical provider name (e.g. "github"); embedded in every parsed
    // issue's `provider_name` and association keys.
    provider_name: String,
    // "owner/repo" slug used to build REST endpoints.
    repo_slug: String,
    // Shared API client; `Arc<dyn ...>` so it can be cloned into
    // per-request futures (see `fetch_issues_by_id`).
    api: Arc<dyn GhApi>,
    // Command runner for `gh issue view --web`.
    runner: Arc<dyn CommandRunner>,
}
19 | | |
20 | | impl GitHubIssueTracker { |
21 | 6 | pub fn new(provider_name: String, repo_slug: String, api: Arc<dyn GhApi>, runner: Arc<dyn CommandRunner>) -> Self { |
22 | 6 | Self { provider_name, repo_slug, api, runner } |
23 | 6 | } |
24 | | } |
25 | | |
26 | 46 | pub(crate) fn parse_issue(provider_name: &str, v: &serde_json::Value) -> Option<(String, Issue)> { |
27 | 46 | let number = v["number"].as_i64()?0 ; |
28 | 46 | let title = v["title"].as_str()?0 .to_string(); |
29 | 46 | let labels: Vec<String> = v["labels"] |
30 | 46 | .as_array() |
31 | 46 | .map(|arr| arr.iter().filter_map(|l| l["name"]26 .as_str26 ().map26 (|s| s26 .to_string26 ())).collect()) |
32 | 46 | .unwrap_or_default(); |
33 | 46 | let id = number.to_string(); |
34 | 46 | let association_keys = vec![AssociationKey::IssueRef(provider_name.to_string(), id.clone())]; |
35 | 46 | Some((id, Issue { title, labels, association_keys, provider_name: provider_name.to_string(), provider_display_name: "GitHub".into() })) |
36 | 46 | } |
37 | | |
#[async_trait]
impl super::IssueProvider for GitHubIssueTracker {
    /// List up to `limit` open issues by fetching the first page only.
    /// Any further pages indicated by the response are ignored here;
    /// callers that need pagination use `list_issues_page` directly.
    async fn list_issues(&self, repo_root: &Path, limit: usize) -> Result<Vec<(String, Issue)>, String> {
        let page = self.list_issues_page(repo_root, 1, limit).await?;
        Ok(page.issues)
    }

    /// Fetch one page of open issues.
    ///
    /// Pull requests (which the GitHub issues endpoint also returns)
    /// are filtered out by the presence of a `pull_request` key, and
    /// items that fail to parse are silently dropped.
    async fn list_issues_page(&self, repo_root: &Path, page: u32, per_page: usize) -> Result<IssuePage, String> {
        // Clamp to the API's allowed per-page range before building the URL.
        let per_page = clamp_per_page(per_page);
        let endpoint = format!("repos/{}/issues?state=open&per_page={}&page={}", self.repo_slug, per_page, page);
        // Headers are needed to read pagination info (has_next_page).
        let response = gh_api_get_with_headers!(self.api, &endpoint, repo_root)?;
        let items: Vec<serde_json::Value> = serde_json::from_str(&response.body).map_err(|e| e.to_string())?;

        let issues: Vec<(String, Issue)> = items
            .into_iter()
            // Exclude PRs: the REST issues endpoint marks them with a
            // "pull_request" object.
            .filter(|v| !v.as_object().map(|o| o.contains_key("pull_request")).unwrap_or(false))
            .filter_map(|v| parse_issue(&self.provider_name, &v))
            .collect();

        Ok(IssuePage { issues, total_count: None, has_more: response.has_next_page })
    }

    /// Fetch specific issues by id, concurrently (up to 10 in flight).
    ///
    /// Failures for individual ids are logged and skipped, so the
    /// result may contain fewer entries than `ids`. NOTE(review):
    /// `buffer_unordered` means result order is completion order, not
    /// the order of `ids` — presumably callers don't rely on order.
    async fn fetch_issues_by_id(&self, repo_root: &Path, ids: &[String]) -> Result<Vec<(String, Issue)>, String> {
        use futures::stream::{
            StreamExt, {self},
        };
        let futs: Vec<_> = ids
            .iter()
            .map(|id| {
                let endpoint = format!("repos/{}/issues/{}", self.repo_slug, id);
                // Clone everything the future needs so it is 'static
                // and independent of `self`'s borrow.
                let api = Arc::clone(&self.api);
                let repo_root = repo_root.to_path_buf();
                let provider_name = self.provider_name.clone();
                let id = id.clone();
                async move {
                    let body = gh_api_get!(api, &endpoint, &repo_root)?;
                    let v: serde_json::Value = serde_json::from_str(&body).map_err(|e| e.to_string())?;
                    parse_issue(&provider_name, &v).ok_or_else(|| format!("failed to parse issue {}", id))
                }
            })
            .collect();

        // Limit concurrency to 10 simultaneous API requests.
        let results: Vec<_> = stream::iter(futs).buffer_unordered(10).collect().await;
        let mut issues = Vec::new();
        for result in results {
            match result {
                Ok(issue) => issues.push(issue),
                // Best-effort: a single failed fetch should not fail
                // the whole batch.
                Err(e) => tracing::warn!(provider = "github", err = %e, "failed to fetch issue"),
            }
        }
        Ok(issues)
    }

    /// Search open issues in this repo via the GitHub search API.
    ///
    /// `query` is embedded into a `repo:... is:issue is:open` search
    /// expression and URL-encoded; at most `limit` (clamped) results
    /// are requested.
    async fn search_issues(&self, repo_root: &Path, query: &str, limit: usize) -> Result<Vec<(String, Issue)>, String> {
        let per_page = clamp_per_page(limit);
        let raw_query = format!("repo:{} is:issue is:open {}", self.repo_slug, query);
        let encoded_query = urlencoding::encode(&raw_query);
        let endpoint = format!("search/issues?q={}&per_page={}", encoded_query, per_page);
        let body = gh_api_get!(self.api, &endpoint, repo_root)?;
        let response: serde_json::Value = serde_json::from_str(&body).map_err(|e| e.to_string())?;

        // Search responses wrap results in an "items" array.
        let items = response["items"].as_array().ok_or("no items array in search response")?;
        Ok(items.iter().filter_map(|v| parse_issue(&self.provider_name, v)).collect())
    }

    /// Fetch issues changed since `since` (a timestamp string passed
    /// straight to the API after URL-encoding), partitioning them into
    /// still-open updates and closed ids. PRs are skipped entirely.
    async fn list_issues_changed_since(&self, repo_root: &Path, since: &str, per_page: usize) -> Result<IssueChangeset, String> {
        let per_page = clamp_per_page(per_page);
        let encoded_since = urlencoding::encode(since);
        // state=all so that closed issues show up and can be reported
        // in `closed_ids`.
        let endpoint =
            format!("repos/{}/issues?state=all&since={}&sort=updated&direction=desc&per_page={}", self.repo_slug, encoded_since, per_page);
        let response = gh_api_get_with_headers!(self.api, &endpoint, repo_root)?;
        let items: Vec<serde_json::Value> = serde_json::from_str(&response.body).map_err(|e| e.to_string())?;

        let mut updated = Vec::new();
        let mut closed_ids = Vec::new();

        for v in &items {
            // Skip pull requests (marked by a "pull_request" key).
            if v.as_object().map(|o| o.contains_key("pull_request")).unwrap_or(false) {
                continue;
            }
            // Missing state is treated as open.
            let state = v["state"].as_str().unwrap_or("open");
            if state == "open" {
                if let Some(issue) = parse_issue(&self.provider_name, v) {
                    updated.push(issue);
                }
            } else if let Some(number) = v["number"].as_i64() {
                // Non-open issues are reported by id only.
                closed_ids.push(number.to_string());
            }
        }

        // Escalate when there are more pages AND at least one real issue
        // on this page — remaining pages likely contain more issues too.
        // When issue_count == 0 (all PRs), don't escalate: there are no
        // issue changes to miss.
        let issue_count = updated.len() + closed_ids.len();
        Ok(IssueChangeset { updated, closed_ids, has_more: response.has_next_page && issue_count > 0 })
    }

    /// Open issue `id` in the user's browser via `gh issue view --web`.
    async fn open_in_browser(&self, repo_root: &Path, id: &str) -> Result<(), String> {
        run!(self.runner, "gh", &["issue", "view", id, "--web"], repo_root)?;
        Ok(())
    }
}
141 | | |
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, sync::Mutex};

    use super::*;
    use crate::providers::{
        github_api::{GhApi, GhApiResponse},
        github_test_support::{build_api_and_runner, repo_root_for_recording},
        issue_tracker::IssueProvider,
        testing::MockRunner,
        ChannelLabel,
    };

    /// Scripted `GhApi` double: serves a fixed queue of responses,
    /// one per call, regardless of the endpoint requested.
    struct MockGhApi {
        // FIFO of canned responses; Mutex because the trait takes &self.
        responses: Mutex<VecDeque<Result<GhApiResponse, String>>>,
    }

    impl MockGhApi {
        fn new(responses: Vec<Result<GhApiResponse, String>>) -> Self {
            Self { responses: Mutex::new(responses.into()) }
        }
    }

    #[async_trait]
    impl GhApi for MockGhApi {
        // Delegate to get_with_headers and strip the metadata.
        async fn get(&self, endpoint: &str, repo_root: &Path, label: &ChannelLabel) -> Result<String, String> {
            self.get_with_headers(endpoint, repo_root, label).await.map(|r| r.body)
        }
        // Pop the next canned response; panics if the script ran dry,
        // which signals an unexpected extra API call in the test.
        async fn get_with_headers(&self, _endpoint: &str, _repo_root: &Path, _label: &ChannelLabel) -> Result<GhApiResponse, String> {
            self.responses.lock().unwrap().pop_front().expect("MockGhApi: no more responses")
        }
    }

    /// Build a tracker for "owner/repo" wired to the scripted API
    /// double and a no-op command runner.
    fn mock_tracker(responses: Vec<Result<GhApiResponse, String>>) -> GitHubIssueTracker {
        let api = Arc::new(MockGhApi::new(responses));
        let runner = Arc::new(MockRunner::new(vec![]));
        GitHubIssueTracker::new("github".into(), "owner/repo".into(), api, runner)
    }

    use std::path::PathBuf;

    use crate::providers::replay::{
        Masks, {self},
    };

    /// Path to a record/replay fixture under the issue_tracker dir.
    fn fixture(name: &str) -> String {
        crate::providers::testing::fixture_path("issue_tracker", name)
    }

    // Record/replay test: live mode hits the real repo and records;
    // replay mode serves the recorded fixture deterministically.
    #[tokio::test]
    async fn record_replay_list_issues() {
        let repo_slug = "rjwittams/flotilla".to_string();

        let session = replay::test_session(&fixture("github_issues.yaml"), Masks::new());
        // A real checkout is only needed when recording live.
        let repo_root = if session.is_live() { repo_root_for_recording() } else { PathBuf::from("/test/repo") };
        let (api, runner) = build_api_and_runner(&session);

        let tracker = GitHubIssueTracker::new("github".into(), repo_slug, api, runner);
        let issues = tracker.list_issues(&repo_root, 30).await.unwrap();

        // The repo has open issues, so we expect a non-empty result
        assert!(!issues.is_empty(), "expected at least one open issue in rjwittams/flotilla");
        for (id, issue) in &issues {
            assert!(!id.is_empty());
            assert!(!issue.title.is_empty());
            // Each issue should have an association key matching its id
            assert!(
                issue.association_keys.contains(&AssociationKey::IssueRef("github".into(), id.clone())),
                "issue {} missing expected association key",
                id
            );
        }

        session.finish();
    }

    // The "pull_request" key is what distinguishes PRs from issues in
    // the REST issues payload; verify the filter predicate directly.
    #[test]
    fn parse_rest_api_issues_filters_pull_requests() {
        let json = r#"[
            {"number": 1, "title": "Bug report", "labels": [{"name": "bug"}]},
            {"number": 2, "title": "Feature PR", "labels": [], "pull_request": {"url": "..."}},
            {"number": 3, "title": "Enhancement", "labels": [{"name": "enhancement"}]}
        ]"#;
        let items: Vec<serde_json::Value> = serde_json::from_str(json).unwrap();
        let filtered: Vec<&serde_json::Value> = items.iter().filter(|v| !v.as_object().unwrap().contains_key("pull_request")).collect();
        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0]["number"], 1);
        assert_eq!(filtered[1]["number"], 3);
    }

    // Open issues land in `updated`, closed ones in `closed_ids`.
    #[tokio::test]
    async fn changed_since_partitions_open_and_closed() {
        let body = r#"[
            {"number": 1, "title": "Open issue", "state": "open", "labels": []},
            {"number": 2, "title": "Closed issue", "state": "closed", "labels": []},
            {"number": 3, "title": "Another open", "state": "open", "labels": []}
        ]"#;
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
            status: 200,
            etag: None,
            body: body.to_string(),
            has_next_page: false,
            total_count: None,
        })]);

        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 50).await.unwrap();

        assert_eq!(changeset.updated.len(), 2);
        assert_eq!(changeset.updated[0].0, "1");
        assert_eq!(changeset.updated[1].0, "3");
        assert_eq!(changeset.closed_ids, vec!["2"]);
        assert!(!changeset.has_more);
    }

    // PRs must be excluded from both partitions of the changeset.
    #[tokio::test]
    async fn changed_since_filters_pull_requests() {
        let body = r#"[
            {"number": 1, "title": "Issue", "state": "open", "labels": []},
            {"number": 2, "title": "PR", "state": "open", "labels": [], "pull_request": {"url": "..."}}
        ]"#;
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
            status: 200,
            etag: None,
            body: body.to_string(),
            has_next_page: false,
            total_count: None,
        })]);

        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 50).await.unwrap();

        assert_eq!(changeset.updated.len(), 1);
        assert_eq!(changeset.updated[0].0, "1");
        assert!(changeset.closed_ids.is_empty());
    }

    #[tokio::test]
    async fn changed_since_has_more_ignores_pr_heavy_pages() {
        // Page is full (has_next_page) but all items are PRs — has_more should be false
        // because the filtered issue count is 0, not >= per_page.
        let body = r#"[
            {"number": 10, "title": "PR A", "state": "open", "labels": [], "pull_request": {"url": "..."}},
            {"number": 11, "title": "PR B", "state": "open", "labels": [], "pull_request": {"url": "..."}}
        ]"#;
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
            status: 200,
            etag: None,
            body: body.to_string(),
            has_next_page: true,
            total_count: None,
        })]);

        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 2).await.unwrap();

        assert!(changeset.updated.is_empty());
        assert!(changeset.closed_ids.is_empty());
        assert!(!changeset.has_more, "should not escalate when all items are PRs");
    }

    #[tokio::test]
    async fn changed_since_escalates_on_mixed_pr_issue_page() {
        // Page has both PRs and issues with has_next_page — should escalate
        // because remaining pages may contain more issues.
        let body = r#"[
            {"number": 1, "title": "Issue", "state": "open", "labels": []},
            {"number": 2, "title": "PR", "state": "open", "labels": [], "pull_request": {"url": "..."}}
        ]"#;
        let tracker = mock_tracker(vec![Ok(GhApiResponse {
            status: 200,
            etag: None,
            body: body.to_string(),
            has_next_page: true,
            total_count: None,
        })]);

        let changeset = tracker.list_issues_changed_since(Path::new("/tmp/repo"), "2026-03-09T00:00:00Z", 2).await.unwrap();

        assert_eq!(changeset.updated.len(), 1);
        assert!(changeset.has_more, "should escalate when page has issues and more pages exist");
    }
}