Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions foreign/cpp/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ fn main() {
println!("cargo:rerun-if-changed=src/identifier.rs");
println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/stream.rs");
println!("cargo:rerun-if-changed=src/system.rs");
println!("cargo:rerun-if-changed=src/topic.rs");
}
36 changes: 36 additions & 0 deletions foreign/cpp/include/iggy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,42 @@ class CompressionAlgorithm final {
std::string algorithm_;
};

class SnapshotCompression final {
public:
static SnapshotCompression stored() { return SnapshotCompression("stored"); }
static SnapshotCompression deflated() { return SnapshotCompression("deflated"); }
static SnapshotCompression bzip2() { return SnapshotCompression("bzip2"); }
static SnapshotCompression zstd() { return SnapshotCompression("zstd"); }
static SnapshotCompression lzma() { return SnapshotCompression("lzma"); }
static SnapshotCompression xz() { return SnapshotCompression("xz"); }

std::string_view snapshot_compression_value() const { return snapshot_compression_; }

private:
explicit SnapshotCompression(std::string snapshot_compression)
: snapshot_compression_(std::move(snapshot_compression)) {}

std::string snapshot_compression_;
};

class SnapshotType final {
public:
static SnapshotType filesystem_overview() { return SnapshotType("filesystem_overview"); }
static SnapshotType process_list() { return SnapshotType("process_list"); }
static SnapshotType resource_usage() { return SnapshotType("resource_usage"); }
static SnapshotType test() { return SnapshotType("test"); }
static SnapshotType server_logs() { return SnapshotType("server_logs"); }
static SnapshotType server_config() { return SnapshotType("server_config"); }
static SnapshotType all() { return SnapshotType("all"); }

std::string_view snapshot_type_value() const { return snapshot_type_; }

private:
explicit SnapshotType(std::string snapshot_type) : snapshot_type_(std::move(snapshot_type)) {}

std::string snapshot_type_;
};

class Expiry final {
public:
static Expiry server_default() { return Expiry("server_default", 0); }
Expand Down
195 changes: 194 additions & 1 deletion foreign/cpp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,112 @@ use iggy::prelude::{
Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm,
ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient,
IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry,
MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient, TopicClient, UserClient,
MaxTopicSize as RustMaxTopicSize, PartitionClient,
SnapshotCompression as RustSnapshotCompression, StreamClient, SystemClient as RustSystemClient,
SystemSnapshotType as RustSystemSnapshotType, TopicClient, UserClient,
};
use iggy_common::{
CacheMetrics as RustCacheMetrics, CacheMetricsKey as RustCacheMetricsKey,
ClientInfo as RustClientInfo, ClientInfoDetails as RustClientInfoDetails, Stats as RustStats,
};
use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Arc;

impl From<RustClientInfo> for ffi::ClientInfo {
fn from(client: RustClientInfo) -> Self {
ffi::ClientInfo {
client_id: client.client_id,
// TODO(slbotbm): In high-level client, this should be converted to None.
user_id: client.user_id.unwrap_or(u32::MAX),
address: client.address,
transport: client.transport,
consumer_groups_count: client.consumer_groups_count,
}
}
}

impl From<RustClientInfoDetails> for ffi::ClientInfoDetails {
fn from(client: RustClientInfoDetails) -> Self {
ffi::ClientInfoDetails {
client_id: client.client_id,
// TODO(slbotbm): In high-level client, this should be converted to None.
user_id: client.user_id.unwrap_or(u32::MAX),
address: client.address,
transport: client.transport,
consumer_groups_count: client.consumer_groups_count,
consumer_groups: client
.consumer_groups
.into_iter()
.map(ffi::ConsumerGroupInfo::from)
.collect(),
}
}
}

impl TryFrom<Option<RustClientInfoDetails>> for ffi::ClientInfoDetails {
type Error = String;

fn try_from(client: Option<RustClientInfoDetails>) -> Result<Self, Self::Error> {
match client {
Some(client) => Ok(ffi::ClientInfoDetails::from(client)),
None => Err("client not found".to_string()),
}
}
}

impl From<(RustCacheMetricsKey, RustCacheMetrics)> for ffi::CacheMetricEntry {
fn from((key, metrics): (RustCacheMetricsKey, RustCacheMetrics)) -> Self {
ffi::CacheMetricEntry {
stream_id: key.stream_id,
topic_id: key.topic_id,
partition_id: key.partition_id,
hits: metrics.hits,
misses: metrics.misses,
hit_ratio: metrics.hit_ratio,
}
}
}

impl From<RustStats> for ffi::Stats {
fn from(stats: RustStats) -> Self {
ffi::Stats {
process_id: stats.process_id,
cpu_usage: stats.cpu_usage,
total_cpu_usage: stats.total_cpu_usage,
memory_usage: stats.memory_usage.as_bytes_u64(),
total_memory: stats.total_memory.as_bytes_u64(),
available_memory: stats.available_memory.as_bytes_u64(),
run_time: stats.run_time.as_micros(),
start_time: stats.start_time.as_micros(),
read_bytes: stats.read_bytes.as_bytes_u64(),
written_bytes: stats.written_bytes.as_bytes_u64(),
messages_size_bytes: stats.messages_size_bytes.as_bytes_u64(),
streams_count: stats.streams_count,
topics_count: stats.topics_count,
partitions_count: stats.partitions_count,
segments_count: stats.segments_count,
messages_count: stats.messages_count,
clients_count: stats.clients_count,
consumer_groups_count: stats.consumer_groups_count,
hostname: stats.hostname,
os_name: stats.os_name,
os_version: stats.os_version,
kernel_version: stats.kernel_version,
iggy_server_version: stats.iggy_server_version,
iggy_server_semver: stats.iggy_server_semver.unwrap_or(u32::MAX),
cache_metrics: stats
.cache_metrics
.into_iter()
.map(ffi::CacheMetricEntry::from)
.collect(),
threads_count: stats.threads_count,
free_disk_space: stats.free_disk_space.as_bytes_u64(),
total_disk_space: stats.total_disk_space.as_bytes_u64(),
}
}
}

pub struct Client {
pub inner: Arc<RustIggyClient>,
}
Expand Down Expand Up @@ -378,6 +479,98 @@ impl Client {
Ok(())
})
}

