@@ -184,6 +184,168 @@ where
184184 Ok ( ( ) )
185185 }
186186
187+ /// Replay the write-ahead log to recover state after a crash.
188+ ///
189+ /// This method should be called once during startup, after the WAL is
190+ /// opened and persistent modality stores have loaded their data from redb.
191+ ///
192+ /// Recovery strategy (octad-level, Option B):
193+ ///
194+ /// 1. Find the last checkpoint in the WAL.
195+ /// 2. Replay all entries after that checkpoint.
196+ /// 3. For committed operations (Insert/Update/Delete followed by a
197+ /// Checkpoint with payload b"COMMITTED"): rebuild the octad status
198+ /// registry. The modality data is already in redb.
199+ /// 4. For uncommitted operations (no matching Checkpoint): log a warning.
200+ /// The incomplete write may have partially persisted to redb — the data
201+ /// is still consistent per-modality (each redb write is atomic), but the
202+ /// cross-modal operation may be incomplete.
203+ /// 5. Write a fresh checkpoint after replay.
204+ ///
205+ /// Returns the number of entities recovered.
206+ pub async fn replay_wal (
207+ & self ,
208+ wal_dir : impl AsRef < std:: path:: Path > ,
209+ ) -> Result < usize , OctadError > {
210+ use std:: collections:: HashSet ;
211+ use verisim_wal:: WalReader ;
212+
213+ let reader = match WalReader :: open ( & wal_dir) {
214+ Ok ( r) => r,
215+ Err ( verisim_wal:: WalError :: DirectoryNotFound ( _) ) => {
216+ info ! ( "No WAL directory found — clean start" ) ;
217+ return Ok ( 0 ) ;
218+ }
219+ Err ( e) => {
220+ return Err ( OctadError :: ModalityError {
221+ modality : "wal" . to_string ( ) ,
222+ message : format ! ( "Failed to open WAL reader: {e}" ) ,
223+ } ) ;
224+ }
225+ } ;
226+
227+ // Find the last checkpoint to determine replay start point
228+ let checkpoint_seq = reader. find_last_checkpoint ( ) . map_err ( |e| {
229+ OctadError :: ModalityError {
230+ modality : "wal" . to_string ( ) ,
231+ message : format ! ( "Failed to find checkpoint: {e}" ) ,
232+ }
233+ } ) ?;
234+
235+ let start_seq = checkpoint_seq. unwrap_or ( 0 ) ;
236+ info ! ( start_seq, "Replaying WAL from sequence {}" , start_seq) ;
237+
238+ // Read all entries from the checkpoint onward
239+ let entries: Vec < WalEntry > = reader
240+ . replay_from ( start_seq)
241+ . map_err ( |e| OctadError :: ModalityError {
242+ modality : "wal" . to_string ( ) ,
243+ message : format ! ( "WAL replay_from failed: {e}" ) ,
244+ } ) ?
245+ . collect ( ) ;
246+
247+ if entries. is_empty ( ) {
248+ info ! ( "WAL replay: no entries to replay" ) ;
249+ return Ok ( 0 ) ;
250+ }
251+
252+ // Track which entity_ids have been committed (have a Checkpoint entry)
253+ let mut committed_entities: HashSet < String > = HashSet :: new ( ) ;
254+ let mut uncommitted_entities: HashSet < String > = HashSet :: new ( ) ;
255+ let mut entity_ops: HashMap < String , ( WalOperation , Vec < u8 > ) > = HashMap :: new ( ) ;
256+
257+ for entry in & entries {
258+ match entry. operation {
259+ WalOperation :: Checkpoint => {
260+ // Checkpoint with payload "COMMITTED" marks the previous op as complete
261+ if entry. payload == b"COMMITTED" {
262+ committed_entities. insert ( entry. entity_id . clone ( ) ) ;
263+ uncommitted_entities. remove ( & entry. entity_id ) ;
264+ }
265+ }
266+ WalOperation :: Insert | WalOperation :: Update | WalOperation :: Delete => {
267+ entity_ops. insert (
268+ entry. entity_id . clone ( ) ,
269+ ( entry. operation . clone ( ) , entry. payload . clone ( ) ) ,
270+ ) ;
271+ if !committed_entities. contains ( & entry. entity_id ) {
272+ uncommitted_entities. insert ( entry. entity_id . clone ( ) ) ;
273+ }
274+ }
275+ }
276+ }
277+
278+ // Warn about uncommitted operations
279+ for entity_id in & uncommitted_entities {
280+ tracing:: warn!(
281+ entity_id,
282+ "WAL replay: uncommitted operation for entity — may be partially persisted"
283+ ) ;
284+ }
285+
286+ // Rebuild octad status registry for committed entities
287+ let mut octads = self . octads . write ( ) . await ;
288+ let mut recovered = 0usize ;
289+
290+ for entity_id in & committed_entities {
291+ if let Some ( ( op, payload) ) = entity_ops. get ( entity_id) {
292+ match op {
293+ WalOperation :: Insert | WalOperation :: Update => {
294+ // Deserialize the OctadInput to determine which modalities were written
295+ if let Ok ( input) = serde_json:: from_slice :: < OctadInput > ( payload) {
296+ let modality_status = ModalityStatus {
297+ graph : input. graph . is_some ( ) ,
298+ vector : input. vector . is_some ( ) ,
299+ document : input. document . is_some ( ) ,
300+ tensor : input. tensor . is_some ( ) ,
301+ semantic : input. semantic . is_some ( ) ,
302+ temporal : true , // Always written
303+ provenance : input. provenance . is_some ( ) ,
304+ spatial : input. spatial . is_some ( ) ,
305+ } ;
306+
307+ let id = OctadId :: from ( entity_id. clone ( ) ) ;
308+ let now = Utc :: now ( ) ;
309+
310+ // Get existing version or start at 1
311+ let version = octads
312+ . get ( entity_id)
313+ . map ( |s| s. version + 1 )
314+ . unwrap_or ( 1 ) ;
315+
316+ octads. insert (
317+ entity_id. clone ( ) ,
318+ OctadStatus {
319+ id,
320+ created_at : now,
321+ modified_at : now,
322+ version,
323+ modality_status,
324+ } ,
325+ ) ;
326+ recovered += 1 ;
327+ } else {
328+ tracing:: warn!( entity_id, "WAL replay: failed to deserialize OctadInput" ) ;
329+ }
330+ }
331+ WalOperation :: Delete => {
332+ octads. remove ( entity_id) ;
333+ recovered += 1 ;
334+ }
335+ WalOperation :: Checkpoint => { } // Already handled above
336+ }
337+ }
338+ }
339+
340+ info ! ( recovered, committed = committed_entities. len( ) , uncommitted = uncommitted_entities. len( ) , "WAL replay complete" ) ;
341+
342+ // Write a fresh checkpoint to mark recovery complete
343+ drop ( octads) ; // Release write lock before checkpoint
344+ self . wal_checkpoint ( ) . await . ok ( ) ;
345+
346+ Ok ( recovered)
347+ }
348+
187349 /// Access the provenance store for direct queries.
188350 pub fn provenance_store ( & self ) -> & Arc < P > {
189351 & self . provenance
0 commit comments