From 50068653801991e67487f1b555b83f9232a3bd48 Mon Sep 17 00:00:00 2001
From: pennae
Date: Sat, 28 May 2022 15:14:52 +0200
Subject: initial commit

---
 src/github.rs      | 243 ++++++++++++++++++++++++++++++++
 src/issues.graphql |  26 ++++
 src/main.rs        | 350 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/pulls.graphql  |  28 +++++
 src/types.rs       |  72 ++++++++++
 5 files changed, 719 insertions(+)
 create mode 100644 src/github.rs
 create mode 100644 src/issues.graphql
 create mode 100644 src/main.rs
 create mode 100644 src/pulls.graphql
 create mode 100644 src/types.rs

(limited to 'src')

diff --git a/src/github.rs b/src/github.rs
new file mode 100644
index 0000000..6e499be
--- /dev/null
+++ b/src/github.rs
@@ -0,0 +1,243 @@
+use std::fmt::Debug;
+
+use anyhow::{bail, Result};
+use chrono::Duration;
+use graphql_client::{reqwest::post_graphql_blocking as post_graphql, GraphQLQuery};
+
+use crate::types::{DateTime, Issue, PullRequest, HTML, URI};
+
+const API_URL: &str = "https://api.github.com/graphql";
+
+type Cursor = String;
+
+pub struct Github {
+    client: reqwest::blocking::Client,
+    owner: String,
+    repo: String,
+    label: String,
+}
+
+trait ChunkedQuery: GraphQLQuery {
+    type Item;
+
+    fn change_after(&self, v: Self::Variables, after: Option<Cursor>) -> Self::Variables;
+    fn set_batch(&self, batch: i64, v: Self::Variables) -> Self::Variables;
+
+    fn process(&self, d: Self::ResponseData) -> Result<(Vec<Self::Item>, Option<Cursor>)>;
+}
+
+#[derive(Debug, GraphQLQuery)]
+#[graphql(
+    schema_path = "vendor/github.com/schema.docs.graphql",
+    query_path = "src/issues.graphql",
+    response_derives = "Debug",
+    variables_derives = "Clone,Debug"
+)]
+pub struct IssuesQuery;
+
+impl ChunkedQuery for IssuesQuery {
+    type Item = Issue;
+
+    fn change_after(&self, v: Self::Variables, after: Option<Cursor>) -> Self::Variables {
+        Self::Variables { after, ..v }
+    }
+    fn set_batch(&self, batch: i64, v: Self::Variables) -> Self::Variables {
+        Self::Variables { batch, ..v }
+    }
+
+    fn process(&self, d: Self::ResponseData) -> Result<(Vec<Issue>, Option<Cursor>)> {
+        debug!("rate limits: {:?}", d.rate_limit);
+        let issues = match d.repository {
+            Some(r) => r.issues,
+            None => bail!("query returned no repo"),
+        };
+        // deliberately ignore all nulls. no idea why the schema doesn't make
+        // all of these links mandatory, having them nullable makes no sense.
+        let infos = issues
+            .edges
+            .unwrap_or_default()
+            .into_iter()
+            .filter_map(|e| e?.node)
+            .map(|n| Issue {
+                id: n.id,
+                title: n.title,
+                is_open: !n.closed,
+                body: n.body_html,
+                last_update: n.updated_at,
+                url: n.url,
+            })
+            .collect();
+        let cursor = if issues.page_info.has_next_page {
+            issues.page_info.end_cursor
+        } else {
+            None
+        };
+        Ok((infos, cursor))
+    }
+}
+
+#[derive(Debug, GraphQLQuery)]
+#[graphql(
+    schema_path = "vendor/github.com/schema.docs.graphql",
+    query_path = "src/pulls.graphql",
+    response_derives = "Debug",
+    variables_derives = "Clone,Debug"
+)]
+pub struct PullsQuery {
+    since: Option<DateTime>,
+}
+
+impl ChunkedQuery for PullsQuery {
+    type Item = PullRequest;
+
+    fn change_after(&self, v: Self::Variables, after: Option<Cursor>) -> Self::Variables {
+        Self::Variables { after, ..v }
+    }
+    fn set_batch(&self, batch: i64, v: Self::Variables) -> Self::Variables {
+        Self::Variables { batch, ..v }
+    }
+
+    fn process(&self, d: Self::ResponseData) -> Result<(Vec<PullRequest>, Option<Cursor>)> {
+        debug!("rate limits: {:?}", d.rate_limit);
+        let prs = match d.repository {
+            Some(r) => r.pull_requests,
+            None => bail!("query returned no repo"),
+        };
+        // deliberately ignore all nulls. no idea why the schema doesn't make
+        // all of these links mandatory, having them nullable makes no sense.
+        let infos: Vec<PullRequest> = prs
+            .edges
+            .unwrap_or_default()
+            .into_iter()
+            .filter_map(|e| e?.node)
+            .map(|n| PullRequest {
+                id: n.id,
+                title: n.title,
+                is_open: !n.closed,
+                is_merged: n.merged,
+                body: n.body_html,
+                last_update: n.updated_at,
+                url: n.url,
+                base_ref: n.base_ref_name,
+            })
+            .collect();
+        let cursor = match (self.since, infos.last()) {
+            (Some(since), Some(last)) if last.last_update < since => None,
+            _ => {
+                if prs.page_info.has_next_page {
+                    prs.page_info.end_cursor
+                } else {
+                    None
+                }
+            }
+        };
+        Ok((infos, cursor))
+    }
+}
+
+impl Github {
+    pub fn new(api_token: &str, owner: &str, repo: &str, label: &str) -> Result<Self> {
+        use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
+
+        let headers = match HeaderValue::from_str(&format!("Bearer {}", api_token)) {
+            Ok(h) => [(AUTHORIZATION, h)].into_iter().collect::<HeaderMap>(),
+            Err(e) => bail!("invalid API token: {}", e),
+        };
+        let client = reqwest::blocking::Client::builder()
+            .user_agent(format!(
+                "{}/{}",
+                env!("CARGO_PKG_NAME"),
+                env!("CARGO_PKG_VERSION")
+            ))
+            .default_headers(headers)
+            .build()?;
+        Ok(Github {
+            client,
+            owner: owner.to_string(),
+            repo: repo.to_string(),
+            label: label.to_string(),
+        })
+    }
+
+    fn query_raw<Q>(&self, q: Q, mut vars: Q::Variables) -> Result<Vec<Q::Item>>
+    where
+        Q: ChunkedQuery + Debug,
+        Q::Variables: Clone + Debug,
+    {
+        let mut result = vec![];
+        let max_batch = 100;
+        let mut batch = max_batch;
+
+        loop {
+            vars = q.set_batch(batch, vars);
+
+            debug!("running query {:?} with {:?}", q, vars);
+            let started = chrono::Local::now();
+            let resp = post_graphql::<Q, _>(&self.client, API_URL, vars.clone())?;
+            let ended = chrono::Local::now();
+
+            // queries may time out. if that happens throttle the query once and try
+            // again, if that fails too we fail for good.
+            let resp = match resp.errors {
+                None => {
+                    // time limit is 10 seconds. if we're well under that, increase
+                    // the batch size again.
+                    if batch != max_batch && ended - started < Duration::seconds(8) {
+                        batch = (batch + batch / 10 + 1).min(max_batch);
+                        info!("increasing batch size to {}", batch);
+                    }
+                    resp
+                }
+                Some(e) if batch > 1 && e.iter().all(|e| e.message.contains("timeout")) => {
+                    warn!("throttling query due to timeout error: {:?}", e);
+                    // anything larger than 1 seems to be unreliable here
+                    batch = 1;
+                    info!("new batch size: {}", batch);
+                    continue;
+                }
+                Some(e) => bail!("query failed: {:?}", e),
+            };
+
+            match resp.data {
+                Some(d) => {
+                    let (mut items, cursor) = q.process(d)?;
+                    result.append(&mut items);
+                    match cursor {
+                        None => break,
+                        cursor => vars = q.change_after(vars, cursor),
+                    }
+                }
+                None => bail!("query returned no data"),
+            }
+        }
+
+        Ok(result)
+    }
+
+    pub fn query_issues(&self, since: Option<DateTime>) -> Result<Vec<Issue>> {
+        self.query_raw(
+            IssuesQuery,
+            issues_query::Variables {
+                owner: self.owner.clone(),
+                name: self.repo.clone(),
+                label: self.label.clone(),
+                after: None,
+                since,
+                batch: 100,
+            },
+        )
+    }
+
+    pub fn query_pulls(&self, since: Option<DateTime>) -> Result<Vec<PullRequest>> {
+        self.query_raw(
+            PullsQuery { since },
+            pulls_query::Variables {
+                owner: self.owner.clone(),
+                name: self.repo.clone(),
+                label: self.label.clone(),
+                after: None,
+                batch: 100,
+            },
+        )
+    }
+}
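
Note: the retry loop in query_raw couples two policies: on a timeout error it collapses the page size to 1 and retries, and on any sufficiently fast response (well under GitHub's 10-second query limit) it grows the page size back by roughly 10% per request, capped at 100. A minimal standalone sketch of that schedule; next_batch and its signature are illustrative names, not part of the patch:

// Batch-size schedule from query_raw in isolation: collapse to 1 on a
// timeout, grow ~10% (at least 1) per fast response, cap at the maximum.
fn next_batch(current: i64, max: i64, timed_out: bool, fast: bool) -> i64 {
    if timed_out {
        1 // anything larger proved unreliable right after a timeout
    } else if fast {
        (current + current / 10 + 1).min(max)
    } else {
        current
    }
}

fn main() {
    let mut batch = 1; // as if just recovered from a timeout
    for _ in 0..30 {
        batch = next_batch(batch, 100, false, true);
    }
    assert_eq!(batch, 100); // roughly 30 fast responses to reach the cap again
}

The slow climb means one degenerate page costs about thirty requests of reduced throughput, but it avoids oscillating between timeouts and full batches.
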
diff --git a/src/issues.graphql b/src/issues.graphql
new file mode 100644
index 0000000..93a4dc0
--- /dev/null
+++ b/src/issues.graphql
@@ -0,0 +1,26 @@
+query IssuesQuery($owner: String!, $name: String!, $label: String!, $after: String, $since: DateTime, $batch: Int!) {
+  rateLimit {
+    limit
+    cost
+    remaining
+    resetAt
+  }
+  repository(owner: $owner, name: $name) {
+    issues(first: $batch, after: $after, filterBy: { labels: [ $label ], since: $since }) {
+      pageInfo {
+        endCursor
+        hasNextPage
+      }
+      edges {
+        node {
+          id
+          bodyHTML
+          closed
+          title
+          updatedAt
+          url
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
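
Note: this query is paged Relay-style: each response's pageInfo.endCursor becomes the next request's $after until hasNextPage turns false, which is exactly the loop query_raw drives through ChunkedQuery::change_after. A self-contained model of that loop, with a mock fetch over a vector standing in for the API (all names here are illustrative, not from the patch):

// Cursor pagination in miniature: the shape of query_raw's paging loop.
struct Page {
    items: Vec<u32>,
    end_cursor: Option<usize>, // None models hasNextPage == false
}

fn fetch(data: &[u32], after: Option<usize>, batch: usize) -> Page {
    let start = after.unwrap_or(0);
    let end = (start + batch).min(data.len());
    Page {
        items: data[start..end].to_vec(),
        end_cursor: if end < data.len() { Some(end) } else { None },
    }
}

fn main() {
    let data: Vec<u32> = (0..250).collect();
    let mut all = vec![];
    let mut after = None;
    loop {
        let page = fetch(&data, after, 100);
        all.extend(page.items);
        match page.end_cursor {
            None => break,
            cursor => after = cursor,
        }
    }
    assert_eq!(all.len(), 250); // 100 + 100 + 50: three requests
}
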
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..7dba807
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,350 @@
+#[macro_use]
+extern crate anyhow;
+#[macro_use]
+extern crate log;
+
+mod github;
+mod types;
+
+use std::{
+    collections::{btree_map::Entry, BTreeMap},
+    env,
+    fs::File,
+    io::{BufReader, BufWriter},
+    path::{Path, PathBuf},
+};
+
+use anyhow::{Context, Result};
+use chrono::{Duration, Utc};
+use clap::{Args, Parser, Subcommand};
+use github::Github;
+use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
+use serde_json::{from_reader, to_writer};
+use tempfile::NamedTempFile;
+use types::{DateTime, IssueAction, PullAction, State, STATE_VERSION};
+
+#[derive(Parser)]
+#[clap(version)]
+/// Poll github issues and PRs by label and generate RSS feeds.
+struct App {
+    #[clap(subcommand)]
+    command: Command,
+}
+
+#[derive(Subcommand)]
+#[clap(about)]
+enum Command {
+    /// Initialize a tracker state.
+    ///
+    /// Each tracker state applies to only one repository and only one label.
+    Init {
+        /// Path of the newly created state.
+        state_file: PathBuf,
+        /// Owner of the repository to query.
+        owner: String,
+        /// Name of the repository.
+        repo: String,
+        /// Name of the label to track.
+        label: String,
+    },
+    /// Sync issues on a state.
+    SyncIssues(SyncArgs),
+    /// Sync pull requests on a state.
+    SyncPrs(SyncArgs),
+    /// Emit an RSS feed for issue changes.
+    EmitIssues(EmitArgs),
+    /// Emit an RSS feed for PR changes.
+    EmitPrs(EmitArgs),
+}
+
+#[derive(Args)]
+struct SyncArgs {
+    /// State to sync.
+    state_file: PathBuf,
+}
+
+#[derive(Args)]
+struct EmitArgs {
+    /// State to read.
+    state_file: PathBuf,
+
+    #[clap(short, long = "max-age")]
+    /// How far to look back in history, in hours.
+    age_hours: u32,
+
+    #[clap(short, long)]
+    /// Target file for the generated feed. Defaults to stdout.
+    out: Option<PathBuf>,
+}
+
+fn with_state<F>(state_file: PathBuf, f: F) -> Result<()>
+where
+    F: FnOnce(State) -> Result<Option<State>>,
+{
+    let old_state: State = from_reader(BufReader::new(File::open(&state_file)?))?;
+    if old_state.version != STATE_VERSION {
+        bail!(
+            "expected state version {}, got {}",
+            STATE_VERSION,
+            old_state.version
+        );
+    }
+
+    let new_state = f(old_state)?;
+
+    if let Some(state) = new_state {
+        let new_state_file = NamedTempFile::new_in(
+            state_file
+                .ancestors()
+                .nth(1)
+                .unwrap_or_else(|| Path::new(".")),
+        )?;
+
+        to_writer(BufWriter::new(&new_state_file), &state)?;
+        new_state_file.persist(state_file)?;
+    }
+
+    Ok(())
+}
+
+fn with_state_and_github<F>(state_file: PathBuf, f: F) -> Result<()>
+where
+    F: FnOnce(State, &Github) -> Result<Option<State>>,
+{
+    let github_api_token =
+        env::var("GITHUB_API_TOKEN").context("failed to load GITHUB_API_TOKEN")?;
+
+    with_state(state_file, |old_state| {
+        let client = github::Github::new(
+            &github_api_token,
+            &old_state.owner,
+            &old_state.repo,
+            &old_state.label,
+        )?;
+
+        f(old_state, &client)
+    })
+}
+
+fn sync_issues(mut state: State, github: &github::Github) -> Result<Option<State>> {
+    let issues = github.query_issues(state.issues_updated)?;
+
+    let mut new_history = vec![];
+
+    for updated in issues {
+        let issue_state = |is_new| match (updated.is_open, is_new) {
+            (true, _) => IssueAction::New,
+            (false, true) => IssueAction::NewClosed,
+            (false, false) => IssueAction::Closed,
+        };
+        match state.issues.entry(updated.id.clone()) {
+            Entry::Occupied(mut e) => {
+                let stored = e.get_mut();
+                if stored.is_open != updated.is_open {
+                    new_history.push((updated.last_update, updated.id.clone(), issue_state(false)));
+                }
+                *stored = updated;
+            }
+            Entry::Vacant(e) => {
+                new_history.push((updated.last_update, updated.id.clone(), issue_state(true)));
+                e.insert(updated);
+            }
+        }
+    }
+
+    new_history.sort_by(|a, b| (a.0, &a.1).cmp(&(b.0, &b.1)));
+    if let Some(&(at, _, _)) = new_history.last() {
+        state.issues_updated = Some(at);
+    }
+    state.issue_history.append(&mut new_history);
+
+    Ok(Some(state))
+}
+
+fn sync_prs(mut state: State, github: &github::Github) -> Result<Option<State>> {
+    let prs = github.query_pulls(state.pull_requests_updated)?;
+
+    let mut new_history = vec![];
+
+    for updated in prs {
+        let pr_state = |is_new| match (updated.is_open, updated.is_merged, is_new) {
+            (false, false, true) => PullAction::NewClosed,
+            (false, false, false) => PullAction::Closed,
+            (true, false, _) => PullAction::New,
+            (_, true, true) => PullAction::NewMerged,
+            (_, true, false) => PullAction::Merged,
+        };
+        match state.pull_requests.entry(updated.id.clone()) {
+            Entry::Occupied(mut e) => {
+                let stored = e.get_mut();
+                if (stored.is_open, stored.is_merged) != (updated.is_open, updated.is_merged) {
+                    new_history.push((updated.last_update, updated.id.clone(), pr_state(false)));
+                }
+                *stored = updated;
+            }
+            Entry::Vacant(e) => {
+                new_history.push((updated.last_update, updated.id.clone(), pr_state(true)));
+                e.insert(updated);
+            }
+        }
+    }
+
+    new_history.sort_by(|a, b| (a.0, &a.1).cmp(&(b.0, &b.1)));
+    if let Some(&(at, _, _)) = new_history.last() {
+        state.pull_requests_updated = Some(at);
+    }
+    state.pull_history.append(&mut new_history);
+
+    Ok(Some(state))
+}
+
+fn format_history<T, A: Copy, F: Fn(&T, DateTime, A) -> Item>(
+    items: &BTreeMap<String, T>,
+    history: &[(DateTime, String, A)],
+    age_hours: u32,
+    format_entry: F,
+) -> Result<Vec<Item>> {
+    let since = Utc::now() - Duration::hours(age_hours as i64);
+
+    history
+        .iter()
+        .rev()
+        .take_while(|(changed, _, _)| changed >= &since)
+        .map(|(changed, id, how)| {
+            let entry = match items.get(id.as_str()) {
+                Some(i) => i,
+                None => bail!("database is corrupted (dangling key {})", id),
+            };
+            Ok(Item {
+                guid: Some(Guid {
+                    value: format!("{}/{}", changed.to_rfc3339(), id),
+                    permalink: false,
+                }),
+                ..format_entry(entry, *changed, *how)
+            })
+        })
+        .collect::<Result<Vec<_>, _>>()
+}
+
+fn new_rss_item(tag: &str, title: &str, url: &str, changed: DateTime, body: &str) -> Item {
+    ItemBuilder::default()
+        .title(Some(format!("{} {}", tag, title)))
+        .link(Some(url.to_string()))
+        .pub_date(Some(changed.to_rfc2822()))
+        .content(Some(body.to_string()))
+        .build()
+}
+
+fn emit_issues(state: &State, age_hours: u32) -> Result<Channel> {
+    let entries = format_history(
+        &state.issues,
+        &state.issue_history,
+        age_hours,
+        |issue, changed, how| {
+            let tag = match how {
+                IssueAction::New => "[NEW]",
+                IssueAction::Closed => "[CLOSED]",
+                IssueAction::NewClosed => "[NEW][CLOSED]",
+            };
+            new_rss_item(tag, &issue.title, &issue.url, changed, &issue.body)
+        },
+    )?;
+
+    let channel = ChannelBuilder::default()
+        .title(format!(
+            "Issues labeled `{}' in {}/{}",
+            state.label, state.owner, state.repo
+        ))
+        .items(entries)
+        .build();
+
+    Ok(channel)
+}
+
+fn emit_prs(state: &State, age_hours: u32) -> Result<Channel> {
+    let entries = format_history(
+        &state.pull_requests,
+        &state.pull_history,
+        age_hours,
+        |pr, changed, how| {
+            let tag = match how {
+                PullAction::New => "[NEW]",
+                PullAction::NewMerged => "[NEW][MERGED]",
+                PullAction::Closed => "[CLOSED]",
+                PullAction::NewClosed => "[NEW][CLOSED]",
+                PullAction::Merged => "[MERGED]",
+            };
+            let info = format!("{}({})", tag, pr.base_ref);
+            new_rss_item(&info, &pr.title, &pr.url, changed, &pr.body)
+        },
+    )?;
+
+    let channel = ChannelBuilder::default()
+        .title(format!(
+            "Pull requests labeled `{}' in {}/{}",
+            state.label, state.owner, state.repo
+        ))
+        .items(entries)
+        .build();
+
+    Ok(channel)
+}
+
+fn write_feed(to: Option<PathBuf>, channel: Channel) -> Result<Option<State>> {
+    match to {
+        Some(to) => {
+            let new_file =
+                NamedTempFile::new_in(to.ancestors().nth(1).unwrap_or_else(|| Path::new(".")))?;
+
+            channel.write_to(BufWriter::new(&new_file))?;
+            new_file.persist(to)?;
+        }
+        None => println!("{}", channel.to_string()),
+    }
+    Ok(None)
+}
+
+fn main() -> Result<()> {
+    pretty_env_logger::init();
+
+    match App::parse().command {
+        Command::Init {
+            state_file,
+            owner,
+            repo,
+            label,
+        } => {
+            let state = State {
+                version: STATE_VERSION,
+                owner,
+                repo,
+                label,
+                ..State::default()
+            };
+
+            let file = File::options()
+                .create_new(true)
+                .write(true)
+                .open(state_file)?;
+            to_writer(file, &state)?;
+        }
+        Command::SyncIssues(cmd) => {
+            with_state_and_github(cmd.state_file, sync_issues)?;
+        }
+        Command::SyncPrs(cmd) => {
+            with_state_and_github(cmd.state_file, sync_prs)?;
+        }
+        Command::EmitIssues(cmd) => {
+            with_state(cmd.state_file, |s| {
+                write_feed(cmd.out, emit_issues(&s, cmd.age_hours)?)
+            })?;
+        }
+        Command::EmitPrs(cmd) => {
+            with_state(cmd.state_file, |s| {
+                write_feed(cmd.out, emit_prs(&s, cmd.age_hours)?)
+            })?;
+        }
+    };
+
+    Ok(())
+}
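
Note: both with_state and write_feed write through a NamedTempFile created in the destination's own directory and then persist() it over the target, so an interrupted run can never leave a half-written state file or feed behind. The pattern in isolation, a sketch assuming the tempfile and anyhow crates used above (replace_atomically is an illustrative name, not a function from this patch):

use std::io::Write;
use std::path::Path;

use tempfile::NamedTempFile;

fn replace_atomically(target: &Path, contents: &[u8]) -> anyhow::Result<()> {
    // rename(2) can't cross filesystems, so the temp file must live in the
    // same directory as the file it will replace.
    let dir = target
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let mut tmp = NamedTempFile::new_in(dir)?;
    tmp.write_all(contents)?;
    // persist() renames over the target: readers observe the old contents
    // or the new, never a partial write.
    tmp.persist(target)?;
    Ok(())
}
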
diff --git a/src/pulls.graphql b/src/pulls.graphql
new file mode 100644
index 0000000..a25f247
--- /dev/null
+++ b/src/pulls.graphql
@@ -0,0 +1,28 @@
+query PullsQuery($owner: String!, $name: String!, $label: String!, $after: String, $batch: Int!) {
+  rateLimit {
+    limit
+    cost
+    remaining
+    resetAt
+  }
+  repository(owner: $owner, name: $name) {
+    pullRequests(first: $batch, after: $after, labels: [ $label ], orderBy: { direction: DESC, field: UPDATED_AT }) {
+      pageInfo {
+        endCursor
+        hasNextPage
+      }
+      edges {
+        node {
+          id
+          bodyHTML
+          closed
+          merged
+          baseRefName
+          title
+          updatedAt
+          url
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
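
Note: unlike the issues connection, pullRequests has no filterBy: { since: ... } argument, so the incremental sync is reproduced client-side: the query orders by UPDATED_AT descending, and PullsQuery::process stops requesting further pages once the oldest entry on a page predates the stored sync horizon. That early-stop rule as a standalone sketch, with plain integers standing in for timestamps (next_cursor is an illustrative name):

// Early-stop rule from PullsQuery::process: with pages sorted newest-first,
// a page whose last (oldest) item is older than `since` ends the scan.
fn next_cursor(
    since: Option<i64>,
    oldest_on_page: Option<i64>,
    has_next_page: bool,
    end_cursor: Option<String>,
) -> Option<String> {
    match (since, oldest_on_page) {
        (Some(since), Some(oldest)) if oldest < since => None,
        _ if has_next_page => end_cursor,
        _ => None,
    }
}

fn main() {
    // horizon 100: a page whose oldest item is 90 ends the scan early
    assert_eq!(next_cursor(Some(100), Some(90), true, Some("c".into())), None);
    // no horizon: keep following hasNextPage as usual
    assert_eq!(next_cursor(None, Some(90), true, Some("c".into())), Some("c".into()));
}
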
diff --git a/src/types.rs b/src/types.rs
new file mode 100644
index 0000000..a960647
--- /dev/null
+++ b/src/types.rs
@@ -0,0 +1,72 @@
+#![allow(non_camel_case_types)]
+
+use std::collections::BTreeMap;
+
+use serde::{Deserialize, Serialize};
+
+pub type DateTime = chrono::DateTime<chrono::Utc>;
+#[allow(clippy::upper_case_acronyms)]
+pub type HTML = String;
+#[allow(clippy::upper_case_acronyms)]
+pub type URI = String;
+
+pub const STATE_VERSION: u32 = 1;
+
+#[derive(Default, Debug, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct State {
+    pub version: u32,
+    pub owner: String,
+    pub repo: String,
+    pub label: String,
+    pub issues_updated: Option<DateTime>,
+    pub issues: BTreeMap<String, Issue>,
+    pub issue_history: Vec<(DateTime, String, IssueAction)>,
+    pub pull_requests_updated: Option<DateTime>,
+    pub pull_requests: BTreeMap<String, PullRequest>,
+    pub pull_history: Vec<(DateTime, String, PullAction)>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct Issue {
+    #[serde(skip)]
+    pub id: String,
+    pub title: String,
+    pub is_open: bool,
+    pub body: String,
+    pub last_update: DateTime,
+    pub url: String,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub enum IssueAction {
+    New,
+    Closed,
+    NewClosed,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct PullRequest {
+    #[serde(skip)]
+    pub id: String,
+    pub title: String,
+    pub is_open: bool,
+    pub is_merged: bool,
+    pub body: String,
+    pub last_update: DateTime,
+    pub url: String,
+    pub base_ref: String,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub enum PullAction {
+    New,
+    Closed,
+    NewClosed,
+    Merged,
+    NewMerged,
+}
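
Note: the state file is plain serde_json over the State struct above; deny_unknown_fields plus the explicit version check in with_state means a file written by a newer schema fails loudly instead of being half-understood. A round-trip sketch assuming this crate's types module (the field values are placeholders):

use crate::types::{State, STATE_VERSION};

fn roundtrip() -> anyhow::Result<()> {
    // `init` writes exactly this shape; sync/emit refuse anything whose
    // version field disagrees with STATE_VERSION.
    let state = State {
        version: STATE_VERSION,
        owner: "some-owner".into(), // placeholder
        repo: "some-repo".into(),   // placeholder
        label: "some-label".into(), // placeholder
        ..State::default()
    };
    let json = serde_json::to_string(&state)?;
    let back: State = serde_json::from_str(&json)?;
    assert_eq!(back.version, STATE_VERSION);
    Ok(())
}
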
-- 
cgit v1.2.3