pub fn get_stats(&self) -> Result<ffi::Stats, String> {
RUNTIME.block_on(async {
let stats = self
.inner
.get_stats()
.await
.map_err(|error| format!("Could not get stats: {error}"))?;
Ok(ffi::Stats::from(stats))
})
}

pub fn get_me(&self) -> Result<ffi::ClientInfoDetails, String> {
RUNTIME.block_on(async {
let client = self
.inner
.get_me()
.await
.map_err(|error| format!("Could not get current client info: {error}"))?;
Ok(ffi::ClientInfoDetails::from(client))
})
}

pub fn get_client(&self, client_id: u32) -> Result<ffi::ClientInfoDetails, String> {
RUNTIME.block_on(async {
let client = self
.inner
.get_client(client_id)
.await
.map_err(|error| format!("Could not get client '{client_id}': {error}"))?;
ffi::ClientInfoDetails::try_from(client)
.map_err(|error| format!("Could not get client '{client_id}': {error}"))
})
}

pub fn get_clients(&self) -> Result<Vec<ffi::ClientInfo>, String> {
RUNTIME.block_on(async {
let clients = self
.inner
.get_clients()
.await
.map_err(|error| format!("Could not get clients: {error}"))?;
Ok(clients.into_iter().map(ffi::ClientInfo::from).collect())
})
}

pub fn ping(&self) -> Result<(), String> {
RUNTIME.block_on(async {
self.inner
.ping()
.await
.map_err(|error| format!("Could not ping server: {error}"))?;
Ok(())
})
}

pub fn heartbeat_interval(&self) -> u64 {
RUNTIME.block_on(async { self.inner.heartbeat_interval().await.as_micros() })
}

