// Copyright (c) 2026 Jonathan D.A. Jewell (hyperpolymath) <j.d.a.jewell@open.ac.uk>
//
// Persistent provenance store backed by redb via verisim-storage.
//
// Each entity's ProvenanceChain is stored as a single JSON blob keyed by
// entity_id. On open(), all chains are scanned into an in-memory cache
// protected by a tokio::sync::RwLock (matching the InMemory implementation).
// Writes go to redb first, then update the cache.
510
611use std:: collections:: HashMap ;
712use std:: path:: Path ;
8- use std:: sync:: { Arc , RwLock } ;
13+ use std:: sync:: Arc ;
914
1015use async_trait:: async_trait;
11- use tracing:: info;
16+ use tracing:: { debug , info, instrument } ;
1217use verisim_storage:: redb_backend:: RedbBackend ;
1318use verisim_storage:: typed:: TypedStore ;
1419
15- use crate :: { ProvenanceChain , ProvenanceError , ProvenanceRecord , ProvenanceStore } ;
20+ use crate :: {
21+ ProvenanceChain , ProvenanceError , ProvenanceEventType , ProvenanceRecord , ProvenanceStore ,
22+ } ;
1623
17- /// Persistent provenance store: redb for durability, in-memory cache for queries.
24+ /// Persistent provenance store: redb for durability, async RwLock cache for
25+ /// fast reads.
26+ ///
27+ /// The cache uses `tokio::sync::RwLock` to match the async locking pattern of
28+ /// `InMemoryProvenanceStore`.
1829pub struct RedbProvenanceStore {
30+ /// Typed store for provenance chains, keyed by entity_id.
1931 store : TypedStore < RedbBackend > ,
20- cache : Arc < RwLock < HashMap < String , ProvenanceChain > > > ,
32+ /// In-memory cache of all provenance chains.
33+ chains : Arc < tokio:: sync:: RwLock < HashMap < String , ProvenanceChain > > > ,
2134}
2235
2336impl RedbProvenanceStore {
37+ /// Open (or create) a persistent provenance store at the given path.
38+ ///
39+ /// On open, all existing chains are scanned from redb into the in-memory
40+ /// cache so that reads never hit disk.
2441 pub async fn open ( path : impl AsRef < Path > ) -> Result < Self , ProvenanceError > {
2542 let backend = RedbBackend :: open ( path. as_ref ( ) )
26- . map_err ( |e| ProvenanceError :: StorageError ( format ! ( "redb open: {}" , e) ) ) ?;
43+ . map_err ( |e| ProvenanceError :: IoError ( format ! ( "redb open: {}" , e) ) ) ?;
2744 let store = TypedStore :: new ( backend, "prov" ) ;
2845
2946 let entries: Vec < ( String , ProvenanceChain ) > = store
3047 . scan_prefix ( "" , 1_000_000 )
3148 . await
32- . map_err ( |e| ProvenanceError :: StorageError ( format ! ( "scan: {}" , e) ) ) ?;
49+ . map_err ( |e| ProvenanceError :: IoError ( format ! ( "scan: {}" , e) ) ) ?;
3350
3451 let mut cache = HashMap :: new ( ) ;
3552 for ( id, chain) in entries {
3653 cache. insert ( id, chain) ;
3754 }
3855
3956 info ! ( count = cache. len( ) , "Loaded provenance store from redb" ) ;
40- Ok ( Self { store, cache : Arc :: new ( RwLock :: new ( cache) ) } )
57+ Ok ( Self {
58+ store,
59+ chains : Arc :: new ( tokio:: sync:: RwLock :: new ( cache) ) ,
60+ } )
4161 }
4262
43- async fn persist_chain ( & self , entity_id : & str ) -> Result < ( ) , ProvenanceError > {
44- let c = self . cache . read ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
45- if let Some ( chain) = c. get ( entity_id) {
46- self . store . put ( entity_id, chain) . await
47- . map_err ( |e| ProvenanceError :: StorageError ( format ! ( "put: {}" , e) ) ) ?;
48- }
49- Ok ( ( ) )
63+ /// Persist a single entity's chain to redb.
64+ async fn persist_chain (
65+ & self ,
66+ entity_id : & str ,
67+ chain : & ProvenanceChain ,
68+ ) -> Result < ( ) , ProvenanceError > {
69+ self . store
70+ . put ( entity_id, chain)
71+ . await
72+ . map_err ( |e| ProvenanceError :: IoError ( format ! ( "put: {}" , e) ) )
5073 }
5174}
5275
5376#[ async_trait]
5477impl ProvenanceStore for RedbProvenanceStore {
55- async fn record ( & self , record : ProvenanceRecord ) -> Result < ( ) , ProvenanceError > {
56- let entity_id = record. entity_id . clone ( ) ;
57- {
58- let mut c = self . cache . write ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
59- let chain = c. entry ( entity_id. clone ( ) ) . or_insert_with ( || ProvenanceChain {
60- entity_id : entity_id. clone ( ) ,
61- records : Vec :: new ( ) ,
62- } ) ;
63- chain. records . push ( record) ;
64- }
65- self . persist_chain ( & entity_id) . await
78+ #[ instrument( skip( self ) ) ]
79+ async fn record_event (
80+ & self ,
81+ entity_id : & str ,
82+ event_type : ProvenanceEventType ,
83+ actor : & str ,
84+ source : Option < String > ,
85+ description : & str ,
86+ ) -> Result < ProvenanceRecord , ProvenanceError > {
87+ let mut chains = self . chains . write ( ) . await ;
88+ let chain = chains
89+ . entry ( entity_id. to_string ( ) )
90+ . or_insert_with ( || ProvenanceChain :: new ( entity_id) ) ;
91+
92+ chain. append ( event_type, actor, source, description) ;
93+ let record = chain. records . last ( ) . unwrap ( ) . clone ( ) ;
94+
95+ // Persist the updated chain to redb.
96+ self . persist_chain ( entity_id, chain) . await ?;
97+
98+ debug ! (
99+ entity_id = %entity_id,
100+ event = %record. event_type,
101+ actor = %record. actor,
102+ chain_length = chain. len( ) ,
103+ "Provenance event recorded (persistent)"
104+ ) ;
105+ Ok ( record)
66106 }
67107
68- async fn get_chain ( & self , entity_id : & str ) -> Result < Option < ProvenanceChain > , ProvenanceError > {
69- let c = self . cache . read ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
70- Ok ( c. get ( entity_id) . cloned ( ) )
108+ async fn get_chain ( & self , entity_id : & str ) -> Result < ProvenanceChain , ProvenanceError > {
109+ let chains = self . chains . read ( ) . await ;
110+ chains
111+ . get ( entity_id)
112+ . cloned ( )
113+ . ok_or_else ( || ProvenanceError :: NotFound ( entity_id. to_string ( ) ) )
71114 }
72115
73116 async fn verify_chain ( & self , entity_id : & str ) -> Result < bool , ProvenanceError > {
74- let c = self . cache . read ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
75- match c. get ( entity_id) {
76- Some ( chain) => Ok ( chain. verify ( ) ) ,
77- None => Err ( ProvenanceError :: NotFound ( entity_id. to_string ( ) ) ) ,
117+ let chains = self . chains . read ( ) . await ;
118+ match chains. get ( entity_id) {
119+ Some ( chain) => {
120+ chain. verify ( ) ?;
121+ Ok ( true )
122+ }
123+ None => Ok ( false ) ,
78124 }
79125 }
80126
81- async fn get_latest ( & self , entity_id : & str ) -> Result < Option < ProvenanceRecord > , ProvenanceError > {
82- let c = self . cache . read ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
83- Ok ( c. get ( entity_id) . and_then ( |chain| chain. records . last ( ) . cloned ( ) ) )
127+ async fn get_origin (
128+ & self ,
129+ entity_id : & str ,
130+ ) -> Result < Option < ProvenanceRecord > , ProvenanceError > {
131+ let chains = self . chains . read ( ) . await ;
132+ Ok ( chains. get ( entity_id) . and_then ( |c| c. origin ( ) . cloned ( ) ) )
133+ }
134+
135+ async fn get_latest (
136+ & self ,
137+ entity_id : & str ,
138+ ) -> Result < Option < ProvenanceRecord > , ProvenanceError > {
139+ let chains = self . chains . read ( ) . await ;
140+ Ok ( chains. get ( entity_id) . and_then ( |c| c. latest ( ) . cloned ( ) ) )
84141 }
85142
86- async fn search_by_actor ( & self , actor : & str ) -> Result < Vec < ProvenanceRecord > , ProvenanceError > {
87- let c = self . cache . read ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
143+ async fn search_by_actor (
144+ & self ,
145+ actor : & str ,
146+ ) -> Result < Vec < ( String , ProvenanceRecord ) > , ProvenanceError > {
147+ let chains = self . chains . read ( ) . await ;
88148 let mut results = Vec :: new ( ) ;
89- for chain in c . values ( ) {
149+ for ( entity_id , chain) in chains . iter ( ) {
90150 for record in & chain. records {
91151 if record. actor == actor {
92- results. push ( record. clone ( ) ) ;
152+ results. push ( ( entity_id . clone ( ) , record. clone ( ) ) ) ;
93153 }
94154 }
95155 }
96156 Ok ( results)
97157 }
98158
99159 async fn delete_chain ( & self , entity_id : & str ) -> Result < ( ) , ProvenanceError > {
100- self . store . delete ( entity_id) . await
101- . map_err ( |e| ProvenanceError :: StorageError ( format ! ( "delete: {}" , e) ) ) ?;
102- let mut c = self . cache . write ( ) . map_err ( |_| ProvenanceError :: LockPoisoned ) ?;
103- c. remove ( entity_id) ;
160+ // Delete from redb first.
161+ self . store
162+ . delete ( entity_id)
163+ . await
164+ . map_err ( |e| ProvenanceError :: IoError ( format ! ( "delete: {}" , e) ) ) ?;
165+ // Then remove from cache.
166+ let mut chains = self . chains . write ( ) . await ;
167+ chains. remove ( entity_id) ;
104168 Ok ( ( ) )
105169 }
106170}
@@ -114,17 +178,99 @@ mod tests {
114178 let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
115179 let path = dir. path ( ) . join ( "prov.redb" ) ;
116180
181+ // Write data in one session.
117182 {
118183 let store = RedbProvenanceStore :: open ( & path) . await . unwrap ( ) ;
119- let record = ProvenanceRecord :: new ( "e1" , "create" , "user1" ) ;
120- store. record ( record) . await . unwrap ( ) ;
184+ store
185+ . record_event (
186+ "entity-1" ,
187+ ProvenanceEventType :: Created ,
188+ "alice" ,
189+ Some ( "https://source.example.com" . to_string ( ) ) ,
190+ "Initial creation" ,
191+ )
192+ . await
193+ . unwrap ( ) ;
194+ store
195+ . record_event (
196+ "entity-1" ,
197+ ProvenanceEventType :: Modified ,
198+ "bob" ,
199+ None ,
200+ "Updated vector embedding" ,
201+ )
202+ . await
203+ . unwrap ( ) ;
121204 }
122205
206+ // Reopen and verify data survived.
123207 {
124208 let store = RedbProvenanceStore :: open ( & path) . await . unwrap ( ) ;
125- let chain = store. get_chain ( "e1" ) . await . unwrap ( ) . unwrap ( ) ;
126- assert_eq ! ( chain. records. len( ) , 1 ) ;
127- assert_eq ! ( chain. records[ 0 ] . actor, "user1" ) ;
209+
210+ let chain = store. get_chain ( "entity-1" ) . await . unwrap ( ) ;
211+ assert_eq ! ( chain. len( ) , 2 ) ;
212+ assert ! ( chain. verify( ) . is_ok( ) ) ;
213+
214+ let origin = store. get_origin ( "entity-1" ) . await . unwrap ( ) . unwrap ( ) ;
215+ assert_eq ! ( origin. actor, "alice" ) ;
216+ assert_eq ! ( origin. event_type, ProvenanceEventType :: Created ) ;
217+
218+ let latest = store. get_latest ( "entity-1" ) . await . unwrap ( ) . unwrap ( ) ;
219+ assert_eq ! ( latest. actor, "bob" ) ;
220+ assert_eq ! ( latest. event_type, ProvenanceEventType :: Modified ) ;
221+
222+ // Verify chain integrity
223+ assert ! ( store. verify_chain( "entity-1" ) . await . unwrap( ) ) ;
224+
225+ // Non-existent entity returns false, not error
226+ assert ! ( !store. verify_chain( "no-such-entity" ) . await . unwrap( ) ) ;
128227 }
129228 }
229+
230+ #[ tokio:: test]
231+ async fn test_persistent_provenance_search_by_actor ( ) {
232+ let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
233+ let path = dir. path ( ) . join ( "prov-search.redb" ) ;
234+
235+ let store = RedbProvenanceStore :: open ( & path) . await . unwrap ( ) ;
236+ store
237+ . record_event ( "e1" , ProvenanceEventType :: Created , "alice" , None , "Created e1" )
238+ . await
239+ . unwrap ( ) ;
240+ store
241+ . record_event ( "e2" , ProvenanceEventType :: Created , "bob" , None , "Created e2" )
242+ . await
243+ . unwrap ( ) ;
244+ store
245+ . record_event (
246+ "e3" ,
247+ ProvenanceEventType :: Imported ,
248+ "alice" ,
249+ None ,
250+ "Imported e3" ,
251+ )
252+ . await
253+ . unwrap ( ) ;
254+
255+ let alice_records = store. search_by_actor ( "alice" ) . await . unwrap ( ) ;
256+ assert_eq ! ( alice_records. len( ) , 2 ) ;
257+
258+ let bob_records = store. search_by_actor ( "bob" ) . await . unwrap ( ) ;
259+ assert_eq ! ( bob_records. len( ) , 1 ) ;
260+ }
261+
262+ #[ tokio:: test]
263+ async fn test_persistent_provenance_delete_chain ( ) {
264+ let dir = tempfile:: tempdir ( ) . unwrap ( ) ;
265+ let path = dir. path ( ) . join ( "prov-delete.redb" ) ;
266+
267+ let store = RedbProvenanceStore :: open ( & path) . await . unwrap ( ) ;
268+ store
269+ . record_event ( "e1" , ProvenanceEventType :: Created , "alice" , None , "Created" )
270+ . await
271+ . unwrap ( ) ;
272+
273+ store. delete_chain ( "e1" ) . await . unwrap ( ) ;
274+ assert ! ( store. get_chain( "e1" ) . await . is_err( ) ) ;
275+ }
130276}
0 commit comments