diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..42ce8a1 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,56 @@ +name: Lint + +on: + push: + branches: + - main + - master + pull_request: + paths: + - "**/*.rs" + - "**/Cargo.toml" + - "Cargo.lock" + - ".github/workflows/lint.yml" + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: stable + components: clippy, rustfmt + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + + - name: Cache cargo index + uses: actions/cache@v4 + with: + path: ~/.cargo/git + key: ${{ runner.os }}-cargo-index-${{ hashFiles('**/Cargo.lock') }} + + - name: Cache cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.lock') }} + + - name: Check formatting + run: cargo fmt --all -- --check + + - name: Run clippy + run: cargo clippy --all-targets --all-features -- -D warnings + + - name: Run tests + run: cargo test --all-targets --all-features + + - name: Check documentation + run: cargo doc --no-deps --all-features diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..745acfe --- /dev/null +++ b/readme.md @@ -0,0 +1,60 @@ +# Reestream - RTMP Multistream Demuxer + +## Overview +RTMP relay server that receives a single stream and forwards to multiple platforms (Twitch, Facebook, Instagram, YouTube). + +## Architecture + +### Components +- **main.rs**: TCP listener, connection handling, config loading +- **server.rs**: RTMP handshake, server session setup (low-latency config) +- **client.rs**: Publisher connection handling, stream forwarding to platforms +- **config.rs**: TOML-based configuration parsing +- **provider.rs**: Stream key provider abstraction (OAuth2) +- **error.rs**: Centralized error types + +### Flow +1. Listen on RTMP port (default 1945) +2. Accept publisher connection +3. Perform RTMP handshake +4. Validate stream key +5. Forward packets to all configured platforms (RTMP/RTMPS) + +## Configuration + +### config.toml +```toml +rtmp_addr = "0.0.0.0" +rtmp_port = 1945 +stream_key = "your-key" + +[[platform]] +url = "rtmp://live.twitch.tv/app" +key = "stream-key" +orientation = "horizontal" # or "vertical" +``` + +### CLI +```bash +reestream --config config.toml +``` + +## Specs + +### Low Latency +- Chunk size: 128 bytes +- ACK window: 256KB +- TCP_NODELAY enabled + +### Supported Protocols +- Input: RTMP +- Output: RTMP, RTMPS (via tokio-native-tls) + +### Platforms +Pre-configured for: Twitch, Facebook, Instagram, YouTube. Extensible via config. + +## Dependencies +- rml_rtmp: RTMP protocol +- tokio: Async runtime +- reqwest: HTTP client (OAuth2) +- toml: Config parsing \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index dde36f2..fcf7ce2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -114,7 +114,7 @@ pub async fn handle_publisher( ServerSessionResult::RaisedEvent(ev) => match ev { ServerSessionEvent::ConnectionRequested { request_id, - app_name, + app_name: _, } => { if let Ok(out) = server_session.accept_request(request_id) { for r in out { @@ -238,7 +238,7 @@ pub async fn handle_publisher( let mut state = pc.client_state.write().await; match state.session.publish_video_data( data.clone(), - timestamp.clone(), + timestamp, true, ) { Ok(ClientSessionResult::OutboundResponse(packet)) => { @@ -278,7 +278,7 @@ pub async fn handle_publisher( let mut state = pc.client_state.write().await; match state.session.publish_audio_data( data.clone(), - timestamp.clone(), + timestamp, true, ) { Ok(ClientSessionResult::OutboundResponse(packet)) => { @@ -315,20 +315,19 @@ pub async fn handle_publisher( if *pc.publish_ready_rx.borrow() { let mut state = pc.client_state.write().await; match state.session.publish_metadata(&metadata) { - Ok(client_res) => match client_res { - ClientSessionResult::OutboundResponse(packet) => { - if let Err(e) = pc + Ok(client_res) => { + if let ClientSessionResult::OutboundResponse(packet) = + client_res + && let Err(e) = pc .tx_feed .try_send(Bytes::from(packet.bytes.clone())) - { - debug!( - "Dropped publish_metadata packet for push client: {}", - e - ); - } + { + debug!( + "Dropped publish_metadata packet for push client: {}", + e + ); } - _ => {} - }, + } Err(e) => { error!("Error publish_metadata to push client: {:?}", e); } diff --git a/src/client/push.rs b/src/client/push.rs index 5f91393..24c695c 100644 --- a/src/client/push.rs +++ b/src/client/push.rs @@ -109,10 +109,10 @@ impl PushClient { // send initial results (use try_send, drop if full) for r in initial_results { - if let ClientSessionResult::OutboundResponse(packet) = r { - if let Err(e) = tx.try_send(Bytes::from(packet.bytes.clone())) { - debug!("Dropped initial packet for {}: {}", addr, e); - } + if let ClientSessionResult::OutboundResponse(packet) = r + && let Err(e) = tx.try_send(Bytes::from(packet.bytes.clone())) + { + debug!("Dropped initial packet for {}: {}", addr, e); } } diff --git a/src/config.rs b/src/config.rs index e78d009..ea48592 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,7 +15,7 @@ pub struct Config { pub struct Platform { pub url: Url, pub key: String, - pub orientation: Orientation, + pub _orientation: Orientation, } #[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq)] diff --git a/src/error.rs b/src/error.rs index 1a44203..91a3555 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,7 @@ use std::fmt; #[derive(Debug)] +#[allow(dead_code)] pub enum RelayError { Io(std::io::Error), Tls(tokio_native_tls::native_tls::Error), @@ -41,4 +42,5 @@ impl From for RelayError { } } +#[allow(dead_code)] pub type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 9ce80a0..fa065e1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ +use clap::Parser; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -use clap::Parser; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::sync::RwLock; diff --git a/src/provider.rs b/src/provider.rs index e8642f0..49f99bc 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -4,6 +4,8 @@ use std::fmt; use serde::{Deserialize, Serialize}; #[derive(Debug)] +#[allow(dead_code)] +#[allow(clippy::enum_variant_names)] pub enum StreamKeyError { OAuthError(String), ApiError(String), @@ -25,12 +27,14 @@ impl fmt::Display for StreamKeyError { impl Error for StreamKeyError {} #[derive(Debug, Clone, Serialize, Deserialize)] +#[allow(dead_code)] pub struct StreamKey { pub key: String, pub rtmp_url: String, } #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct OAuth2Config { pub client_id: String, pub client_secret: String, @@ -38,6 +42,7 @@ pub struct OAuth2Config { pub access_token: Option, } +#[allow(dead_code)] pub trait StreamKeyProvider: Send + Sync { const NAME: &str; diff --git a/src/server.rs b/src/server.rs index 51678c1..14e89cd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -9,7 +9,6 @@ pub async fn handshake_and_create_server_session( ) -> Result<(ServerSession, Vec), Box> { let mut hs = Handshake::new(PeerType::Server); let mut buf = [0u8; 4096]; - let mut leftover = Vec::new(); loop { let n = stream.read(&mut buf).await?; @@ -30,23 +29,24 @@ pub async fn handshake_and_create_server_session( if !response_bytes.is_empty() { stream.write_all(&response_bytes).await?; } - leftover = remaining_bytes; - break; - } - } - } + return Ok(( + { + // Reduce latency: use smaller chunk size and smaller ack window to have quicker acks + let mut config = ServerSessionConfig::new(); + config.chunk_size = 128; // smaller chunks -> lower per-chunk latency (tradeoff CPU) + config.window_ack_size = 262_144; // 256KB ack window to get more frequent acks - // Reduce latency: use smaller chunk size and smaller ack window to have quicker acks - let mut config = ServerSessionConfig::new(); - config.chunk_size = 128; // smaller chunks -> lower per-chunk latency (tradeoff CPU) - config.window_ack_size = 262_144; // 256KB ack window to get more frequent acks - - let (server_session, initial_results) = ServerSession::new(config)?; - for res in initial_results { - if let ServerSessionResult::OutboundResponse(packet) = res { - stream.write_all(&packet.bytes).await?; + let (server_session, initial_results) = ServerSession::new(config)?; + for res in initial_results { + if let ServerSessionResult::OutboundResponse(packet) = res { + stream.write_all(&packet.bytes).await?; + } + } + server_session + }, + remaining_bytes, + )); + } } } - - Ok((server_session, leftover)) }