@@ -63,6 +63,7 @@ const (
6363)
6464
6565var httpClient = & http.Client {Timeout : 180 * time .Second }
66+ var covalentFetchTimeout = 10 * time .Second
6667
6768// In-memory cache: date string "YYYY-MM-DD" -> ETH/USD price.
6869// Avoids repeat Covalent API calls for transactions on the same day.
@@ -240,9 +241,12 @@ func main() {
240241 false ,
241242 "only update/compare rows where the existing DB row has is_lending=1 (ignores incompleteness filter)" ,
242243 )
244+ fetchTimeout = flag .Int ("fetch-timeout" , 10 , "Covalent API timeout in seconds per transaction" )
243245 )
244246 flag .Parse ()
245247
248+ covalentFetchTimeout = time .Duration (* fetchTimeout ) * time .Second
249+
246250 apiKey := os .Getenv ("COVALENT_KEY" )
247251 if apiKey == "" {
248252 log .Fatal ("COVALENT_KEY is required" )
@@ -339,7 +343,36 @@ func main() {
339343 }
340344
341345 // Real run: insert missing, update existing (fill-only unless recompute-all)
346+ //
347+ // Tombstone lifecycle (two-phase):
348+ // 1st 404 → not_found_retry (retryable; rediscovered on next run)
349+ // 2nd 404 → not_found (permanent tombstone; excluded from future runs)
350+ //
351+ // Rate guard: only NEW 404s (not repeats) count toward the rate. If the
352+ // new-404 rate exceeds 10%, all new tombstones are suppressed for that run
353+ // (likely a Covalent outage). Repeat 404s are upgraded unconditionally
354+ // since they've already failed across 2+ runs.
355+ const tombstoneMaxRate = 0.10
356+
357+ type pendingTombstone struct {
358+ candidate Candidate
359+ }
360+
361+ // Load hashes that are already not_found_retry from a prior run.
362+ notFoundPending , err := loadNotFoundPendingHashes (db )
363+ if err != nil {
364+ log .Fatalf ("loadNotFoundPendingHashes: %v" , err )
365+ }
366+ if len (notFoundPending ) > 0 {
367+ log .Printf ("loaded %d not_found_retry hashes from prior runs" , len (notFoundPending ))
368+ }
369+
342370 var inserted , updated , computeErr int
371+ var new404Count int // new 404s (first time, counts toward rate)
372+ var repeat404Count int // repeat 404s (already pending, upgraded directly)
373+ var processedCount int
374+ var pendingTombstones []pendingTombstone
375+
343376 for idx , k := range keys {
344377 if idx % 100 == 0 {
345378 log .Printf ("progress %d/%d" , idx , len (keys ))
@@ -351,31 +384,55 @@ func main() {
351384 computeErr ++
352385 log .Printf ("compute error %s: %v" , w .Hash0x , err )
353386
354- // If we get a"tx not found" (Covalent 404), insert a tombstone (not_found row)so we don't retry forever.
355387 if w .InsertCandidate != nil && ! * onlyUpdates && isCovalentTxNotFound (err ) {
388+ hashNorm := strings .ToLower (strings .TrimSpace (strip0x (w .InsertCandidate .HashNorm )))
389+
390+ // Repeat 404: already failed on a prior run. Upgrade to
391+ // permanent tombstone unconditionally.
392+ if _ , isPending := notFoundPending [hashNorm ]; isPending {
393+ repeat404Count ++
394+ if upErr := upgradeToNotFound (db , hashNorm ); upErr != nil {
395+ log .Printf ("upgrade not_found error %s: %v" , w .Hash0x , upErr )
396+ } else {
397+ log .Printf ("upgraded to not_found (repeat 404) %s" , w .Hash0x )
398+ }
399+ continue
400+ }
401+
402+ // New 404: first time seeing this hash fail.
403+ new404Count ++
356404 ok , why := shouldTombstoneNotFound (db , w .InsertCandidate .HashNorm , w .InsertCandidate .Source , w .InsertCandidate .CommitmentIndex , 15 )
357405 if ! ok {
358406 log .Printf ("skip tombstone %s (%s)" , w .Hash0x , why )
359407 continue
360408 }
361-
362- if insErr := insertV2NotFoundRow (db , * w .InsertCandidate ); insErr != nil {
363- log .Printf ("insert not_found error %s: %v" , w .Hash0x , insErr )
364- } else {
365- inserted ++
366- }
409+ pendingTombstones = append (pendingTombstones , pendingTombstone {candidate : * w .InsertCandidate })
367410 }
368411 continue
369412 }
370413
414+ processedCount ++
415+
371416 // Insert missing row (if applicable)
372417 if w .InsertCandidate != nil && ! * onlyUpdates {
373- err := insertV2Row (db , * w .InsertCandidate , comp )
374- if err != nil {
375- // if already exists due to race, fall through to update
376- log .Printf ("insert error %s: %v" , w .Hash0x , err )
418+ hashNorm := strings .ToLower (strings .TrimSpace (strip0x (w .InsertCandidate .HashNorm )))
419+ // If this hash was not_found_retry, overwrite it with full data
420+ // via UPDATE instead of INSERT (avoids needing DELETE privileges).
421+ if _ , wasPending := notFoundPending [hashNorm ]; wasPending {
422+ fakeExisting := ExistingRow {HashNorm : hashNorm , Hash0x : w .Hash0x }
423+ if upErr := updateV2Row (db , fakeExisting , comp , true ); upErr != nil {
424+ log .Printf ("update not_found_retry error %s: %v" , w .Hash0x , upErr )
425+ } else {
426+ inserted ++
427+ }
377428 } else {
378- inserted ++
429+ err := insertV2Row (db , * w .InsertCandidate , comp )
430+ if err != nil {
431+ // if already exists due to race, fall through to update
432+ log .Printf ("insert error %s: %v" , w .Hash0x , err )
433+ } else {
434+ inserted ++
435+ }
379436 }
380437 }
381438
@@ -390,7 +447,28 @@ func main() {
390447 }
391448 }
392449
393- log .Printf ("done: inserted=%d updated=%d compute_errors=%d" , inserted , updated , computeErr )
450+ // Flush or suppress pending tombstones based on the NEW 404 rate.
451+ tombstonesWritten := 0
452+ totalAttempted := processedCount + computeErr
453+ if totalAttempted > 0 && len (pendingTombstones ) > 0 {
454+ rate := float64 (new404Count ) / float64 (totalAttempted )
455+ if rate > tombstoneMaxRate {
456+ log .Printf ("suppressing %d tombstones: new 404 rate %.1f%% (%d/%d) exceeds %.0f%% threshold — likely Covalent outage" ,
457+ len (pendingTombstones ), rate * 100 , new404Count , totalAttempted , tombstoneMaxRate * 100 )
458+ } else {
459+ for _ , pt := range pendingTombstones {
460+ if insErr := insertV2NotFoundRow (db , pt .candidate ); insErr != nil {
461+ log .Printf ("insert not_found_retry error %s: %v" , pt .candidate .Hash0x , insErr )
462+ } else {
463+ tombstonesWritten ++
464+ inserted ++
465+ }
466+ }
467+ }
468+ }
469+
470+ log .Printf ("done: inserted=%d updated=%d compute_errors=%d new_404s=%d repeat_404s=%d tombstones_pending=%d tombstones_suppressed=%d" ,
471+ inserted , updated , computeErr , new404Count , repeat404Count , tombstonesWritten , len (pendingTombstones )- tombstonesWritten )
394472}
395473
396474// -------------------- DB connection --------------------
@@ -497,6 +575,7 @@ v2 AS (
497575 FROM mevcommit_57173.processed_l1_txns_v2
498576 WHERE l1_tx_hash IS NOT NULL
499577 AND CAST(l1_tx_hash AS VARCHAR) <> ''
578+ AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found_retry')
500579)
501580SELECT
502581 o.l1_tx_hash_0x,
@@ -570,6 +649,7 @@ v2_raw AS (
570649 FROM mevcommit_57173.processed_l1_txns_v2
571650 WHERE l1_tx_hash IS NOT NULL
572651 AND CAST(l1_tx_hash AS VARCHAR) <> ''
652+ AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found_retry')
573653),
574654v2 AS (
575655 SELECT
@@ -779,7 +859,7 @@ SELECT
779859FROM mevcommit_57173.processed_l1_txns_v2
780860WHERE l1_tx_hash IS NOT NULL
781861 AND CAST(l1_tx_hash AS VARCHAR) <> ''
782- AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) <> 'not_found')
862+ AND (primary_class IS NULL OR LOWER(CAST(primary_class AS VARCHAR)) NOT IN ( 'not_found', 'not_found_retry') )
783863 AND (
784864 is_swap IS NULL
785865 OR is_lending IS NULL
@@ -999,6 +1079,42 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
9991079 return err
10001080}
10011081
1082+ // loadNotFoundPendingHashes returns a set of hash_norm values that have
1083+ // primary_class = 'not_found_retry'. These are txs that got a Covalent 404
1084+ // on a previous run but were not yet confirmed as permanently missing.
1085+ func loadNotFoundPendingHashes (db * sql.DB ) (map [string ]struct {}, error ) {
1086+ q := `
1087+ SELECT LOWER(CAST(l1_tx_hash AS VARCHAR)) AS hash_norm
1088+ FROM mevcommit_57173.processed_l1_txns_v2
1089+ WHERE LOWER(CAST(primary_class AS VARCHAR)) = 'not_found_retry';
1090+ `
1091+ rows , err := db .Query (q )
1092+ if err != nil {
1093+ return nil , err
1094+ }
1095+ defer rows .Close ()
1096+ out := map [string ]struct {}{}
1097+ for rows .Next () {
1098+ var h string
1099+ if err := rows .Scan (& h ); err != nil {
1100+ return nil , err
1101+ }
1102+ out [strings .ToLower (strings .TrimSpace (h ))] = struct {}{}
1103+ }
1104+ return out , rows .Err ()
1105+ }
1106+
1107+ // upgradeToNotFound promotes a not_found_retry row to not_found (permanent tombstone).
1108+ func upgradeToNotFound (db * sql.DB , hashNorm string ) error {
1109+ q := `
1110+ UPDATE mevcommit_57173.processed_l1_txns_v2
1111+ SET primary_class = 'not_found'
1112+ WHERE l1_tx_hash = ? AND LOWER(CAST(primary_class AS VARCHAR)) = 'not_found_retry';
1113+ `
1114+ _ , err := db .Exec (q , hashNorm )
1115+ return err
1116+ }
1117+
10021118func insertV2NotFoundRow (db * sql.DB , c Candidate ) error {
10031119 q := `
10041120INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
@@ -1009,7 +1125,7 @@ INSERT INTO mevcommit_57173.processed_l1_txns_v2 (
10091125 primary_class
10101126) VALUES (?, ?, ?, ?, ?);
10111127`
1012- primary := "not_found "
1128+ primary := "not_found_retry "
10131129
10141130 _ , err := db .Exec (q ,
10151131 c .HashNorm ,
@@ -1455,47 +1571,52 @@ func fetchTransaction(txHash0x, apiKey string) (*TxResponse, error) {
14551571
14561572 url := fmt .Sprintf ("%s/%s/transaction_v2/%s/?no-logs=false" , covalentBaseURL , chainName , txHash0x )
14571573
1458- ctx , cancel := context .WithTimeout (context .Background (), 40 * time .Second )
1459- defer cancel ()
1574+ const maxRetries = 2
1575+ for attempt := 0 ; ; attempt ++ {
1576+ ctx , cancel := context .WithTimeout (context .Background (), covalentFetchTimeout )
14601577
1461- req , err := http .NewRequestWithContext (ctx , "GET" , url , nil )
1462- if err != nil {
1463- return nil , err
1464- }
1578+ req , err := http .NewRequestWithContext (ctx , "GET" , url , nil )
1579+ if err != nil {
1580+ cancel ()
1581+ return nil , err
1582+ }
14651583
1466- // Covalent v1 API auth: HTTP Basic (key as username, empty password)
1467- req .SetBasicAuth (apiKey , "" )
1468- req .Header .Set ("Accept" , "application/json" )
1584+ // Covalent v1 API auth: HTTP Basic (key as username, empty password)
1585+ req .SetBasicAuth (apiKey , "" )
1586+ req .Header .Set ("Accept" , "application/json" )
14691587
1470- start := time .Now ()
1471- resp , err := httpClient .Do (req )
1472- dur := time .Since (start )
1473- if err != nil {
1474- return nil , fmt .Errorf ("tx request error after %s: %w" , dur , err )
1475- }
1476- defer func () {
1477- if err := resp .Body .Close (); err != nil {
1478- log .Printf ("resp.Body.Close: %v" , err )
1588+ start := time .Now ()
1589+ resp , err := httpClient .Do (req )
1590+ dur := time .Since (start )
1591+ if err != nil {
1592+ cancel ()
1593+ if attempt < maxRetries {
1594+ log .Printf ("fetchTransaction %s: timeout on attempt %d/%d, retrying" , txHash0x , attempt + 1 , maxRetries + 1 )
1595+ continue
1596+ }
1597+ return nil , fmt .Errorf ("tx request error after %s: %w" , dur , err )
14791598 }
1480- }()
14811599
1482- body , readErr := io .ReadAll (resp .Body )
1483- if readErr != nil {
1484- return nil , fmt .Errorf ("read tx body: %w" , readErr )
1485- }
1486- if resp .StatusCode != 200 {
1487- return nil , fmt .Errorf ("covalent tx HTTP %d: %s" , resp .StatusCode , truncateBody (body ))
1488- }
1600+ body , readErr := io .ReadAll (resp .Body )
1601+ resp .Body .Close ()
1602+ cancel ()
1603+ if readErr != nil {
1604+ return nil , fmt .Errorf ("read tx body: %w" , readErr )
1605+ }
1606+ if resp .StatusCode != 200 {
1607+ return nil , fmt .Errorf ("covalent tx HTTP %d: %s" , resp .StatusCode , truncateBody (body ))
1608+ }
14891609
1490- var txResp TxResponse
1491- if err := json .Unmarshal (body , & txResp ); err != nil {
1492- return nil , fmt .Errorf ("covalent tx JSON decode: %w; body: %s" , err , truncateBody (body ))
1493- }
1494- if txResp .Error {
1495- log .Printf ("fetchTransaction %s status=%d" , txHash0x , resp .StatusCode )
1496- return nil , fmt .Errorf ("covalent tx error: %s" , txResp .ErrorMessage )
1610+ var txResp TxResponse
1611+ if err := json .Unmarshal (body , & txResp ); err != nil {
1612+ return nil , fmt .Errorf ("covalent tx JSON decode: %w; body: %s" , err , truncateBody (body ))
1613+ }
1614+ if txResp .Error {
1615+ log .Printf ("fetchTransaction %s status=%d" , txHash0x , resp .StatusCode )
1616+ return nil , fmt .Errorf ("covalent tx error: %s" , txResp .ErrorMessage )
1617+ }
1618+ return & txResp , nil
14971619 }
1498- return & txResp , nil
14991620}
15001621
15011622func isCovalentTxNotFound (err error ) bool {
0 commit comments