-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathevents.rs
More file actions
132 lines (116 loc) · 5.04 KB
/
events.rs
File metadata and controls
132 lines (116 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use std::sync::Arc;
use anyhow::{Result, bail};
use arc_swap::ArcSwap;
use chrono::{DateTime, Utc};
use rand::distr::{SampleString, StandardUniform};
use std::sync::mpsc::Receiver;
use crate::app::models::{Event, event_params};
use crate::app::{DuckDBPool, EVENT_BATCH_INTERVAL, SqlitePool};
#[derive(Clone)]
pub struct LiwanEvents {
    // Pool of DuckDB connections; the analytics store that events are appended to.
    duckdb: DuckDBPool,
    // Pool of SQLite connections; holds metadata, including the daily salt row.
    sqlite: SqlitePool,
    // Current daily salt plus the instant it was last rotated. Wrapped in
    // ArcSwap so readers can load it lock-free while a writer swaps in a
    // fresh value; the outer Arc lets clones of this struct share one slot.
    daily_salt: Arc<ArcSwap<(String, DateTime<Utc>)>>,
}
impl LiwanEvents {
    /// Build the event service, loading the persisted daily salt from SQLite.
    ///
    /// # Errors
    /// Fails if a SQLite connection cannot be acquired or the `salts` row
    /// with `id = 1` is missing or unreadable.
    pub fn try_new(duckdb: DuckDBPool, sqlite: SqlitePool) -> Result<Self> {
        let daily_salt: (String, DateTime<Utc>) = {
            tracing::debug!("Loading daily salt");
            sqlite.get()?.query_row("select salt, updated_at from salts where id = 1", [], |row| {
                Ok((row.get(0)?, row.get(1)?))
            })?
        };
        Ok(Self { duckdb, sqlite, daily_salt: ArcSwap::new(daily_salt.into()).into() })
    }

    /// Get the daily salt, generating a new one if the current one is older than 24 hours.
    ///
    /// The replacement salt is persisted to SQLite *before* the in-memory copy
    /// is swapped, so a failed write leaves memory and database consistent
    /// (both keep the old salt). Concurrent callers may race the rotation;
    /// the last store wins, which is harmless for a salt.
    pub fn get_salt(&self) -> Result<String> {
        let (salt, updated_at) = &**self.daily_salt.load();
        if (Utc::now() - updated_at) > chrono::Duration::hours(24) {
            tracing::debug!("Daily salt expired, generating a new one");
            // 16 characters sampled from rand's standard distribution.
            let new_salt = StandardUniform.sample_string(&mut rand::rng(), 16);
            let now = Utc::now();
            let conn = self.sqlite.get()?;
            conn.execute("update salts set salt = ?, updated_at = ? where id = 1", rusqlite::params![&new_salt, now])?;
            self.daily_salt.store((new_salt.clone(), now).into());
            Ok(new_salt)
        } else {
            Ok(salt.clone())
        }
    }

    /// Append events in batch.
    ///
    /// Tracks the earliest `created_at` among the appended events so the
    /// session-time columns only need recomputing from that point forward.
    /// When the iterator yields nothing, the (comparatively expensive)
    /// window-function update is skipped entirely instead of being run
    /// against `Utc::now()` for no effect.
    pub fn append(&self, events: impl Iterator<Item = Event>) -> Result<()> {
        let conn = self.duckdb.get()?;
        let mut appender = conn.appender("events")?;
        // Earliest created_at seen so far; None until the first event.
        let mut first_event_time: Option<DateTime<Utc>> = None;
        for event in events {
            appender.append_row(event_params![event])?;
            first_event_time = Some(match first_event_time {
                Some(earliest) if earliest <= event.created_at => earliest,
                _ => event.created_at,
            });
        }
        appender.flush()?;
        if let Some(from_time) = first_event_time {
            update_event_times(&conn, from_time)?;
        }
        Ok(())
    }

    /// Start processing events from the given channel. Blocks until the channel is closed.
    ///
    /// Each blocking `recv` starts a batch: the remaining queued events are
    /// drained non-blockingly into the same appender, the batch is flushed,
    /// and the inter-event time columns are recomputed from the earliest
    /// `created_at` in the batch. Sleeps `EVENT_BATCH_INTERVAL` between
    /// batches so events can accumulate.
    ///
    /// # Errors
    /// Returns an error when the sending side of the channel is dropped, or
    /// on any DuckDB failure while appending/updating.
    pub fn process(&self, events: Receiver<Event>) -> Result<()> {
        let conn = self.duckdb.get()?;
        loop {
            match events.recv() {
                Ok(event) => {
                    let mut appender = conn.appender("events")?;
                    let mut first_event_time = event.created_at;
                    appender.append_row(event_params![event])?;
                    // Non-blockingly drain the remaining events in the queue if there are any
                    let mut count = 1;
                    for event in events.try_iter() {
                        appender.append_row(event_params![event])?;
                        count += 1;
                        if first_event_time > event.created_at {
                            first_event_time = event.created_at;
                        }
                    }
                    appender.flush()?;
                    update_event_times(&conn, first_event_time)?;
                    tracing::debug!("Processed {} events", count);
                    // Sleep to allow more events to be received before the next batch
                    std::thread::sleep(EVENT_BATCH_INTERVAL);
                }
                Err(_) => bail!("event channel closed"),
            }
        }
    }
}
use duckdb::{Connection, Result as DuckResult, params};
/// Recompute the `time_from_last_event` / `time_to_next_event` columns for
/// events created at or after `from_time`.
///
/// Besides the new rows, the update also re-covers any visitor from the
/// preceding 24 hours whose latest event still has `time_to_next_event` null —
/// that event may now have gained a successor among the newly appended rows.
/// The same `from_time` value is bound to both `?` placeholders.
pub fn update_event_times(conn: &Connection, from_time: DateTime<Utc>) -> DuckResult<()> {
    // this can probably be simplified, sadly the where clause can't contain window functions
    let sql = "--sql
with
filtered_events as (
select *
from events
where created_at >= ?::timestamp or visitor_id in (
select visitor_id
from events
where created_at >= now()::timestamp - interval '24 hours' and created_at < ?::timestamp and time_to_next_event is null
)
),
cte as (
select
visitor_id,
created_at,
created_at - lag(created_at) over (partition by visitor_id order by created_at) as time_from_last_event,
lead(created_at) over (partition by visitor_id order by created_at) - created_at as time_to_next_event
from filtered_events
)
update events
set
time_from_last_event = cte.time_from_last_event,
time_to_next_event = cte.time_to_next_event
from cte
where events.visitor_id = cte.visitor_id and events.created_at = cte.created_at;
";
    conn.execute(sql, params![&from_time, &from_time])?;
    Ok(())
}