Skip to content

Commit

Permalink
skip unauthorized replays
Browse files Browse the repository at this point in the history
Updates the connector to skip replays that are not public, unless the uploader
`steam_id` matches that of the caller (in which case, we presumably will have access).
This avoids making API calls that result in a 404 response when we don't have access
to a replay. For some reason, these replays still get returned in a list response.
  • Loading branch information
psFried committed Sep 22, 2023
1 parent 525372c commit 0b85ff9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
16 changes: 16 additions & 0 deletions src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,27 @@ struct GroupListing {
list: Vec<GroupSummary>,
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Visibility {
Public,
Private,
Unlisted,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Uploader {
pub steam_id: String,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ReplaySummary {
pub id: String,
#[serde(with = "time::serde::rfc3339")]
pub created: OffsetDateTime,

pub visibility: Option<Visibility>,
pub uploader: Uploader,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
45 changes: 37 additions & 8 deletions src/pull.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
fetcher::{Fetcher, ReplaySummary},
fetcher::{Fetcher, ReplaySummary, Visibility},
state::{BindingState, State, TodoGroup},
write_capture_response, EndpointConfig, ResourceConfig,
};
Expand All @@ -13,6 +13,7 @@ use proto_flow::{
};
use serde::{Deserialize, Serialize};

use time::OffsetDateTime;
use tokio::io;

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -95,6 +96,12 @@ async fn run_sweep(
fetcher: &Fetcher,
emitter: &mut Emitter,
) -> anyhow::Result<()> {
let ping_response = fetcher
.ping_server()
.await
.context("failed to ping server")?;
let caller_steam_id = ping_response.steam_id;

// Is there an in-progress sweep? If not, then we'll start one.
for binding_state in state.bindings.values_mut() {
if binding_state.sweep_start.is_none() {
Expand All @@ -110,7 +117,9 @@ async fn run_sweep(
continue;
}
tracing::debug!(%binding_key, ?binding_state, todo_groups = binding_state.todo_groups.len(), "checking for next replays");
if let Some((lineage, replays)) = next_replays(binding_state, fetcher).await? {
if let Some((lineage, replays)) =
next_replays(binding_state, fetcher, &caller_steam_id).await?
{
tracing::debug!(%binding_key, ?lineage, num_replays = replays.len(), "found replays to fetch");
let binding_idx = binding_indices.get(binding_key).unwrap();
ingest_replays(lineage, *binding_idx, &replays, fetcher, emitter)
Expand Down Expand Up @@ -141,12 +150,37 @@ fn lineage_info(grp: &TodoGroup) -> ParentGroup {
}
}

fn should_ingest(
last_completed_sweep: Option<OffsetDateTime>,
replay: &ReplaySummary,
caller_steam_id: &str,
) -> bool {
// Filter out replays that we've already captured
if !last_completed_sweep
.map(|sc| replay.created > sc)
.unwrap_or(true)
{
return false;
}

// Filter out replays that we don't have permission to download
if replay.visibility.unwrap_or(Visibility::Public) == Visibility::Public {
true
} else if replay.uploader.steam_id == caller_steam_id {
true
} else {
tracing::warn!(?replay, %caller_steam_id, "skipping replay because it is not public and does not belong to the caller");
false
}
}

/// Does a depth-first search of the graph of groups. Does not use recursion
/// because async rust does not yet allow it
#[tracing::instrument(skip(fetcher))]
async fn next_replays(
state: &mut BindingState,
fetcher: &Fetcher,
caller_steam_id: &str,
) -> anyhow::Result<Option<(Vec<ParentGroup>, Vec<ReplaySummary>)>> {
let BindingState {
last_completed_sweep,
Expand All @@ -168,12 +202,7 @@ async fn next_replays(
if next_group.must_fetch_replays {
next_group.must_fetch_replays = false;
let mut replays = fetcher.fetch_replay_ids(&next_group.id).await?;
replays.retain(|rp| {
last_completed_sweep
.as_ref()
.map(|sc| rp.created > *sc)
.unwrap_or(true)
});
replays.retain(|rp| should_ingest(*last_completed_sweep, rp, caller_steam_id));
if !replays.is_empty() {
return Ok(Some((lineage, replays)));
}
Expand Down

0 comments on commit 0b85ff9

Please sign in to comment.