Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: Lint

on:
push:
branches:
- main
- master
pull_request:
paths:
- "**/*.rs"
- "**/Cargo.toml"
- "Cargo.lock"
- ".github/workflows/lint.yml"

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: stable
components: clippy, rustfmt

- name: Cache cargo registry
uses: actions/cache@v4
with:
path: ~/.cargo/registry
key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }}

- name: Cache cargo index
uses: actions/cache@v4
with:
path: ~/.cargo/git
key: ${{ runner.os }}-cargo-index-${{ hashFiles('**/Cargo.lock') }}

- name: Cache cargo build
uses: actions/cache@v4
with:
path: target
key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.lock') }}

- name: Check formatting
run: cargo fmt --all -- --check

- name: Run clippy
run: cargo clippy --all-targets --all-features -- -D warnings

- name: Run tests
run: cargo test --all-targets --all-features

- name: Check documentation
run: cargo doc --no-deps --all-features
60 changes: 60 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Reestream - RTMP Multistream Demuxer

## Overview
RTMP relay server that receives a single stream and forwards to multiple platforms (Twitch, Facebook, Instagram, YouTube).

## Architecture

### Components
- **main.rs**: TCP listener, connection handling, config loading
- **server.rs**: RTMP handshake, server session setup (low-latency config)
- **client.rs**: Publisher connection handling, stream forwarding to platforms
- **config.rs**: TOML-based configuration parsing
- **provider.rs**: Stream key provider abstraction (OAuth2)
- **error.rs**: Centralized error types

### Flow
1. Listen on RTMP port (default 1945)
2. Accept publisher connection
3. Perform RTMP handshake
4. Validate stream key
5. Forward packets to all configured platforms (RTMP/RTMPS)

## Configuration

### config.toml
```toml
rtmp_addr = "0.0.0.0"
rtmp_port = 1945
stream_key = "your-key"

[[platform]]
url = "rtmp://live.twitch.tv/app"
key = "stream-key"
orientation = "horizontal" # or "vertical"
```

### CLI
```bash
reestream --config config.toml
```

## Specs

### Low Latency
- Chunk size: 128 bytes
- ACK window: 256KB
- TCP_NODELAY enabled

### Supported Protocols
- Input: RTMP
- Output: RTMP, RTMPS (via tokio-native-tls)

### Platforms
Pre-configured for: Twitch, Facebook, Instagram, YouTube. Extensible via config.

