Skip to content

Commit 1057266

Browse files
authored
fix: aggregator errors when events in pending and incoming batch (#741)
* chore: clippy
* fix: join pending event_states for history when processing a conclusion events batch. If we had put an event in pending and the incoming batch included subsequent events as well as its model, it would be unblocked and persisted; however, the new events wouldn't correctly recognize their history and would fail to patch.
* test: add test for pending events
* fix: address PR feedback
1 parent 3a32f84 commit 1057266

6 files changed

Lines changed: 151 additions & 50 deletions

File tree

event-svc/src/event/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ impl EventService {
579579

580580
let proof = ChainProof::from(proof);
581581
self.event_access
582-
.persist_chain_inclusion_proofs(&[proof.clone()])
582+
.persist_chain_inclusion_proofs(std::slice::from_ref(&proof))
583583
.await
584584
.map_err(|e| crate::eth_rpc::Error::Application(e.into()))?;
585585

event-svc/src/event/validator/time.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ mod test {
268268
root_cid,
269269
block_hash: "0x0".to_string(),
270270
metadata: ChainProofMetadata {
271-
chain_id: chain_id,
271+
chain_id,
272272
tx_hash: "0x0".to_string(),
273273
tx_input: "0x0".to_string(),
274274
},

event-svc/src/tests/event.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,11 +695,11 @@ pub fn get_mock_chain_provider(
695695
Ok(crate::eth_rpc::ChainInclusionProof {
696696
timestamp: crate::eth_rpc::Timestamp::from_unix_ts(1744383131980),
697697
root_cid,
698-
block_hash: format!("0xblock_hash{}", p.tx_hash().to_string()),
698+
block_hash: format!("0xblock_hash{}", p.tx_hash()),
699699
metadata: crate::eth_rpc::ChainProofMetadata {
700700
chain_id: chain_id.clone(),
701701
tx_hash: tx_hash_try_from_cid(p.tx_hash()).unwrap().to_string(),
702-
tx_input: format!("0x{}{}", p.tx_type(), p.tx_hash().to_string()),
702+
tx_input: format!("0x{}{}", p.tx_type(), p.tx_hash()),
703703
},
704704
})
705705
});

one/src/daemon.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
default_directory, handle_signals, http, http_metrics, metrics, network::Ipfs, startup, DBOpts,
55
DBOptsExperimental, Info, LogOpts, Network,
66
};
7-
use anyhow::{anyhow, bail, Result};
7+
use anyhow::{anyhow, bail, Context as _, Result};
88
use ceramic_anchor_remote::RemoteCas;
99
use ceramic_anchor_service::AnchorService;
1010
use ceramic_core::NodeKey;
@@ -457,7 +457,9 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
457457
};
458458
debug!(?p2p_config, "using p2p config");
459459

460-
let node_key = NodeKey::try_from_dir(opts.p2p_key_dir).await?;
460+
let node_key = NodeKey::try_from_dir(opts.p2p_key_dir)
461+
.await
462+
.context("missing p2p dir")?;
461463
let node_id = node_key.id();
462464

463465
// Process initial interests if provided

pipeline/src/aggregator/mod.rs

