Skip to content

Commit ecb0bf0

Browse files
authored
Merge pull request #108 from CodeGov-org/luca/logs
feat: logs fetcher
2 parents fed888b + a622ba2 commit ecb0bf0

9 files changed

Lines changed: 1411 additions & 74 deletions

File tree

Cargo.lock

Lines changed: 1119 additions & 71 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = [
44
"src/backend/api",
55
"src/backend/impl",
66
"src/backend/external_canisters",
7+
"src/backend/logs",
78
]
89

910
[profile.release]
@@ -24,6 +25,9 @@ candid = "0.10"
2425
candid_parser = "0.1"
2526
serde = "1.0"
2627
uuid = "1.6"
28+
serde_bytes = "0.11"
29+
chrono = { version = "0.4", default-features = false, features = ["std"] }
30+
base64 = "0.22"
2731

2832
mockall = "0.12"
2933
rstest = "0.18"

scripts/scrape-logs.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/bin/bash
2+
3+
cargo run \
4+
--package backend_logs \
5+
--bin backend_logs \
6+
-- \
7+
--identity-pem data/codegov-website-logger-identity.pem \
8+
--loki-endpoint https://logs-prod-eu-west-0.grafana.net \
9+
--loki-username 152321 \
10+
--loki-password $LOKI_PASSWORD \
11+
--backend-canister-id nijcm-2qaaa-aaaal-qcx2a-cai

src/backend/external_canisters/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ ic-nns-governance.workspace = true
99

1010
candid.workspace = true
1111
serde.workspace = true
12-
serde_bytes = "0.11"
12+
serde_bytes.workspace = true

src/backend/impl/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ serde.workspace = true
2929
serde_cbor = "0.11"
3030

3131
uuid = { workspace = true, features = ["serde"] }
32-
chrono = { version = "0.4", default-features = false, features = ["std"] }
32+
chrono.workspace = true
3333
hex = "0.4"
3434
lazy_static = "1.4"
35-
base64 = "0.22"
35+
base64.workspace = true
3636

3737
rand = { version = "0.8", default-features = false }
3838
rand_chacha = { version = "0.3", default-features = false }

src/backend/logs/Cargo.toml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "backend_logs"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[[bin]]
7+
name = "backend_logs"
8+
path = "src/main.rs"
9+
10+
[dependencies]
11+
backend_api = { path = "../api" }
12+
13+
ic-cdk.workspace = true
14+
ic-agent = "0.39"
15+
16+
candid.workspace = true
17+
serde.workspace = true
18+
serde_bytes.workspace = true
19+
20+
clap = { version = "4.5", features = ["derive"] }
21+
opentelemetry = "0.26"
22+
opentelemetry-otlp = { version = "0.26", features = [
23+
"logs",
24+
"http-json",
25+
"reqwest-client",
26+
], default-features = false }
27+
opentelemetry_sdk = { version = "0.26", features = ["logs", "rt-tokio"] }
28+
opentelemetry-semantic-conventions = "0.26"
29+
tokio = { version = "1.40", features = ["full"] }
30+
anyhow = "1.0"
31+
base64.workspace = true
32+
chrono.workspace = true