pub fn snapshot(
&self,
snapshot_compression: String,
snapshot_types: Vec<String>,
) -> Result<Vec<u8>, String> {
let rust_compression = match snapshot_compression.trim() {
"" => RustSnapshotCompression::default(),
value => RustSnapshotCompression::from_str(value).map_err(|error| {
format!("Could not capture snapshot: invalid compression '{value}': {error}")
})?,
};
let rust_snapshot_types = snapshot_types
.into_iter()
.map(|snapshot_type| {
RustSystemSnapshotType::from_str(&snapshot_type).map_err(|error| {
format!(
"Could not capture snapshot: invalid snapshot type '{}': {}",
snapshot_type, error
)
})
})
.collect::<Result<Vec<_>, _>>()?;

RUNTIME.block_on(async {
let snapshot = self
.inner
.snapshot(rust_compression, rust_snapshot_types)
.await
.map_err(|error| format!("Could not capture snapshot: {error}"))?;
Ok(snapshot.0)
})
}
}

pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
Expand Down
14 changes: 13 additions & 1 deletion foreign/cpp/src/consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@

use crate::ffi;
use iggy::prelude::ConsumerGroupDetails as RustConsumerGroupDetails;
use iggy_common::ConsumerGroupMember as RustConsumerGroupMember;
use iggy_common::{
ConsumerGroupInfo as RustConsumerGroupInfo, ConsumerGroupMember as RustConsumerGroupMember,
};

impl From<RustConsumerGroupInfo> for ffi::ConsumerGroupInfo {
fn from(group: RustConsumerGroupInfo) -> Self {
ffi::ConsumerGroupInfo {
stream_id: group.stream_id,
topic_id: group.topic_id,
group_id: group.group_id,
}
}
}

impl From<RustConsumerGroupMember> for ffi::ConsumerGroupMember {
fn from(member: RustConsumerGroupMember) -> Self {
Expand Down
1 change: 1 addition & 0 deletions foreign/cpp/src/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl TryFrom<ffi::Identifier> for RustIdentifier {
}
}

#[allow(clippy::wrong_self_convention)]
impl ffi::Identifier {
pub fn from_string(&mut self, id: String) -> Result<(), String> {
*self = RustIdentifier::named(&id)
Expand Down
74 changes: 74 additions & 0 deletions foreign/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,69 @@ mod ffi {
members: Vec<ConsumerGroupMember>,
}

struct ConsumerGroupInfo {
stream_id: u32,
topic_id: u32,
group_id: u32,
}

struct ClientInfo {
client_id: u32,
user_id: u32,
address: String,
transport: String,
consumer_groups_count: u32,
}

struct ClientInfoDetails {
client_id: u32,
user_id: u32,
address: String,
transport: String,
consumer_groups_count: u32,
consumer_groups: Vec<ConsumerGroupInfo>,
}

struct CacheMetricEntry {
stream_id: u32,
topic_id: u32,
partition_id: u32,
hits: u64,
misses: u64,
hit_ratio: f32,
}

struct Stats {
process_id: u32,
cpu_usage: f32,
total_cpu_usage: f32,
memory_usage: u64,
total_memory: u64,
available_memory: u64,
run_time: u64,
start_time: u64,
read_bytes: u64,
written_bytes: u64,
messages_size_bytes: u64,
streams_count: u32,
topics_count: u32,
partitions_count: u32,
segments_count: u32,
messages_count: u64,
clients_count: u32,
consumer_groups_count: u32,
hostname: String,
os_name: String,
os_version: String,
kernel_version: String,
iggy_server_version: String,
iggy_server_semver: u32,
cache_metrics: Vec<CacheMetricEntry>,
threads_count: u32,
free_disk_space: u64,
total_disk_space: u64,
}

extern "Rust" {
type Client;

Expand Down Expand Up @@ -129,6 +192,17 @@ mod ffi {
topic_id: Identifier,
group_id: Identifier,
) -> Result<()>;
fn get_stats(self: &Client) -> Result<Stats>;
fn get_me(self: &Client) -> Result<ClientInfoDetails>;
fn get_client(self: &Client, client_id: u32) -> Result<ClientInfoDetails>;
fn get_clients(self: &Client) -> Result<Vec<ClientInfo>>;
fn ping(self: &Client) -> Result<()>;
fn heartbeat_interval(self: &Client) -> u64;
fn snapshot(
self: &Client,
snapshot_compression: String,
snapshot_types: Vec<String>,
) -> Result<Vec<u8>>;

unsafe fn delete_connection(client: *mut Client) -> Result<()>;

Expand Down
Loading
Loading