Lines changed: 140 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -442,23 +442,6 @@ impl Aggregator {
442442
// Joins events with their previous state if any
443443
#[instrument(skip_all)]
444444
async fn join_event_states(&self, conclusion_events: DataFrame) -> Result<DataFrame> {
445-
let event_states = self
446-
.ctx
447-
.table(EVENT_STATES_TABLE)
448-
.await?
449-
.select_columns(&[
450-
"event_cid_partition",
451-
"stream_cid",
452-
"event_cid",
453-
"event_height",
454-
"data",
455-
])?
456-
// Alias column so it does not conflict with the column from conclusion_events
457-
// in the join.
458-
.with_column_renamed("event_cid_partition", "ecp")?
459-
.with_column_renamed("data", "previous_data")?
460-
.with_column_renamed("event_height", "previous_height")?;
461-
462445
let conclusion_events = conclusion_events
463446
// MID only ever use the first previous, so we can optimize the join by selecting the
464447
// first element of the previous array.
@@ -482,43 +465,80 @@ impl Aggregator {
482465
.alias("conclusion_events")?;
483466

484467
// remove events we've already seen
468+
let known_event_cids = self
469+
.ctx
470+
.table(EVENT_STATES_TABLE)
471+
.await?
472+
.select_columns(&["event_cid_partition", "event_cid"])?
473+
.with_column_renamed("event_cid_partition", "known_event_cid_partition")?
474+
.with_column_renamed("event_cid", "known_event_cid")?;
475+
485476
let conclusion_events = conclusion_events
486477
.join_on(
487-
event_states.clone(),
478+
known_event_cids,
488479
JoinType::LeftAnti,
489480
vec![
490-
col("conclusion_events.event_cid_partition").eq(col("ecp")),
491-
col("conclusion_events.event_cid")
492-
.eq(table_col(EVENT_STATES_TABLE, "event_cid")),
481+
col("conclusion_events.event_cid_partition")
482+
.eq(col("known_event_cid_partition")),
483+
col("conclusion_events.event_cid").eq(col("known_event_cid")),
493484
],
494485
)
495486
.context("anti join")?;
496487

497488
// join all new conclusion_events with known history to apply patches
489+
// Include both EVENT_STATES_TABLE and PENDING_EVENT_STATES_TABLE
490+
// because events may reference pending events that haven't been fully processed yet
491+
// but will be 'unlocked' in our batch if their model is being processed as well
492+
let event_states_for_previous = self
493+
.ctx
494+
.table(EVENT_STATES_TABLE)
495+
.await?
496+
.select_columns(&["event_cid_partition", "event_cid", "event_height", "data"])?
497+
.union(
498+
self.ctx
499+
.table(PENDING_EVENT_STATES_TABLE)
500+
.await?
501+
.select(vec![
502+
cid_part(col("event_cid")).alias("event_cid_partition"),
503+
col("event_cid"),
504+
col("event_height"),
505+
col("data"),
506+
])?,
507+
)?
508+
// Alias column so it does not conflict with the column from conclusion_events
509+
// in the join.
510+
.with_column_renamed("event_cid_partition", "previous_ecp")?
511+
.with_column_renamed("event_cid", "previous_event_cid")?
512+
.with_column_renamed("data", "previous_data")?
513+
.with_column_renamed("event_height", "previous_height")?;
514+
498515
let conclusion_events = conclusion_events
499516
.join_on(
500-
event_states,
517+
event_states_for_previous,
501518
JoinType::Left,
502519
[
503-
col("previous_event_cid_partition").eq(col("ecp")),
504-
col("previous").eq(table_col(EVENT_STATES_TABLE, "event_cid")),
520+
col("previous_event_cid_partition").eq(col("previous_ecp")),
521+
col("previous").eq(col("previous_event_cid")),
505522
],
506523
)
507524
.context("setup join")?
525+
.cache()
526+
.await
527+
.context("caching joined events")?
508528
.select(vec![
509529
col("conclusion_event_order"),
510-
col("conclusion_events.stream_cid").alias("stream_cid"),
511-
col("conclusion_events.stream_type").alias("stream_type"),
512-
col("conclusion_events.controller").alias("controller"),
513-
col("conclusion_events.dimensions").alias("dimensions"),
514-
col("conclusion_events.event_cid").alias("event_cid"),
515-
col("conclusion_events.event_type").alias("event_type"),
516-
col("conclusion_events.data").alias("data"),
530+
col("stream_cid"),
531+
col("stream_type"),
532+
col("controller"),
533+
col("dimensions"),
534+
col("event_cid"),
535+
col("event_type"),
536+
col("data"),
517537
col("previous"),
518538
col("previous_data"),
519539
col("previous_height"),
520-
col("conclusion_events.before").alias("before"),
521-
col("conclusion_events.chain_id").alias("chain_id"),
540+
col("before"),
541+
col("chain_id"),
522542
col("event_cid_partition"),
523543
])
524544
.context("select joined conclusion events")?;
@@ -788,7 +808,7 @@ impl Aggregator {
788808
let ordered = event_states
789809
.window(vec![row_number()
790810
.order_by(vec![
791-
// Ensure that the order preserves stream order and is deterministic.
811+
// Then ensure that the order preserves stream order and is deterministic.
792812
// Otherwise applications would see stream updates before seeing their previous
793813
// state.
794814
// External to this function we ensure that a model comes before its
@@ -3650,8 +3670,8 @@ mod tests {
36503670
result
36513671
};
36523672

3653-
println!("first_result: {}", first_result.to_string());
3654-
println!("second_result: {}", second_result.to_string());
3673+
println!("first_result: {}", first_result);
3674+
println!("second_result: {}", second_result);
36553675

36563676
expect![[r#"
36573677
+-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+
@@ -3720,13 +3740,11 @@ mod tests {
37203740
)
37213741
}
37223742

3723-
/// WARNING: the order used here MUST BE GLOBAL for your events, so if you call this multiple times
3743+
/// WARNING:
3744+
/// - the order used here MUST BE GLOBAL for your events, so if you call this multiple times
37243745
/// you must correct the order manually.
3746+
/// - the event CID and unique values are random, so they are not stable across test runs
37253747
fn n_mid_events(to_add: u64, model_stream_id: &StreamId) -> Vec<ConclusionEvent> {
3726-
// NOTE: These CIDs and StreamIDs are fake and do not represent the actual hash of the data.
3727-
// This makes testing easier as changing the contents does not mean you need to update all of
3728-
// the cids.
3729-
37303748
let unique = random_cid().to_bytes();
37313749
let instance_stream_id = StreamId::document(random_cid());
37323750
let stream_init = ConclusionInit {
@@ -3862,4 +3880,85 @@ mod tests {
38623880
assert!(!res.contains("cannot validate"));
38633881
assert!(!res2.contains("cannot validate"));
38643882
}
3883+
3884+
async fn batched_test(
3885+
batch1: &[ConclusionEvent],
3886+
batch2: &[ConclusionEvent],
3887+
) -> Vec<RecordBatch> {
3888+
let ctx = init_with_cache(Some(7)).await.unwrap();
3889+
3890+
let mut subscription = ctx
3891+
.actor_handle
3892+
.send(SubscribeSinceMsg {
3893+
projection: None,
3894+
filters: None,
3895+
limit: None,
3896+
})
3897+
.await
3898+
.unwrap()
3899+
.unwrap();
3900+
3901+
ctx.actor_handle
3902+
.send(NewConclusionEventsMsg {
3903+
events: conclusion_events_to_record_batch(batch1).unwrap(),
3904+
})
3905+
.await
3906+
.unwrap()
3907+
.unwrap();
3908+
3909+
ctx.actor_handle
3910+
.send(NewConclusionEventsMsg {
3911+
events: conclusion_events_to_record_batch(batch2).unwrap(),
3912+
})
3913+
.await
3914+
.unwrap()
3915+
.unwrap();
3916+
3917+
let results = subscription.try_next().await.unwrap().unwrap();
3918+
3919+
ctx.shutdown().await.unwrap();
3920+
vec![results]
3921+
}
3922+
3923+
#[test_log::test(tokio::test)]
3924+
async fn pending_model_batching() {
3925+
let (model_stream_id, model) = test_model();
3926+
let mid1_events = n_mid_events(10, &model_stream_id);
3927+
let mid2_events = n_mid_events(10, &model_stream_id);
3928+
// This order is important! The model must come in a batch AFTER a MID init event that requires it with MORE events from the stream.
3929+
// Batch 1: init, batch 2: model, patch
3930+
// This forces the init to be pended waiting for the model, so when it arrives the init will be unpended and aggregated.
3931+
// The incoming event will then be aggregated to the stream, correctly understanding the state of the init event.
3932+
// Previously, they would miss the history and result in a validation error.
3933+
let mut events = mid1_events[0..1]
3934+
.iter()
3935+
.cloned()
3936+
.chain(mid2_events.iter().cloned())
3937+
.chain([model])
3938+
.chain(mid1_events[1..].iter().cloned())
3939+
.collect::<Vec<_>>();
3940+
// events must come after their previous and the order number has to increment globally.
3941+
// we ensured condition 1, now we rewrite the order
3942+
events
3943+
.iter_mut()
3944+
.enumerate()
3945+
.for_each(|(i, event)| match event {
3946+
ConclusionEvent::Data(data) => data.order = i as u64,
3947+
ConclusionEvent::Time(time) => time.order = i as u64,
3948+
});
3949+
3950+
let batch1 = &events[0..10].to_vec();
3951+
let batch2 = &events[10..].to_vec();
3952+
3953+
// First processing: process events normally
3954+
let results = tokio::time::timeout(
3955+
std::time::Duration::from_secs(3),
3956+
batched_test(batch1, batch2),
3957+
)
3958+
.await
3959+
.unwrap();
3960+
let res = pretty_event_states(results).await.unwrap();
3961+
3962+
assert!(!res.to_string().contains("null instance"), "{res}");
3963+
}
38653964
}

pipeline/src/aggregator/model_instance_patch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,12 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
157157
//
158158
// Assumption made by the function:
159159
// * Window partitions are by stream_cid
160-
// * Rows are ordered by the index column
160+
// * Rows are ordered by the conclusion_event_order column
161161
//
162162
// With these assumptions the code assumes it has all events for a stream and only events from
163163
// a single stream.
164-
// Additionally index sort order means that any event's previous event comes earlier in the
165-
// data set and so a single pass algorithm can be implemented.
164+
// Additionally conclusion_event_order sort order means that any event's previous event comes
165+
// earlier in the data set and so a single pass algorithm can be implemented.
166166
//
167167
// Input data must have the following columns:
168168
// * event_cid - unique id of the event

0 commit comments

Comments
 (0)