src/backend/logs/src/fetcher.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::{
2+
fs::{File, OpenOptions},
3+
io::{Read, Seek, Write},
4+
path::PathBuf,
5+
};
6+
7+
use backend_api::{ApiResult, ListLogsResponse, LogEntry, LogsFilterRequest};
8+
use candid::{Decode, Encode, Principal};
9+
use ic_agent::{identity::Secp256k1Identity, Agent};
10+
11+
use crate::utils::now_timestamp_ms;
12+
13+
struct BackendActor {
14+
agent: Agent,
15+
canister_id: Principal,
16+
}
17+
18+
impl BackendActor {
19+
fn new(identity_pem: PathBuf, canister_id: Principal) -> anyhow::Result<Self> {
20+
let identity = Secp256k1Identity::from_pem_file(identity_pem)?;
21+
let agent = Agent::builder()
22+
.with_identity(identity)
23+
.with_url("https://icp-api.io")
24+
.build()?;
25+
Ok(Self { agent, canister_id })
26+
}
27+
28+
async fn list_logs(
29+
&self,
30+
after_timestamp_ms: Option<u64>,
31+
) -> Result<ListLogsResponse, anyhow::Error> {
32+
let request = LogsFilterRequest {
33+
after_timestamp_ms,
34+
before_timestamp_ms: None,
35+
context_contains_any: None,
36+
level: None,
37+
message_contains_any: None,
38+
};
39+
let response = self
40+
.agent
41+
.query(&self.canister_id, "list_logs")
42+
.with_arg(Encode!(&request)?)
43+
.await?;
44+
let result = Decode!(&response, ApiResult<ListLogsResponse>)?;
45+
match result {
46+
ApiResult::Ok(ok) => Ok(ok),
47+
ApiResult::Err(err) => Err(anyhow::anyhow!(err)),
48+
}
49+
}
50+
}
51+
52+
pub struct LogFetcher {
53+
last_fetch_timestamp: Option<u64>,
54+
file: File,
55+
actor: BackendActor,
56+
}
57+
58+
impl LogFetcher {
59+
pub fn new(identity_pem: PathBuf, backend_canister_id: String) -> anyhow::Result<Self> {
60+
let path = "data/last-fetch-timestamp.txt";
61+
let mut file = OpenOptions::new()
62+
.create(true)
63+
.read(true)
64+
.write(true)
65+
.open(path)?;
66+
let mut last_fetch_timestamp = String::new();
67+
file.read_to_string(&mut last_fetch_timestamp)?;
68+
69+
let actor = BackendActor::new(identity_pem, Principal::from_text(backend_canister_id)?)?;
70+
71+
Ok(Self {
72+
file,
73+
last_fetch_timestamp: last_fetch_timestamp.trim().parse().ok(),
74+
actor,
75+
})
76+
}
77+
78+
pub async fn fetch_logs(&mut self) -> anyhow::Result<Vec<LogEntry>> {
79+
let logs = self.actor.list_logs(self.last_fetch_timestamp).await?;
80+
let now = now_timestamp_ms();
81+
self.update_last_fetch_timestamp(now);
82+
Ok(logs.logs)
83+
}
84+
85+
fn update_last_fetch_timestamp(&mut self, timestamp: u64) {
86+
self.last_fetch_timestamp = Some(timestamp);
87+
self.file.set_len(0).unwrap();
88+
self.file.rewind().unwrap();
89+
self.file
90+
.write_all(timestamp.to_string().as_bytes())
91+
.unwrap();
92+
}
93+
}

