Diffstat (limited to 'src')
-rw-r--r--  src/github.rs        243
-rw-r--r--  src/issues.graphql    26
-rw-r--r--  src/main.rs          350
-rw-r--r--  src/pulls.graphql     28
-rw-r--r--  src/types.rs          72
5 files changed, 719 insertions, 0 deletions
diff --git a/src/github.rs b/src/github.rs
new file mode 100644
index 0000000..6e499be
--- /dev/null
+++ b/src/github.rs
@@ -0,0 +1,243 @@
+use std::fmt::Debug;
+
+use anyhow::{bail, Result};
+use chrono::Duration;
+use graphql_client::{reqwest::post_graphql_blocking as post_graphql, GraphQLQuery};
+
+use crate::types::{DateTime, Issue, PullRequest, HTML, URI};
+
+const API_URL: &str = "https://api.github.com/graphql";
+
+type Cursor = String;
+
+pub struct Github {
+ client: reqwest::blocking::Client,
+ owner: String,
+ repo: String,
+ label: String,
+}
+
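+/// A GraphQL query that fetches its results in cursor-paginated chunks.
+/// Implementations turn one page of response data into a batch of items plus
+/// an optional cursor for the next page.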
+trait ChunkedQuery: GraphQLQuery {
+ type Item;
+
+ fn change_after(&self, v: Self::Variables, after: Option<String>) -> Self::Variables;
+ fn set_batch(&self, batch: i64, v: Self::Variables) -> Self::Variables;
+
+ fn process(&self, d: Self::ResponseData) -> Result<(Vec<Self::Item>, Option<Cursor>)>;
+}
+
+#[derive(Debug, GraphQLQuery)]
+#[graphql(
+ schema_path = "vendor/github.com/schema.docs.graphql",
+ query_path = "src/issues.graphql",
+ response_derives = "Debug",
+ variables_derives = "Clone,Debug"
+)]
+pub struct IssuesQuery;
+
+impl ChunkedQuery for IssuesQuery {
+ type Item = Issue;
+
+ fn change_after(&self, v: Self::Variables, after: Option<String>) -> Self::Variables {
+ Self::Variables { after, ..v }
+ }
+ fn set_batch(&self, batch: i64, v: Self::Variables) -> Self::Variables {
+ Self::Variables { batch, ..v }
+ }
+
+ fn process(&self, d: Self::ResponseData) -> Result<(Vec<Self::Item>, Option<Cursor>)> {
+ debug!("rate limits: {:?}", d.rate_limit);
+ let issues = match d.repository {
+ Some(r) => r.issues,
+ None => bail!("query returned no repo"),
+ };
+        // Deliberately ignore all nulls: the schema marks these connections as
+        // nullable even though a missing edge or node makes no sense here.
+ let infos = issues
+ .edges
+ .unwrap_or_default()
+ .into_iter()
+ .filter_map(|e| e?.node)
+ .map(|n| Issue {
+ id: n.id,
+ title: n.title,
+ is_open: !n.closed,
+ body: n.body_html,
+ last_update: n.updated_at,
+ url: n.url,
+ })
+ .collect();
+ let cursor = if issues.page_info.has_next_page {
+ issues.page_info.end_cursor
+ } else {
+ None
+ };
+ Ok((infos, cursor))
+ }
+}
+
+#[derive(Debug, GraphQLQuery)]
+#[graphql(
+ schema_path = "vendor/github.com/schema.docs.graphql",
+ query_path = "src/pulls.graphql",
+ response_derives = "Debug",
+ variables_derives = "Clone,Debug"
+)]
+pub struct PullsQuery {
+ since: Option<DateTime>,
+}
+
+impl ChunkedQuery for PullsQuery {
+ type Item = PullRequest;
+
+ fn change_after(&self, v: Self::Variables, after: Option<String>) -> Self::Variables {
+ Self::Variables { after, ..v }
+ }
+ fn set_batch(&self, batch: i64, v: Self::Variables) -> Self::Variables {
+ Self::Variables { batch, ..v }
+ }
+
+ fn process(&self, d: Self::ResponseData) -> Result<(Vec<Self::Item>, Option<Cursor>)> {
+ debug!("rate limits: {:?}", d.rate_limit);
+ let prs = match d.repository {
+ Some(r) => r.pull_requests,
+ None => bail!("query returned no repo"),
+ };
+        // Deliberately ignore all nulls: the schema marks these connections as
+        // nullable even though a missing edge or node makes no sense here.
+ let infos: Vec<PullRequest> = prs
+ .edges
+ .unwrap_or_default()
+ .into_iter()
+ .filter_map(|e| e?.node)
+ .map(|n| PullRequest {
+ id: n.id,
+ title: n.title,
+ is_open: !n.closed,
+ is_merged: n.merged,
+ body: n.body_html,
+ last_update: n.updated_at,
+ url: n.url,
+ base_ref: n.base_ref_name,
+ })
+ .collect();
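+        // pullRequests has no `since` filter; the query instead orders by
+        // UPDATED_AT descending, so once the oldest item of this page is older
+        // than `since` there is nothing newer left to fetch.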
+ let cursor = match (self.since, infos.last()) {
+ (Some(since), Some(last)) if last.last_update < since => None,
+ _ => {
+ if prs.page_info.has_next_page {
+ prs.page_info.end_cursor
+ } else {
+ None
+ }
+ }
+ };
+ Ok((infos, cursor))
+ }
+}
+
+impl Github {
+ pub fn new(api_token: &str, owner: &str, repo: &str, label: &str) -> Result<Self> {
+ use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
+
+ let headers = match HeaderValue::from_str(&format!("Bearer {}", api_token)) {
+ Ok(h) => [(AUTHORIZATION, h)].into_iter().collect::<HeaderMap>(),
+ Err(e) => bail!("invalid API token: {}", e),
+ };
+ let client = reqwest::blocking::Client::builder()
+ .user_agent(format!(
+ "{}/{}",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_VERSION")
+ ))
+ .default_headers(headers)
+ .build()?;
+ Ok(Github {
+ client,
+ owner: owner.to_string(),
+ repo: repo.to_string(),
+ label: label.to_string(),
+ })
+ }
+
+ fn query_raw<Q>(&self, q: Q, mut vars: <Q as GraphQLQuery>::Variables) -> Result<Vec<Q::Item>>
+ where
+ Q: ChunkedQuery + Debug,
+ Q::Variables: Clone + Debug,
+ {
+ let mut result = vec![];
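+        // github's GraphQL API caps connection page sizes at 100 nodes; start
+        // at the maximum and shrink only if queries start timing out.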
+ let max_batch = 100;
+ let mut batch = max_batch;
+
+ loop {
+ vars = q.set_batch(batch, vars);
+
+ debug!("running query {:?} with {:?}", q, vars);
+ let started = chrono::Local::now();
+ let resp = post_graphql::<Q, _>(&self.client, API_URL, vars.clone())?;
+ let ended = chrono::Local::now();
+
+            // Queries may time out. If that happens, throttle the query once and
+            // retry; if the retry fails as well, give up for good.
+ let resp = match resp.errors {
+ None => {
+                    // The server-side time limit is 10 seconds. If we finished
+                    // well under that, grow the batch size again.
+ if batch != max_batch && ended - started < Duration::seconds(8) {
+ batch = (batch + batch / 10 + 1).min(max_batch);
+ info!("increasing batch size to {}", batch);
+ }
+ resp
+ }
+ Some(e) if batch > 1 && e.iter().all(|e| e.message.contains("timeout")) => {
+ warn!("throttling query due to timeout error: {:?}", e);
+ // anything larger than 1 seems to be unreliable here
+ batch = 1;
+ info!("new batch size: {}", batch);
+ continue;
+ }
+ Some(e) => bail!("query failed: {:?}", e),
+ };
+
+ match resp.data {
+ Some(d) => {
+ let (mut items, cursor) = q.process(d)?;
+ result.append(&mut items);
+ match cursor {
+ None => break,
+ cursor => vars = q.change_after(vars, cursor),
+ }
+ }
+ None => bail!("query returned no data"),
+ }
+ }
+
+ Ok(result)
+ }
+
+ pub fn query_issues(&self, since: Option<DateTime>) -> Result<Vec<Issue>> {
+ self.query_raw(
+ IssuesQuery,
+ issues_query::Variables {
+ owner: self.owner.clone(),
+ name: self.repo.clone(),
+ label: self.label.clone(),
+ after: None,
+ since,
+ batch: 100,
+ },
+ )
+ }
+
+ pub fn query_pulls(&self, since: Option<DateTime>) -> Result<Vec<PullRequest>> {
+ self.query_raw(
+ PullsQuery { since },
+ pulls_query::Variables {
+ owner: self.owner.clone(),
+ name: self.repo.clone(),
+ label: self.label.clone(),
+ after: None,
+ batch: 100,
+ },
+ )
+ }
+}
diff --git a/src/issues.graphql b/src/issues.graphql
new file mode 100644
index 0000000..93a4dc0
--- /dev/null
+++ b/src/issues.graphql
@@ -0,0 +1,26 @@
+query IssuesQuery($owner: String!, $name: String!, $label: String!, $after: String, $since: DateTime, $batch: Int!) {
+ rateLimit {
+ limit
+ cost
+ remaining
+ resetAt
+ }
+ repository(owner: $owner, name: $name) {
+ issues(first: $batch, after: $after, filterBy: { labels: [ $label ], since: $since }) {
+ pageInfo {
+ endCursor
+ hasNextPage
+ }
+ edges {
+ node {
+ id
+ bodyHTML
+ closed
+ title
+ updatedAt
+ url
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..7dba807
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,350 @@
+#[macro_use]
+extern crate anyhow;
+#[macro_use]
+extern crate log;
+
+mod github;
+mod types;
+
+use std::{
+ collections::{btree_map::Entry, BTreeMap},
+ env,
+ fs::File,
+ io::{BufReader, BufWriter},
+ path::{Path, PathBuf},
+};
+
+use anyhow::{Context, Result};
+use chrono::{Duration, Utc};
+use clap::{Args, Parser, Subcommand};
+use github::Github;
+use rss::{Channel, ChannelBuilder, Guid, Item, ItemBuilder};
+use serde_json::{from_reader, to_writer};
+use tempfile::NamedTempFile;
+use types::{DateTime, IssueAction, PullAction, State, STATE_VERSION};
+
+#[derive(Parser)]
+#[clap(version)]
+/// Poll GitHub issues and PRs by label and generate RSS feeds.
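+///
+/// Typical workflow (the binary name comes from Cargo.toml; the repository,
+/// label and file names below are only illustrative):
+///
+///   export GITHUB_API_TOKEN=...
+///   label-tracker init state.json some-owner some-repo "some label"
+///   label-tracker sync-issues state.json
+///   label-tracker emit-issues --max-age 24 --out issues.xml state.json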
+struct App {
+ #[clap(subcommand)]
+ command: Command,
+}
+
+#[derive(Subcommand)]
+#[clap(about)]
+enum Command {
+ /// Initialize a tracker state.
+ ///
+ /// Each tracker state applies to only one repository and only one label.
+ Init {
+ /// Path of the newly created state.
+ state_file: PathBuf,
+ /// Owner of the repository to query.
+ owner: String,
+ /// Name of the repository.
+ repo: String,
+ /// Name of the label to track.
+ label: String,
+ },
+ /// Sync issues on a state.
+ SyncIssues(SyncArgs),
+ /// Sync pull requests on a state.
+ SyncPrs(SyncArgs),
+ /// Emit an RSS feed for issue changes.
+ EmitIssues(EmitArgs),
+ /// Emit an RSS feed for PR changes.
+ EmitPrs(EmitArgs),
+}
+
+#[derive(Args)]
+struct SyncArgs {
+ /// State to sync.
+ state_file: PathBuf,
+}
+
+#[derive(Args)]
+struct EmitArgs {
+ /// State to read.
+ state_file: PathBuf,
+
+ #[clap(short, long = "max-age")]
+ /// How far to look back in history, in hours.
+ age_hours: u32,
+
+ #[clap(short, long)]
+ /// Target file for the generated feed. Defaults to stdout.
+ out: Option<PathBuf>,
+}
+
+fn with_state<F>(state_file: PathBuf, f: F) -> Result<()>
+where
+ F: FnOnce(State) -> Result<Option<State>>,
+{
+ let old_state: State = from_reader(BufReader::new(File::open(&state_file)?))?;
+ if old_state.version != STATE_VERSION {
+ bail!(
+ "expected state version {}, got {}",
+ STATE_VERSION,
+ old_state.version
+ );
+ }
+
+ let new_state = f(old_state)?;
+
+ if let Some(state) = new_state {
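+        // Write the new state to a temporary file in the same directory as the
+        // old one, so persist() below is an atomic rename on the same filesystem.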
+ let new_state_file = NamedTempFile::new_in(
+ state_file
+ .ancestors()
+ .nth(1)
+ .unwrap_or_else(|| Path::new(".")),
+ )?;
+
+ to_writer(BufWriter::new(&new_state_file), &state)?;
+ new_state_file.persist(state_file)?;
+ }
+
+ Ok(())
+}
+
+fn with_state_and_github<F>(state_file: PathBuf, f: F) -> Result<()>
+where
+ F: FnOnce(State, &Github) -> Result<Option<State>>,
+{
+ let github_api_token =
+ env::var("GITHUB_API_TOKEN").context("failed to load GITHUB_API_TOKEN")?;
+
+ with_state(state_file, |old_state| {
+ let client = github::Github::new(
+ &github_api_token,
+ &old_state.owner,
+ &old_state.repo,
+ &old_state.label,
+ )?;
+
+ f(old_state, &client)
+ })
+}
+
+fn sync_issues(mut state: State, github: &github::Github) -> Result<Option<State>> {
+ let issues = github.query_issues(state.issues_updated)?;
+
+ let mut new_history = vec![];
+
+ for updated in issues {
+ let issue_state = |is_new| match (updated.is_open, is_new) {
+ (true, _) => IssueAction::New,
+ (false, true) => IssueAction::NewClosed,
+ (false, false) => IssueAction::Closed,
+ };
+ match state.issues.entry(updated.id.clone()) {
+ Entry::Occupied(mut e) => {
+ let stored = e.get_mut();
+ if stored.is_open != updated.is_open {
+ new_history.push((updated.last_update, updated.id.clone(), issue_state(false)));
+ }
+ *stored = updated;
+ }
+ Entry::Vacant(e) => {
+ new_history.push((updated.last_update, updated.id.clone(), issue_state(true)));
+ e.insert(updated);
+ }
+ }
+ }
+
+ new_history.sort_by(|a, b| (a.0, &a.1).cmp(&(b.0, &b.1)));
+ if let Some(&(at, _, _)) = new_history.last() {
+ state.issues_updated = Some(at);
+ }
+ state.issue_history.append(&mut new_history);
+
+ Ok(Some(state))
+}
+
+fn sync_prs(mut state: State, github: &github::Github) -> Result<Option<State>> {
+ let prs = github.query_pulls(state.pull_requests_updated)?;
+
+ let mut new_history = vec![];
+
+ for updated in prs {
+ let pr_state = |is_new| match (updated.is_open, updated.is_merged, is_new) {
+ (false, false, true) => PullAction::NewClosed,
+ (false, false, false) => PullAction::Closed,
+ (true, false, _) => PullAction::New,
+ (_, true, true) => PullAction::NewMerged,
+ (_, true, false) => PullAction::Merged,
+ };
+ match state.pull_requests.entry(updated.id.clone()) {
+ Entry::Occupied(mut e) => {
+ let stored = e.get_mut();
+ if (stored.is_open, stored.is_merged) != (updated.is_open, updated.is_merged) {
+ new_history.push((updated.last_update, updated.id.clone(), pr_state(false)));
+ }
+ *stored = updated;
+ }
+ Entry::Vacant(e) => {
+ new_history.push((updated.last_update, updated.id.clone(), pr_state(true)));
+ e.insert(updated);
+ }
+ }
+ }
+
+ new_history.sort_by(|a, b| (a.0, &a.1).cmp(&(b.0, &b.1)));
+ if let Some(&(at, _, _)) = new_history.last() {
+ state.pull_requests_updated = Some(at);
+ }
+ state.pull_history.append(&mut new_history);
+
+ Ok(Some(state))
+}
+
+fn format_history<V, A: Copy, F: Fn(&V, DateTime, A) -> Item>(
+ items: &BTreeMap<String, V>,
+ history: &[(DateTime, String, A)],
+ age_hours: u32,
+ format_entry: F,
+) -> Result<Vec<Item>> {
+ let since = Utc::now() - Duration::hours(age_hours as i64);
+
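+    // Histories are appended in chronological order, so walking them in reverse
+    // and stopping at the first entry older than `since` yields exactly the
+    // changes within the window, newest first.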
+ history
+ .iter()
+ .rev()
+ .take_while(|(changed, _, _)| changed >= &since)
+ .map(|(changed, id, how)| {
+ let entry = match items.get(id.as_str()) {
+ Some(i) => i,
+ None => bail!("database is corrupted (dangling key {})", id),
+ };
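+            // Combine the change timestamp with the node id so that repeated
+            // state changes of the same item produce distinct feed entries.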
+ Ok(Item {
+ guid: Some(Guid {
+ value: format!("{}/{}", changed.to_rfc3339(), id),
+ permalink: false,
+ }),
+ ..format_entry(entry, *changed, *how)
+ })
+ })
+ .collect::<Result<Vec<_>, _>>()
+}
+
+fn new_rss_item(tag: &str, title: &str, url: &str, changed: DateTime, body: &str) -> Item {
+ ItemBuilder::default()
+ .title(Some(format!("{} {}", tag, title)))
+ .link(Some(url.to_string()))
+ .pub_date(Some(changed.to_rfc2822()))
+ .content(Some(body.to_string()))
+ .build()
+}
+
+fn emit_issues(state: &State, age_hours: u32) -> Result<Channel> {
+ let entries = format_history(
+ &state.issues,
+ &state.issue_history,
+ age_hours,
+ |issue, changed, how| {
+ let tag = match how {
+ IssueAction::New => "[NEW]",
+ IssueAction::Closed => "[CLOSED]",
+ IssueAction::NewClosed => "[NEW][CLOSED]",
+ };
+ new_rss_item(tag, &issue.title, &issue.url, changed, &issue.body)
+ },
+ )?;
+
+ let channel = ChannelBuilder::default()
+ .title(format!(
+ "Issues labeled `{}' in {}/{}",
+ state.label, state.owner, state.repo
+ ))
+ .items(entries)
+ .build();
+
+ Ok(channel)
+}
+
+fn emit_prs(state: &State, age_hours: u32) -> Result<Channel> {
+ let entries = format_history(
+ &state.pull_requests,
+ &state.pull_history,
+ age_hours,
+ |pr, changed, how| {
+ let tag = match how {
+ PullAction::New => "[NEW]",
+ PullAction::NewMerged => "[NEW][MERGED]",
+ PullAction::Closed => "[CLOSED]",
+ PullAction::NewClosed => "[NEW][CLOSED]",
+ PullAction::Merged => "[MERGED]",
+ };
+ let info = format!("{}({})", tag, pr.base_ref);
+ new_rss_item(&info, &pr.title, &pr.url, changed, &pr.body)
+ },
+ )?;
+
+ let channel = ChannelBuilder::default()
+ .title(format!(
+ "Pull requests labeled `{}' in {}/{}",
+ state.label, state.owner, state.repo
+ ))
+ .items(entries)
+ .build();
+
+ Ok(channel)
+}
+
+fn write_feed(to: Option<PathBuf>, channel: Channel) -> Result<Option<State>> {
+ match to {
+ Some(to) => {
+ let new_file =
+ NamedTempFile::new_in(to.ancestors().nth(1).unwrap_or_else(|| Path::new(".")))?;
+
+ channel.write_to(BufWriter::new(&new_file))?;
+ new_file.persist(to)?;
+ }
+ None => println!("{}", channel.to_string()),
+ }
+ Ok(None)
+}
+
+fn main() -> Result<()> {
+ pretty_env_logger::init();
+
+ match App::parse().command {
+ Command::Init {
+ state_file,
+ owner,
+ repo,
+ label,
+ } => {
+ let state = State {
+ version: STATE_VERSION,
+ owner,
+ repo,
+ label,
+ ..State::default()
+ };
+
+ let file = File::options()
+ .create_new(true)
+ .write(true)
+ .open(state_file)?;
+ to_writer(file, &state)?;
+ }
+ Command::SyncIssues(cmd) => {
+ with_state_and_github(cmd.state_file, sync_issues)?;
+ }
+ Command::SyncPrs(cmd) => {
+ with_state_and_github(cmd.state_file, sync_prs)?;
+ }
+ Command::EmitIssues(cmd) => {
+ with_state(cmd.state_file, |s| {
+ write_feed(cmd.out, emit_issues(&s, cmd.age_hours)?)
+ })?;
+ }
+ Command::EmitPrs(cmd) => {
+ with_state(cmd.state_file, |s| {
+ write_feed(cmd.out, emit_prs(&s, cmd.age_hours)?)
+ })?;
+ }
+ };
+
+ Ok(())
+}
diff --git a/src/pulls.graphql b/src/pulls.graphql
new file mode 100644
index 0000000..a25f247
--- /dev/null
+++ b/src/pulls.graphql
@@ -0,0 +1,28 @@
+query PullsQuery($owner: String!, $name: String!, $label: String!, $after: String, $batch: Int!) {
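+  # Unlike issues, pullRequests has no server-side `since` filter. Results are
+  # ordered newest-first instead, and the client stops paginating once it sees
+  # entries older than its cutoff (see github.rs).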
+ rateLimit {
+ limit
+ cost
+ remaining
+ resetAt
+ }
+ repository(owner: $owner, name: $name) {
+ pullRequests(first: $batch, after: $after, labels: [ $label ], orderBy: { direction: DESC, field: UPDATED_AT }) {
+ pageInfo {
+ endCursor
+ hasNextPage
+ }
+ edges {
+ node {
+ id
+ bodyHTML
+ closed
+ merged
+ baseRefName
+ title
+ updatedAt
+ url
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/types.rs b/src/types.rs
new file mode 100644
index 0000000..a960647
--- /dev/null
+++ b/src/types.rs
@@ -0,0 +1,72 @@
+#![allow(non_camel_case_types)]
+
+use std::collections::BTreeMap;
+
+use serde::{Deserialize, Serialize};
+
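+// graphql_client resolves GitHub's custom scalars by name, so type aliases
+// called DateTime, HTML and URI must be in scope where the queries are derived.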
+pub type DateTime = chrono::DateTime<chrono::Utc>;
+#[allow(clippy::upper_case_acronyms)]
+pub type HTML = String;
+#[allow(clippy::upper_case_acronyms)]
+pub type URI = String;
+
+pub const STATE_VERSION: u32 = 1;
+
+#[derive(Default, Debug, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct State {
+ pub version: u32,
+ pub owner: String,
+ pub repo: String,
+ pub label: String,
+ pub issues_updated: Option<DateTime>,
+ pub issues: BTreeMap<String, Issue>,
+ pub issue_history: Vec<(DateTime, String, IssueAction)>,
+ pub pull_requests_updated: Option<DateTime>,
+ pub pull_requests: BTreeMap<String, PullRequest>,
+ pub pull_history: Vec<(DateTime, String, PullAction)>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct Issue {
+ #[serde(skip)]
+ pub id: String,
+ pub title: String,
+ pub is_open: bool,
+ pub body: String,
+ pub last_update: DateTime,
+ pub url: String,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub enum IssueAction {
+ New,
+ Closed,
+ NewClosed,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct PullRequest {
+ #[serde(skip)]
+ pub id: String,
+ pub title: String,
+ pub is_open: bool,
+ pub is_merged: bool,
+ pub body: String,
+ pub last_update: DateTime,
+ pub url: String,
+ pub base_ref: String,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub enum PullAction {
+ New,
+ Closed,
+ NewClosed,
+ Merged,
+ NewMerged,
+}