This commit is contained in:
Edgar 2023-09-18 14:45:36 +02:00
commit c6aad76b52
No known key found for this signature in database
GPG Key ID: 70ADAE8F35904387
8 changed files with 3431 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
.env

3103
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

34
Cargo.toml Normal file
View File

@ -0,0 +1,34 @@
[package]
name = "teeobserver"
version = "0.1.0"
edition = "2021"
authors = ["Edgar <git@edgarluque.com>"]
license = "AGPL-3.0"
repository = "https://github.com/edg-l/teeobserver"
homepage = "https://github.com/edg-l/teeobserver"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[profile.dev.package.sqlx-macros]
opt-level = 3
[dependencies]
axum = { version = "0.6.20", features = ["headers"] }
time = "0.3.28"
dotenvy = "0.15.7"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107"
tar = "0.4.40"
tokio = { version = "1.32.0", features = ["full", "tracing"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
zstd = "0.12.4"
sqlx = { version = "0.7.1", default-features = false, features = ["runtime-tokio", "tls-rustls", "postgres", "macros", "time", "migrate", "uuid"] }
moka = { version = "0.11.3", features = ["future"] }
reqwest = { version = "0.11.20", features = ["json"] }
tower-http = { version = "0.4.4", features = ["full"] }
thiserror = "1.0.48"
headers = "0.3.9"
validator = { version = "0.16.1", features = ["derive"] }
anyhow = "1.0.75"
http = "0.2.9"

View File

@ -0,0 +1 @@
-- Add migration script here

183
src/main.rs Normal file
View File

@ -0,0 +1,183 @@
use std::{
collections::{HashSet, VecDeque},
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use axum::{routing::get, Router};
use http::Method;
use reqwest::Client;
use sqlx::{postgres::PgPoolOptions, PgPool};
use structures::{MasterEvent, ServerList, ServerListMap};
use tokio::{sync::RwLock, time::interval};
use tower_http::{
cors::{self, CorsLayer},
timeout::TimeoutLayer,
trace::TraceLayer,
};
use tracing::{error, info};
use crate::util::fetch_master;
pub mod routes;
pub mod structures;
pub mod util;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Better error reporting whe compiling if we are not inside a macro.
run().await
}
#[derive(Debug, Clone)]
pub struct AppState {
pub pool: PgPool,
pub servers: Arc<RwLock<ServerListMap>>,
pub client: Client,
pub events: Arc<RwLock<VecDeque<(MasterEvent, Instant)>>>,
}
async fn run() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
tracing_subscriber::fmt::init();
let pool = PgPoolOptions::new()
.max_connections(
std::env::var("DATABASE_MAX_CONNECTIONS")
.map(|x| x.parse().expect("valid number"))
.unwrap_or(30),
)
.connect(&std::env::var("DATABASE_URL")?)
.await?;
let client = reqwest::Client::new();
let servers = fetch_master(&client).await?;
let servers = Arc::new(RwLock::new(servers));
let events = Arc::new(RwLock::new(VecDeque::new()));
let task_pool = pool.clone();
let task_client = client.clone();
let task_servers = servers.clone();
let task_events = events.clone();
let state = AppState {
pool,
client,
servers,
events,
};
let mut fetch_interval = interval(Duration::from_secs(10));
let task = tokio::spawn(async move {
loop {
fetch_interval.tick().await;
let res = fetch_master(&task_client).await;
match res {
Ok(new_servers) => {
{
let old_servers = task_servers.read().await;
let mut events = task_events.write().await;
let now = Instant::now();
let mut new_players = HashSet::with_capacity(64);
let mut old_players = HashSet::with_capacity(64);
let empty = vec![];
for (address, server) in new_servers.servers.iter() {
if let Some(old_server) = old_servers.servers.get(address) {
new_players.clear();
old_players.clear();
old_players
.extend(old_server.info.clients.as_ref().unwrap_or(&empty));
new_players.extend(server.info.clients.as_ref().unwrap_or(&empty));
let diff = new_players.symmetric_difference(&old_players);
for player in diff {
// client joined
if new_players.contains(player) {
info!(
"'{:?}' client joined server '{:?}' ({:?})",
player.name, server.info.name, address
);
events.push_back((
MasterEvent::ClientJoined(
(*player).clone(),
server.clone(),
),
now,
));
} else {
info!(
"'{:?}' client left server '{:?}' ({:?})",
player.name, server.info.name, address
);
events.push_back((
MasterEvent::ClientLeft(
(*player).clone(),
server.clone(),
),
now,
));
}
}
} else {
info!(
"server '{:?}' ({:?}) went online",
server.info.name, address
);
events.push_back((
MasterEvent::ServerWentOnline(server.clone()),
now,
));
}
}
for (address, server) in old_servers.servers.iter() {
if !new_servers.servers.contains_key(address) {
info!(
"server '{:?}' ({:?}) went offline",
server.info.name, address
);
events.push_back((
MasterEvent::ServerWentOffline(server.clone()),
now,
));
}
}
}
let mut servers = task_servers.write().await;
*servers = new_servers;
}
Err(e) => error!("error fetching master: {e:?}"),
}
}
});
let app = Router::new()
.route("/", get(|| async { "a" }))
.layer(TraceLayer::new_for_http())
.layer(TimeoutLayer::new(Duration::from_secs(5)))
.layer(
CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_origin(cors::Any),
)
.with_state(state);
let port = std::env::var("PORT")
.unwrap_or_else(|_| "3000".to_string())
.parse()?;
let addr = SocketAddr::from(([127, 0, 0, 1], port));
tracing::info!("listening on http://{}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
task.abort();
Ok(())
}

1
src/routes.rs Normal file
View File

@ -0,0 +1 @@

84
src/structures.rs Normal file
View File

@ -0,0 +1,84 @@
use std::{collections::HashMap, hash::Hash, sync::Arc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Serialize, Hash)]
pub struct Skin {
pub name: Option<String>,
pub color_body: Option<i64>,
pub color_feet: Option<i64>,
}
#[derive(Debug, Deserialize, Clone, Eq, Serialize)]
pub struct Client {
pub name: Option<String>,
pub clan: Option<String>,
pub country: Option<i64>,
pub score: Option<i64>,
pub afk: Option<bool>,
pub team: Option<i64>,
pub is_player: Option<bool>,
pub skin: Option<Skin>,
}
impl PartialEq for Client {
fn eq(&self, other: &Self) -> bool {
self.name.eq(&other.name)
}
}
impl Hash for Client {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.name.hash(state)
}
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Serialize)]
pub struct Map {
pub name: Option<String>,
pub sha256: Option<String>,
pub size: Option<i64>,
#[serde(flatten)]
pub other: HashMap<String, Value>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Serialize)]
pub struct Info {
pub clients: Option<Vec<Client>>,
pub game_type: Option<String>,
pub max_clients: Option<i64>,
pub max_players: Option<i64>,
pub passworded: Option<bool>,
pub name: Option<String>,
pub version: Option<String>,
#[serde(flatten)]
pub other: HashMap<String, Value>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Serialize)]
pub struct Server {
pub addresses: Vec<String>,
pub location: Option<String>,
pub info: Info,
#[serde(flatten)]
pub other: HashMap<String, Value>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Serialize)]
pub struct ServerList {
pub servers: Vec<Arc<Server>>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Serialize)]
pub struct ServerListMap {
pub servers: HashMap<String, Arc<Server>>,
}
#[derive(Debug, Serialize)]
pub enum MasterEvent {
ClientJoined(Client, Arc<Server>),
ClientLeft(Client, Arc<Server>),
ServerWentOnline(Arc<Server>),
ServerWentOffline(Arc<Server>),
}

23
src/util.rs Normal file
View File

@ -0,0 +1,23 @@
use std::collections::HashMap;
use reqwest::Client;
use tracing::debug;
use crate::structures::{ServerList, ServerListMap};
pub async fn fetch_master(client: &Client) -> Result<ServerListMap, reqwest::Error> {
debug!("making request to master");
let res: ServerList = client
.get("https://master1.ddnet.org/ddnet/15/servers.json")
.send()
.await?
.json()
.await?;
debug!("got {} servers", res.servers.len());
let servers: HashMap<_, _> = res
.servers
.into_iter()
.filter_map(|x| x.addresses.get(0).cloned().map(|address| (address, x)))
.collect();
Ok(ServerListMap { servers })
}