diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 9fd45e0..b630e0b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,4 +1,4 @@ -name: Rust +name: CI on: push: @@ -10,13 +10,20 @@ env: CARGO_TERM_COLOR: always jobs: - build: - + pre-commit: + name: Pre-commit checks runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - uses: pre-commit/action@v3.0.1 + test: + name: Tests + runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Build - run: cargo build --verbose - name: Run tests run: cargo test --verbose diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 5a063f4..0580194 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -20,7 +20,7 @@ stages: build: stage: build - image: + image: name: ghcr.io/pyo3/maturin:latest entrypoint: [""] environment: @@ -42,7 +42,7 @@ build: publish: stage: publish needs: ["build"] - image: + image: name: ghcr.io/pyo3/maturin:main entrypoint: [""] variables: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..3716811 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,37 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-toml + - id: check-merge-conflict + + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.15.1 + hooks: + - id: ruff-check + - id: ruff-format + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.19.1 + hooks: + - id: mypy + args: [--config-file, pyreccaster/pyproject.toml] + files: ^pyreccaster/python/ + + - repo: local + hooks: + - id: cargo-fmt + name: cargo fmt + language: system + entry: cargo fmt --all -- --check + pass_filenames: false + types: [rust] + - id: cargo-clippy + name: cargo clippy + language: system + entry: cargo clippy --workspace --all-targets + pass_filenames: false + types: [rust] diff --git a/Cargo.toml b/Cargo.toml index b52d75b..9f5d9e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,3 +2,9 @@ members = [ "examples/*", "pyreccaster","reccaster", "wire"] default-members = ["pyreccaster", "reccaster", "wire"] resolver = "2" + +[workspace.lints.rust] +missing_docs = "deny" + +[workspace.lints.clippy] +all = { level = "deny", priority = -1 } diff --git a/README.md b/README.md index 4765f9b..c387644 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # Recsync-rs -A rust implementation of [recsync](https://github.com/ChannelFinder/recsync) protocl with python bindings.Aiming for bug to bug compatibility with current implementation of RecCeiver. +A rust implementation of [recsync](https://github.com/ChannelFinder/recsync) protocl with python bindings.Aiming for bug to bug compatibility with current implementation of RecCeiver. See the [recsync](https://github.com/ChannelFinder/recsync) original repository for details about the protocol and theory of operation. -## Project status -The project initially would implement only **ReCaster** in Rust with Python binding to be used along with [p4p](https://github.com/mdavidsaver/p4p). -**RecCeiver** is not implemented yet. Recsync-rs is split into different sections. First part is `wire` which implements only the protocol definition, encoders and decoders. -It used by **ReCaster** and **RecCeiver** (not implemented yet). Second part is `reccaster` which is **ReCaster** implementation, as it will be used as rust library. +## Project status +The project initially would implement only **ReCaster** in Rust with Python binding to be used along with [p4p](https://github.com/mdavidsaver/p4p). +**RecCeiver** is not implemented yet. Recsync-rs is split into different sections. First part is `wire` which implements only the protocol definition, encoders and decoders. +It used by **ReCaster** and **RecCeiver** (not implemented yet). Second part is `reccaster` which is **ReCaster** implementation, as it will be used as rust library. Finally, `pyreccaster` is a [pyo3](https://github.com/PyO3/pyo3) Rust-wrapped Python library of `reccaster`. ### RecCaster functionality @@ -17,7 +17,7 @@ Finally, `pyreccaster` is a [pyo3](https://github.com/PyO3/pyo3) Rust-wrapped Py * [X] Add Info * [ ] Delete Record -## Usage Example +## Usage Example Using Rust ```rust @@ -78,7 +78,7 @@ if __name__ == "__main__": ## Requirements * Rust 1.54.0 or later * Python 3.7 or later -* [Maturin](https://github.com/PyO3/maturin) +* [Maturin](https://github.com/PyO3/maturin) ## Build and Installation @@ -99,7 +99,7 @@ pip install maturin cd pyreccaster maturin build # to install the python bindings -pip install . +pip install . ``` ### Cross-Compile Python bindings for Windows diff --git a/examples/basic-reccaster/Cargo.toml b/examples/basic-reccaster/Cargo.toml index fca5496..2423254 100644 --- a/examples/basic-reccaster/Cargo.toml +++ b/examples/basic-reccaster/Cargo.toml @@ -3,6 +3,9 @@ name = "basic-reccaster" version = "0.1.0" edition = "2021" +[lints] +workspace = true + [dependencies] tokio = { version = "1", features = ["full"] } tracing = "^0.1.41" diff --git a/examples/basic-reccaster/src/main.rs b/examples/basic-reccaster/src/main.rs index 195dc27..63dd836 100644 --- a/examples/basic-reccaster/src/main.rs +++ b/examples/basic-reccaster/src/main.rs @@ -5,20 +5,23 @@ // You must comply with both licenses to use, modify, or distribute this software. // See the LICENSE file for details. -use std::collections::HashMap; -use tracing_subscriber; +#![allow(missing_docs)] use reccaster::{record::Record, Reccaster}; +use std::collections::HashMap; #[tokio::main] async fn main() { - - tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init(); + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .init(); let mut record = Record::new("DEV:RECASTER:RUST".to_string(), "ai".to_string()); - record.properties.insert("recordDesc".to_string(), "Rust Recaster".to_string()); + record + .properties + .insert("recordDesc".to_string(), "Rust Recaster".to_string()); let records: Vec = vec![record]; - let mut props: HashMap = HashMap::new(); + let mut props: HashMap = HashMap::new(); props.insert("ENGINEER".into(), "Rust Recaster".into()); props.insert("HOSTNAME".into(), "Example-Host-Machine".into()); diff --git a/pyreccaster/Cargo.toml b/pyreccaster/Cargo.toml index 4856bda..71f71f1 100644 --- a/pyreccaster/Cargo.toml +++ b/pyreccaster/Cargo.toml @@ -6,6 +6,10 @@ authors = ["Aqeel AlShafei "] license = "MIT AND BSD-3-Clause" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lints] +workspace = true + [lib] name = "pyreccaster" crate-type = ["cdylib"] diff --git a/pyreccaster/pyproject.toml b/pyreccaster/pyproject.toml index cc82e51..59f1e3f 100644 --- a/pyreccaster/pyproject.toml +++ b/pyreccaster/pyproject.toml @@ -33,6 +33,10 @@ classifiers = [ tests = [ "pytest", ] +lint = [ + "ruff", + "mypy", +] dynamic = ["version"] [tool.maturin] @@ -40,3 +44,15 @@ profile = "release" python-source = "python" compatibility = "linux" features = ["pyo3/extension-module"] + +[tool.ruff] +target-version = "py38" +src = ["python"] +exclude = ["test.py"] + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B", "RUF"] + +[tool.mypy] +strict = true +ignore_missing_imports = true diff --git a/pyreccaster/python/pyreccaster/__init__.py b/pyreccaster/python/pyreccaster/__init__.py index 0f0c48c..7371688 100644 --- a/pyreccaster/python/pyreccaster/__init__.py +++ b/pyreccaster/python/pyreccaster/__init__.py @@ -1,5 +1,5 @@ -from .pyreccaster import * - +from . import pyreccaster +from .pyreccaster import * # noqa: F403 __doc__ = pyreccaster.__doc__ if hasattr(pyreccaster, "__all__"): diff --git a/pyreccaster/python/tests/test_all.py b/pyreccaster/python/tests/test_all.py index e9f2adb..9524be3 100644 --- a/pyreccaster/python/tests/test_all.py +++ b/pyreccaster/python/tests/test_all.py @@ -1,6 +1,5 @@ -import pytest import pyreccaster -def test_sum_as_string(): - assert pyreccaster.sum_as_string(1, 1) == "2" +def test_sum_as_string() -> None: + assert pyreccaster.sum_as_string(1, 1) == "2" # type: ignore[attr-defined] diff --git a/pyreccaster/src/lib.rs b/pyreccaster/src/lib.rs index c636c59..f4e1838 100644 --- a/pyreccaster/src/lib.rs +++ b/pyreccaster/src/lib.rs @@ -5,13 +5,14 @@ // You must comply with both licenses to use, modify, or distribute this software. // See the LICENSE file for details. +#![allow(missing_docs)] use std::{collections::HashMap, sync::Arc}; use pyo3::prelude::*; use pyo3_async_runtimes::tokio::future_into_py_with_locals; -use reccaster::{Record, Reccaster}; +use reccaster::{Reccaster, Record}; use tokio::sync::Mutex; - + #[pyclass] pub struct PyRecord(Record); @@ -19,8 +20,18 @@ pub struct PyRecord(Record); impl PyRecord { #[new] #[pyo3(signature = (name, r#type, alias=None, properties=HashMap::new()))] - fn new(name: String, r#type: String, alias: Option, properties: HashMap) -> Self { - PyRecord(Record { name, r#type, alias, properties }) + fn new( + name: String, + r#type: String, + alias: Option, + properties: HashMap, + ) -> Self { + PyRecord(Record { + name, + r#type, + alias, + properties, + }) } #[getter] @@ -42,16 +53,23 @@ impl PyRecord { fn properties(&self) -> PyResult> { Ok(self.0.properties.clone()) } - } impl FromPyObject<'_> for PyRecord { fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult { - let name: String = ob.getattr("name")?.extract().unwrap_or_else(|_| "OPS no name !!!!!!!!!!!".to_string()); + let name: String = ob + .getattr("name")? + .extract() + .unwrap_or_else(|_| "OPS no name !!!!!!!!!!!".to_string()); let r#type: String = ob.getattr("type")?.extract()?; let alias: Option = ob.getattr("alias")?.extract()?; let properties: HashMap = ob.getattr("properties")?.extract()?; - Ok(PyRecord (Record { name, r#type, alias, properties })) + Ok(PyRecord(Record { + name, + r#type, + alias, + properties, + })) } } @@ -62,14 +80,22 @@ struct PyReccaster { #[pymethods] impl PyReccaster { - #[staticmethod] - fn setup(py: Python, records: Vec, props: Option>) -> PyResult> { + fn setup( + py: Python<'_>, + records: Vec, + props: Option>, + ) -> PyResult> { let locals = pyo3_async_runtimes::tokio::get_current_locals(py)?; - let pvs = records.iter().map(|record: &PyRecord| record.0.clone()).collect::>(); + let pvs = records + .iter() + .map(|record: &PyRecord| record.0.clone()) + .collect::>(); future_into_py_with_locals(py, locals, async move { let recc = Reccaster::new(pvs, props).await; - let pyrecc = PyReccaster { reccaster: Arc::new(Mutex::new(recc)) }; + let pyrecc = PyReccaster { + reccaster: Arc::new(Mutex::new(recc)), + }; Python::with_gil(|_py| Ok(pyrecc)) }) } diff --git a/reccaster/Cargo.toml b/reccaster/Cargo.toml index 832fff9..2600ded 100644 --- a/reccaster/Cargo.toml +++ b/reccaster/Cargo.toml @@ -7,6 +7,9 @@ license = "MIT AND BSD-3-Clause" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lints] +workspace = true + [dependencies] tokio = { version = "^1.36", features = ["full"] } tokio-util = { version = "^0.7.11", features = ["full"] } diff --git a/reccaster/src/lib.rs b/reccaster/src/lib.rs index 1cb5e95..aaba935 100644 --- a/reccaster/src/lib.rs +++ b/reccaster/src/lib.rs @@ -5,17 +5,29 @@ // You must comply with both licenses to use, modify, or distribute this software. // See the LICENSE file for details. +//! Client library for the RecSync protocol, used to register EPICS PV records +//! with a RecSync server over TCP. + +/// Record type definitions. pub mod record; pub use self::record::Record; -use std::{collections::HashMap, io, net::{IpAddr, Ipv4Addr, SocketAddr}}; -use tokio::{net::{UdpSocket, TcpStream}, io::Interest}; +use futures::SinkExt; +use std::{ + collections::HashMap, + io, + net::{IpAddr, Ipv4Addr, SocketAddr}, +}; +use tokio::{ + io::Interest, + net::{TcpStream, UdpSocket}, +}; +use tokio_stream::StreamExt; use tokio_util::codec::Framed; use tracing::{debug, error, info}; use wire::{Announcement, Message, MessageCodec, MSG_MAGIC_ID}; -use tokio_stream::StreamExt; -use futures::SinkExt; +/// An active RecSync caster that announces PV records to a RecSync server. pub struct Reccaster { udpsock: UdpSocket, framed: Option>, @@ -33,13 +45,28 @@ enum CasterState { } impl Reccaster { - + /// Create a new `Reccaster` that will register `records` with optional client + /// properties `props` once a RecSync server is discovered. pub async fn new(records: Vec, props: Option>) -> Reccaster { - let sock = UdpSocket::bind(format!("0.0.0.0:{}", wire::SERVER_ANNOUNCEMENT_UDP_PORT)).await.unwrap(); - debug!("listening for announcement messages at {}", wire::SERVER_ANNOUNCEMENT_UDP_PORT); - Self { udpsock: sock, framed: None, buf: [0; 1024], pvs: records, props: props, state: CasterState::Announcement } + let sock = UdpSocket::bind(format!("0.0.0.0:{}", wire::SERVER_ANNOUNCEMENT_UDP_PORT)) + .await + .unwrap(); + debug!( + "listening for announcement messages at {}", + wire::SERVER_ANNOUNCEMENT_UDP_PORT + ); + Self { + udpsock: sock, + framed: None, + buf: [0; 1024], + pvs: records, + props, + state: CasterState::Announcement, + } } + /// Run the caster indefinitely, cycling through discovery, handshake, upload, + /// and keepalive phases as the connection state changes. pub async fn run(&mut self) { loop { match self.state { @@ -58,12 +85,17 @@ impl Reccaster { Ok((len, addr)) => { if len >= 16 { let msg = Self::parse_announcement_message(&self.buf[..len], addr).unwrap(); - info!("Received announcement message: {:?}:{:?} with key:{:?} from: {:?}", msg.server_addr, msg.server_port, msg.server_key, addr); + info!( + "Received announcement message: {:?}:{:?} with key:{:?} from: {:?}", + msg.server_addr, msg.server_port, msg.server_key, addr + ); self.state = CasterState::Handshake(msg); } - }, - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { return; }, - Err(err) => { error!("{:?}", err) } + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => { + error!("{:?}", err) + } }; } } @@ -73,26 +105,29 @@ impl Reccaster { let addr = msg.server_addr; let port = msg.server_port; let key = msg.server_key; - // @TODO handle connection errors - let stream = TcpStream::connect(format!("{}:{}", addr, port)).await.map_err(|err| error!("{:?}",err)).unwrap(); + // @TODO handle connection errors + let stream = TcpStream::connect(format!("{}:{}", addr, port)) + .await + .map_err(|err| error!("{:?}", err)) + .unwrap(); info!("connect to {:?}:{}", addr, port); let codec = MessageCodec; let framed = Framed::new(stream, codec); self.framed = Some(framed); - if let Some(framed) = &mut self.framed { - while let Some(msg) = framed.next().await { + if let Some(framed) = &mut self.framed { + if let Some(msg) = framed.next().await { match msg.unwrap() { Message::ServerGreet(_) => { - let _ = framed.send(Message::ClientGreet(wire::ClientGreet { serv_key: key })).await; + let _ = framed + .send(Message::ClientGreet(wire::ClientGreet { serv_key: key })) + .await; debug!("Greet Message with server key: {}", key); self.state = CasterState::Upload; - return; - }, + } _ => { self.state = CasterState::Announcement; - return; - }, + } } } } @@ -103,32 +138,56 @@ impl Reccaster { if let CasterState::Upload = &mut self.state { if let Some(framed) = &mut self.framed { for (i, record) in self.pvs.iter().enumerate() { - let recid: u32 = i as u32 + 100; + let recid: u32 = i as u32 + 100; // AddRecord Message let record_name = &record.name; let record_type = &record.r#type; - let msg = Message::AddRecord(wire::AddRecord { recid: recid, atype: wire::AddRecordType::Record as u8, rtlen: record_type.len() as u8, rnlen: record_name.len() as u16, - rtype: record_type.to_string(), rname: record_name.to_string() }); + let msg = Message::AddRecord(wire::AddRecord { + recid, + atype: wire::AddRecordType::Record as u8, + rtlen: record_type.len() as u8, + rnlen: record_name.len() as u16, + rtype: record_type.to_string(), + rname: record_name.to_string(), + }); let _ = framed.send(msg.clone()).await; debug!("Sending AddRecord Message: {:?}", msg); // AddRecord alias Message if avaliable if let Some(record_alias) = &record.alias { - let msg = Message::AddRecord(wire::AddRecord { recid: recid, atype: wire::AddRecordType::Alias as u8, rtlen: record_type.len() as u8, rnlen: record_alias.len() as u16, - rtype: record_type.to_string(), rname: record_alias.to_string() }); + let msg = Message::AddRecord(wire::AddRecord { + recid, + atype: wire::AddRecordType::Alias as u8, + rtlen: record_type.len() as u8, + rnlen: record_alias.len() as u16, + rtype: record_type.to_string(), + rname: record_alias.to_string(), + }); let _ = framed.send(msg.clone()).await; }; // AddInfo Message // Send Client Properties if let Some(props) = &self.props { for (key, value) in props { - let msg: Message = Message::AddInfo(wire::AddInfo { recid: 0, keylen: key.len() as u8, valen: value.len() as u16, key: key.to_string(), value: value.to_string() }); - let _ = framed.send(msg.clone()).await; - debug!("Sending AddInfo Message: {:?}", msg.clone()); + let msg: Message = Message::AddInfo(wire::AddInfo { + recid: 0, + keylen: key.len() as u8, + valen: value.len() as u16, + key: key.to_string(), + value: value.to_string(), + }); + let _ = framed.send(msg.clone()).await; + debug!("Sending AddInfo Message: {:?}", msg.clone()); } } // Send Record Properties for (key, value) in &record.properties { - let msg = Message::AddInfo(wire::AddInfo { recid: recid, keylen: key.len() as u8, valen: value.len() as u16, key: key.to_string(), value: value.to_string() }); + let msg = Message::AddInfo(wire::AddInfo { + recid, + keylen: key.len() as u8, + valen: value.len() as u16, + key: key.to_string(), + value: value.to_string(), + }); let _ = framed.send(msg.clone()).await; debug!("Sending AddInfo Message: {:?}", msg.clone()); } @@ -145,19 +204,23 @@ impl Reccaster { if let Some(framed) = &mut self.framed { while let Some(msg_result) = framed.next().await { match msg_result { - Ok(msg) => { - match msg { - Message::Ping(ping_msg) => { - info!("received ping with nonce: {}", ping_msg.nonce); - if let Err(_) = framed.send(Message::Pong(wire::Pong { nonce: ping_msg.nonce })).await { - self.state = CasterState::Announcement; - return; - } - }, - _ => { + Ok(msg) => match msg { + Message::Ping(ping_msg) => { + info!("received ping with nonce: {}", ping_msg.nonce); + if framed + .send(Message::Pong(wire::Pong { + nonce: ping_msg.nonce, + })) + .await + .is_err() + { self.state = CasterState::Announcement; return; - }, + } + } + _ => { + self.state = CasterState::Announcement; + return; } }, Err(_) => { @@ -165,17 +228,19 @@ impl Reccaster { return; } } - } + } self.state = CasterState::Announcement; - return; } } } - fn parse_announcement_message(data: &[u8], src_addr: SocketAddr) -> Result { + fn parse_announcement_message( + data: &[u8], + src_addr: SocketAddr, + ) -> Result { let id = u16::from_be_bytes([data[0], data[1]]); // Checking if the ID is 'RC' - if id != MSG_MAGIC_ID { + if id != MSG_MAGIC_ID { return Err("Invalid ID"); } @@ -195,8 +260,12 @@ impl Reccaster { if server_addr.is_broadcast() { match src_addr.ip() { - IpAddr::V4(addr) => { server_addr = addr; }, - IpAddr::V6(_) => { unimplemented!("IPv6 is not supported") }, + IpAddr::V4(addr) => { + server_addr = addr; + } + IpAddr::V6(_) => { + unimplemented!("IPv6 is not supported") + } } } diff --git a/reccaster/src/record.rs b/reccaster/src/record.rs index 2808242..04b208a 100644 --- a/reccaster/src/record.rs +++ b/reccaster/src/record.rs @@ -7,17 +7,28 @@ use std::collections::HashMap; +/// Represents a single PV (Process Variable) record to be registered with the server. #[derive(Debug, Clone)] pub struct Record { + /// The PV name (e.g. `"DEV:AI:1"`). pub name: String, + /// The EPICS record type (e.g. `"ai"`, `"bo"`). pub r#type: String, + /// An optional alias name for this record. pub alias: Option, - pub properties: HashMap + /// Arbitrary key-value metadata attached to this record. + pub properties: HashMap, } impl Record { + /// Create a new record with the given name and type, no alias, and empty properties. pub fn new(name: String, r#type: String) -> Record { let map: HashMap = HashMap::new(); - Record { name, r#type, alias: None, properties: map} + Record { + name, + r#type, + alias: None, + properties: map, + } } -} \ No newline at end of file +} diff --git a/wire/Cargo.toml b/wire/Cargo.toml index 6489823..58e2985 100644 --- a/wire/Cargo.toml +++ b/wire/Cargo.toml @@ -7,6 +7,9 @@ license = "MIT AND BSD-3-Clause" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lints] +workspace = true + [dependencies] byteorder = "1" bytes = "1" diff --git a/wire/src/codec.rs b/wire/src/codec.rs index c88c0ab..de6b96e 100644 --- a/wire/src/codec.rs +++ b/wire/src/codec.rs @@ -26,20 +26,28 @@ impl Encoder for MessageCodec { fn encode(&mut self, msg: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { match msg { Message::ClientGreet(msg) => { - let header = MessageHeader::new(MessageID::ClientGreet.into(), (size_of::() + size_of::())as u32); + let header = MessageHeader::new( + MessageID::ClientGreet.into(), + (size_of::() + size_of::()) as u32, + ); dst.put(header.as_bytes()); dst.put_u32(0); // Padding dst.put_u32(msg.serv_key); Ok(()) - }, + } Message::Pong(msg) => { let header = MessageHeader::new(MessageID::Pong as u16, size_of::() as u32); dst.put(header.as_bytes()); dst.put_u32(msg.nonce); Ok(()) - }, + } Message::AddRecord(msg) => { - let len = (size_of::() + size_of::() + size_of::() + size_of::() + msg.rtype.len() + msg.rname.len()) as u32; + let len = (size_of::() + + size_of::() + + size_of::() + + size_of::() + + msg.rtype.len() + + msg.rname.len()) as u32; let header = MessageHeader::new(MessageID::AddRecord.into(), len); dst.put_u16(header.id); dst.put_u16(header.msg_id); @@ -51,10 +59,15 @@ impl Encoder for MessageCodec { dst.put_slice(msg.rtype.as_bytes()); dst.put_slice(msg.rname.as_bytes()); Ok(()) - }, + } Message::DelRecord(_) => todo!(), Message::AddInfo(msg) => { - let len = (size_of::() + size_of::() + size_of::() + size_of::() + msg.key.len() + msg.value.len()) as u32; + let len = (size_of::() + + size_of::() + + size_of::() + + size_of::() + + msg.key.len() + + msg.value.len()) as u32; let header = MessageHeader::new(MessageID::AddInfo.into(), len); dst.put_u16(header.id); dst.put_u16(header.msg_id); @@ -66,15 +79,20 @@ impl Encoder for MessageCodec { dst.put_slice(msg.key.as_bytes()); dst.put_slice(msg.value.as_bytes()); Ok(()) - }, + } Message::UploadDone(_) => { - let header = MessageHeader::new(MessageID::UploadDone.into(), size_of::() as u32); + let header = + MessageHeader::new(MessageID::UploadDone.into(), size_of::() as u32); dst.put(header.as_bytes()); dst.put_u32(0); Ok(()) - }, - Message::Ping(_) => unimplemented!("Recceiver related messages are not implemented yet."), - Message::ServerGreet(_) => unimplemented!("Recceiver related messages are not implemented yet.") + } + Message::Ping(_) => { + unimplemented!("Recceiver related messages are not implemented yet.") + } + Message::ServerGreet(_) => { + unimplemented!("Recceiver related messages are not implemented yet.") + } } } } @@ -93,7 +111,7 @@ impl Decoder for MessageCodec { let id = src.get_u16(); let msg_id = src.get_u16(); let len = src.get_u32() as usize; - + // Checking if the ID is 'RC' if id != MSG_MAGIC_ID { return Ok(None); @@ -113,13 +131,25 @@ impl Decoder for MessageCodec { MessageID::Ping => { let nonce = src.get_u32(); Ok(Some(Message::Ping(Ping { nonce }))) - }, - MessageID::ClientGreet => unimplemented!("Recceiver related messages are not implemented yet."), - MessageID::Pong => unimplemented!("Recceiver related messages are not implemented yet."), - MessageID::AddRecord => unimplemented!("Recceiver related messages are not implemented yet."), - MessageID::DelRecord => unimplemented!("Recceiver related messages are not implemented yet."), - MessageID::UploadDone => unimplemented!("Recceiver related messages are not implemented yet."), - MessageID::AddInfo => unimplemented!("Recceiver related messages are not implemented yet."), + } + MessageID::ClientGreet => { + unimplemented!("Recceiver related messages are not implemented yet.") + } + MessageID::Pong => { + unimplemented!("Recceiver related messages are not implemented yet.") + } + MessageID::AddRecord => { + unimplemented!("Recceiver related messages are not implemented yet.") + } + MessageID::DelRecord => { + unimplemented!("Recceiver related messages are not implemented yet.") + } + MessageID::UploadDone => { + unimplemented!("Recceiver related messages are not implemented yet.") + } + MessageID::AddInfo => { + unimplemented!("Recceiver related messages are not implemented yet.") + } } } } diff --git a/wire/src/header.rs b/wire/src/header.rs index 4e7469c..d9e4fbe 100644 --- a/wire/src/header.rs +++ b/wire/src/header.rs @@ -5,20 +5,29 @@ // You must comply with both licenses to use, modify, or distribute this software. // See the LICENSE file for details. +use crate::MSG_MAGIC_ID; use bytes::{BufMut, BytesMut}; use std::mem::size_of; -use crate::MSG_MAGIC_ID; +/// Fixed 8-byte header that precedes every wire protocol message. #[derive(Debug, Clone, PartialEq)] pub struct MessageHeader { + /// Magic identifier (`MSG_MAGIC_ID`, ASCII "RC"). pub id: u16, + /// Message type identifier. pub msg_id: u16, + /// Length of the message body in bytes. pub len: u32, } impl MessageHeader { + /// Create a new header with the given message type and body length. pub fn new(msg_id: u16, len: u32) -> MessageHeader { - MessageHeader { id: MSG_MAGIC_ID, msg_id, len } + MessageHeader { + id: MSG_MAGIC_ID, + msg_id, + len, + } } /// Return Header as BytesMut diff --git a/wire/src/lib.rs b/wire/src/lib.rs index 77ee183..58f4735 100644 --- a/wire/src/lib.rs +++ b/wire/src/lib.rs @@ -5,10 +5,12 @@ // You must comply with both licenses to use, modify, or distribute this software. // See the LICENSE file for details. -mod header; +//! Wire protocol types, codec, and constants for the RecSync protocol. + mod codec; +mod header; mod types; -pub use types::*; pub use codec::*; pub use header::*; +pub use types::*; diff --git a/wire/src/types.rs b/wire/src/types.rs index b315086..affdaf2 100644 --- a/wire/src/types.rs +++ b/wire/src/types.rs @@ -7,32 +7,46 @@ use std::net::Ipv4Addr; -/// AddRecord message type +/// AddRecord message type discriminant. pub enum AddRecordType { + /// A regular PV record. Record = 0, + /// An alias for an existing record. Alias = 1, } -/// UDP Announcement message structure +/// UDP Announcement message structure. #[derive(Debug)] pub struct Announcement { + /// Magic ID identifying this as a RecSync announcement. pub id: u16, + /// IPv4 address of the announcing server. pub server_addr: Ipv4Addr, + /// TCP port the server is listening on. pub server_port: u16, + /// Server-generated session key. pub server_key: u32, } -/// Messages ID +/// Message type identifiers used in the wire protocol header. #[derive(Copy, Clone)] #[repr(u16)] pub enum MessageID { + /// Server greeting sent after a client connects. ServerGreet = 0x8001, + /// Client greeting sent in response to a server greeting. ClientGreet = 0x0001, + /// Keepalive ping sent by the server. Ping = 0x8002, + /// Keepalive pong sent by the client in response to a ping. Pong = 0x0002, + /// Adds a PV record to the server's database. AddRecord = 0x0003, + /// Removes a PV record from the server's database. DelRecord = 0x0004, + /// Signals that the client has finished uploading records. UploadDone = 0x0005, + /// Attaches metadata to a record. AddInfo = 0x0006, } @@ -69,59 +83,91 @@ impl From for u16 { // Define all the message structs and enums here +/// Server greeting payload (no additional fields). #[derive(Debug, Clone, PartialEq)] pub struct ServerGreet; +/// Keepalive ping payload. #[derive(Debug, Clone, PartialEq)] pub struct Ping { + /// Random nonce that the client must echo back in the Pong. pub nonce: u32, } +/// Client greeting payload. #[derive(Debug, Clone, PartialEq)] pub struct ClientGreet { + /// Server key received in the UDP announcement. pub serv_key: u32, } +/// Keepalive pong payload. #[derive(Debug, Clone, PartialEq)] pub struct Pong { + /// Nonce copied from the corresponding Ping. pub nonce: u32, } +/// Payload for registering a PV record or alias on the server. #[derive(Debug, Clone, PartialEq)] pub struct AddRecord { + /// Record identifier assigned by the client. pub recid: u32, + /// Record type discriminant (`AddRecordType::Record` or `AddRecordType::Alias`). pub atype: u8, + /// Length of the record type string in bytes. pub rtlen: u8, + /// Length of the record name string in bytes. pub rnlen: u16, + /// Record type string (e.g. `"ai"`). pub rtype: String, + /// Record name or alias string. pub rname: String, } +/// Payload for removing a previously registered PV record. #[derive(Debug, Clone, PartialEq)] pub struct DelRecord { + /// Record identifier to remove. pub recid: u32, } +/// Payload signalling that the client has finished uploading records. #[derive(Debug, Clone, PartialEq)] pub struct UploadDone; +/// Payload for attaching a key-value metadata entry to a record. #[derive(Debug, Clone, PartialEq)] pub struct AddInfo { + /// Record identifier this info belongs to (0 for client-level info). pub recid: u32, + /// Length of the key string in bytes. pub keylen: u8, + /// Length of the value string in bytes. pub valen: u16, + /// Metadata key. pub key: String, + /// Metadata value. pub value: String, } +/// All messages that can be exchanged over the wire. #[derive(Debug, Clone, PartialEq)] pub enum Message { + /// Server greeting. ServerGreet(ServerGreet), + /// Keepalive ping from the server. Ping(Ping), + /// Client greeting. ClientGreet(ClientGreet), + /// Keepalive pong from the client. Pong(Pong), + /// Add a PV record or alias. AddRecord(AddRecord), + /// Remove a PV record. DelRecord(DelRecord), + /// Signal end of record upload. UploadDone(UploadDone), + /// Attach metadata to a record. AddInfo(AddInfo), }