## Dependencies
- rml_rtmp: RTMP protocol
- tokio: Async runtime
- reqwest: HTTP client (OAuth2)
- toml: Config parsing
27 changes: 13 additions & 14 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub async fn handle_publisher(
ServerSessionResult::RaisedEvent(ev) => match ev {
ServerSessionEvent::ConnectionRequested {
request_id,
app_name,
app_name: _,
} => {
if let Ok(out) = server_session.accept_request(request_id) {
for r in out {
Expand Down Expand Up @@ -238,7 +238,7 @@ pub async fn handle_publisher(
let mut state = pc.client_state.write().await;
match state.session.publish_video_data(
data.clone(),
timestamp.clone(),
timestamp,
true,
) {
Ok(ClientSessionResult::OutboundResponse(packet)) => {
Expand Down Expand Up @@ -278,7 +278,7 @@ pub async fn handle_publisher(
let mut state = pc.client_state.write().await;
match state.session.publish_audio_data(
data.clone(),
timestamp.clone(),
timestamp,
true,
) {
Ok(ClientSessionResult::OutboundResponse(packet)) => {
Expand Down Expand Up @@ -315,20 +315,19 @@ pub async fn handle_publisher(
if *pc.publish_ready_rx.borrow() {
let mut state = pc.client_state.write().await;
match state.session.publish_metadata(&metadata) {
Ok(client_res) => match client_res {
ClientSessionResult::OutboundResponse(packet) => {
if let Err(e) = pc
Ok(client_res) => {
if let ClientSessionResult::OutboundResponse(packet) =
client_res
&& let Err(e) = pc
.tx_feed
.try_send(Bytes::from(packet.bytes.clone()))
{
debug!(
"Dropped publish_metadata packet for push client: {}",
e
);
}
{
debug!(
"Dropped publish_metadata packet for push client: {}",
e
);
}
_ => {}
},
}
Err(e) => {
error!("Error publish_metadata to push client: {:?}", e);
}
Expand Down
8 changes: 4 additions & 4 deletions src/client/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ impl PushClient {

// send initial results (use try_send, drop if full)
for r in initial_results {
if let ClientSessionResult::OutboundResponse(packet) = r {
if let Err(e) = tx.try_send(Bytes::from(packet.bytes.clone())) {
debug!("Dropped initial packet for {}: {}", addr, e);
}
if let ClientSessionResult::OutboundResponse(packet) = r
&& let Err(e) = tx.try_send(Bytes::from(packet.bytes.clone()))
{
debug!("Dropped initial packet for {}: {}", addr, e);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Config {
pub struct Platform {
pub url: Url,
pub key: String,
pub orientation: Orientation,
pub _orientation: Orientation,
}

#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq)]
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt;

#[derive(Debug)]
#[allow(dead_code)]
pub enum RelayError {
Io(std::io::Error),
Tls(tokio_native_tls::native_tls::Error),
Expand Down Expand Up @@ -41,4 +42,5 @@ impl From<tokio_native_tls::native_tls::Error> for RelayError {
}
}

#[allow(dead_code)]
pub type Result<T> = std::result::Result<T, RelayError>;
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::Parser;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
use tokio::sync::RwLock;
Expand Down
5 changes: 5 additions & 0 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::fmt;
use serde::{Deserialize, Serialize};

#[derive(Debug)]
#[allow(dead_code)]
#[allow(clippy::enum_variant_names)]
pub enum StreamKeyError {
OAuthError(String),
ApiError(String),
Expand All @@ -25,19 +27,22 @@ impl fmt::Display for StreamKeyError {
impl Error for StreamKeyError {}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct StreamKey {
pub key: String,
pub rtmp_url: String,
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct OAuth2Config {
pub client_id: String,
pub client_secret: String,
pub redirect_uri: String,
pub access_token: Option<String>,
}

#[allow(dead_code)]
pub trait StreamKeyProvider: Send + Sync {
const NAME: &str;

Expand Down
34 changes: 17 additions & 17 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub async fn handshake_and_create_server_session(
) -> Result<(ServerSession, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
let mut hs = Handshake::new(PeerType::Server);
let mut buf = [0u8; 4096];
let mut leftover = Vec::new();

loop {
let n = stream.read(&mut buf).await?;
Expand All @@ -30,23 +29,24 @@ pub async fn handshake_and_create_server_session(
if !response_bytes.is_empty() {
stream.write_all(&response_bytes).await?;
}
leftover = remaining_bytes;
break;
}
}
}
return Ok((
{
// Reduce latency: use smaller chunk size and smaller ack window to have quicker acks
let mut config = ServerSessionConfig::new();
config.chunk_size = 128; // smaller chunks -> lower per-chunk latency (tradeoff CPU)
config.window_ack_size = 262_144; // 256KB ack window to get more frequent acks

// Reduce latency: use smaller chunk size and smaller ack window to have quicker acks
let mut config = ServerSessionConfig::new();
config.chunk_size = 128; // smaller chunks -> lower per-chunk latency (tradeoff CPU)
config.window_ack_size = 262_144; // 256KB ack window to get more frequent acks

let (server_session, initial_results) = ServerSession::new(config)?;
for res in initial_results {
if let ServerSessionResult::OutboundResponse(packet) = res {
stream.write_all(&packet.bytes).await?;
let (server_session, initial_results) = ServerSession::new(config)?;
for res in initial_results {
if let ServerSessionResult::OutboundResponse(packet) = res {
stream.write_all(&packet.bytes).await?;
}
}
server_session
},
remaining_bytes,
));
}
}
}

Ok((server_session, leftover))
}