src/backend/logs/src/main.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
use std::{collections::HashMap, path::PathBuf};
2+
3+
use backend_api::{LogEntry, LogLevel};
4+
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
5+
use clap::Parser;
6+
use opentelemetry::{
7+
logs::{AnyValue, Logger as _, LoggerProvider as _, Severity},
8+
KeyValue,
9+
};
10+
use opentelemetry_otlp::WithExportConfig;
11+
use opentelemetry_sdk::{
12+
logs::{BatchConfigBuilder, BatchLogProcessor, LogRecord, Logger, LoggerProvider},
13+
runtime::Tokio,
14+
Resource,
15+
};
16+
17+
mod fetcher;
18+
mod utils;
19+
20+
use fetcher::LogFetcher;
21+
use utils::now;
22+
23+
/// `2048` is the default batch size for the OTLP logs batch processor.
24+
/// We use `2048 * 4` to accommodate cases where we fetch a lot of logs at once.
25+
const LOGS_PROCESSOR_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048 * 4;
26+
27+
fn init_telemetry(args: &Args) -> anyhow::Result<LoggerProvider> {
28+
let headers = {
29+
let mut headers = HashMap::new();
30+
let auth_header = format!(
31+
"Basic {}",
32+
BASE64.encode(format!("{}:{}", args.loki_username, args.loki_password))
33+
);
34+
headers.insert("Authorization".to_string(), auth_header.parse().unwrap());
35+
headers
36+
};
37+
// from https://grafana.com/docs/loki/latest/reference/loki-http-api/#ingest-logs-using-otlp
38+
let loki_endpoint = format!("{}/otlp/v1/logs", args.loki_endpoint);
39+
let exporter = opentelemetry_otlp::new_exporter()
40+
.http()
41+
.with_endpoint(loki_endpoint)
42+
.with_headers(headers)
43+
.build_log_exporter()?;
44+
45+
let processor = BatchLogProcessor::builder(exporter, Tokio)
46+
.with_batch_config(
47+
BatchConfigBuilder::default()
48+
.with_max_queue_size(LOGS_PROCESSOR_MAX_QUEUE_SIZE_DEFAULT)
49+
.build(),
50+
)
51+
.build();
52+
53+
let logger_provider = LoggerProvider::builder()
54+
.with_log_processor(processor)
55+
.with_resource(Resource::new(vec![
56+
KeyValue::new(
57+
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
58+
"backend_canister",
59+
),
60+
KeyValue::new("canister_id", args.backend_canister_id.clone()),
61+
]))
62+
.build();
63+
64+
Ok(logger_provider)
65+
}
66+
67+
fn build_logger(provider: &LoggerProvider) -> Logger {
68+
provider.logger_builder("backend_canister_logger").build()
69+
}
70+
71+
struct LogEntryAdapter(LogEntry);
72+
73+
impl TryInto<LogRecord> for LogEntryAdapter {
74+
type Error = anyhow::Error;
75+
76+
fn try_into(self) -> Result<LogRecord, Self::Error> {
77+
let mut log_record = LogRecord::default();
78+
log_record.timestamp =
79+
Some(chrono::DateTime::parse_from_rfc3339(&self.0.date_time)?.into());
80+
log_record.observed_timestamp = Some(now());
81+
log_record.severity_number = Some(match self.0.level {
82+
LogLevel::Info => Severity::Info,
83+
LogLevel::Warn => Severity::Warn,
84+
LogLevel::Error => Severity::Error,
85+
});
86+
let mut body = HashMap::new();
87+
body.insert("message".into(), self.0.message.into());
88+
if let Some(context) = self.0.context {
89+
body.insert("context".into(), context.into());
90+
}
91+
log_record.body = Some(AnyValue::Map(Box::new(body)));
92+
93+
Ok(log_record)
94+
}
95+
}
96+
97+
#[derive(Parser, Debug)]
98+
#[command(author, version, about, long_about = None)]
99+
struct Args {
100+
/// Path to the identity PEM file
101+
#[arg(long, value_name = "FILE")]
102+
identity_pem: PathBuf,
103+
104+
/// Loki endpoint URL
105+
#[arg(long, value_name = "URL")]
106+
loki_endpoint: String,
107+
108+
/// Loki username
109+
#[arg(long)]
110+
loki_username: String,
111+
112+
/// Loki password
113+
#[arg(long)]
114+
loki_password: String,
115+
116+
/// Backend canister ID
117+
#[arg(long, value_name = "CANISTER_ID")]
118+
backend_canister_id: String,
119+
}
120+
121+
#[tokio::main]
122+
async fn main() -> anyhow::Result<()> {
123+
let args = Args::parse();
124+
125+
// we need to keep the logger provider in memory because the drop implementation shuts down the processors
126+
let logger_provider = init_telemetry(&args)?;
127+
let logger = build_logger(&logger_provider);
128+
129+
let mut log_fetcher = LogFetcher::new(args.identity_pem, args.backend_canister_id)?;
130+
131+
let logs = log_fetcher.fetch_logs().await?;
132+
println!("Sending {} logs to Loki...", logs.len());
133+
for log in logs {
134+
let log_entry = LogEntryAdapter(log);
135+
logger.emit(log_entry.try_into()?);
136+
}
137+
println!("Logs sent to Loki");
138+
139+
Ok(())
140+
}

src/backend/logs/src/utils.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use std::time::{SystemTime, UNIX_EPOCH};
2+
3+
pub fn now() -> SystemTime {
4+
SystemTime::now()
5+
}
6+
7+
pub fn now_timestamp_ms() -> u64 {
8+
now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
9+
}

0 commit comments

Comments
 (0)