@@ -834,45 +834,12 @@ func (f *LogFetcher) mergeSortedLogs(batches [][]*ethtypes.Log) []*ethtypes.Log
834834 return res
835835}
836836
837- func (f * LogFetcher ) ensureHeightAvailable (ctx context.Context , height int64 ) error {
838- if f .watermarks == nil {
839- return nil
840- }
841- return f .watermarks .EnsureHeightAvailable (ctx , height )
842- }
843-
844837func (f * LogFetcher ) latestHeight (ctx context.Context ) (int64 , error ) {
845- if f .watermarks != nil {
846- return f .watermarks .LatestHeight (ctx )
847- }
848- if f .ctxProvider == nil {
849- return 0 , fmt .Errorf ("ctx provider not configured" )
850- }
851- return f .ctxProvider (LatestCtxHeight ).BlockHeight (), nil
838+ return f .watermarks .LatestHeight (ctx )
852839}
853840
854841func (f * LogFetcher ) earliestHeight (ctx context.Context ) (int64 , error ) {
855- if f .watermarks != nil {
856- return f .watermarks .EarliestHeight (ctx )
857- }
858- if f .ctxProvider == nil {
859- return 0 , fmt .Errorf ("ctx provider not configured" )
860- }
861- storeCtx := f .ctxProvider (LatestCtxHeight )
862- ms := storeCtx .MultiStore ()
863- if ms == nil {
864- return 0 , nil
865- }
866- earliest := int64 (0 )
867- func () {
868- defer func () {
869- if r := recover (); r != nil {
870- earliest = 0
871- }
872- }()
873- earliest = ms .GetEarliestVersion ()
874- }()
875- return earliest , nil
842+ return f .watermarks .EarliestHeight (ctx )
876843}
877844
878845// Pooled version that reuses slice allocation
@@ -957,30 +924,25 @@ func (f *LogFetcher) fetchBlocksByCrit(ctx context.Context, crit filters.FilterC
957924 return res , 0 , false , nil
958925 }
959926
960- block , err := blockByHashWithRetry (ctx , f .tmClient , crit .BlockHash [:], 1 )
927+ block , err := blockByHashRespectingWatermarks (ctx , f .tmClient , f . watermarks , crit .BlockHash [:], 1 )
961928 if err != nil {
962929 // For non-existent blocks, return empty channel instead of error
963930 res := make (chan * coretypes.ResultBlock )
964931 close (res )
965932 return res , 0 , false , nil
966933 }
967- if err := f .ensureHeightAvailable (ctx , block .Block .Height ); err != nil {
968- res := make (chan * coretypes.ResultBlock )
969- close (res )
970- return res , 0 , false , nil
971- }
972934 res := make (chan * coretypes.ResultBlock , 1 )
973935 res <- block
974936 close (res )
975937 return res , 0 , false , nil
976938 }
977939
978940 applyOpenEndedLogLimit := f .filterConfig .maxLog > 0 && (crit .FromBlock == nil || crit .ToBlock == nil )
979- latest , err := f .latestHeight (ctx )
941+ latest , err := f .watermarks . LatestHeight (ctx )
980942 if err != nil {
981943 return nil , 0 , false , err
982944 }
983- earliest , err := f .earliestHeight (ctx )
945+ earliest , err := f .watermarks . EarliestHeight (ctx )
984946 if err != nil {
985947 earliest = 0
986948 }
@@ -1059,7 +1021,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
10591021 // check cache first, without holding the semaphore
10601022 if cachedEntry , found := f .globalBlockCache .Get (height ); found {
10611023 if cachedEntry .Block != nil {
1062- if err := f .ensureHeightAvailable (ctx , cachedEntry .Block .Block .Height ); err != nil {
1024+ if err := f .watermarks . EnsureHeightAvailable (ctx , cachedEntry .Block .Block .Height ); err != nil {
10631025 continue
10641026 }
10651027 }
@@ -1074,7 +1036,7 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
10741036 if cachedEntry , found := f .globalBlockCache .Get (height ); found {
10751037 <- f .dbReadSemaphore
10761038 if cachedEntry .Block != nil {
1077- if err := f .ensureHeightAvailable (ctx , cachedEntry .Block .Block .Height ); err != nil {
1039+ if err := f .watermarks . EnsureHeightAvailable (ctx , cachedEntry .Block .Block .Height ); err != nil {
10781040 continue
10791041 }
10801042 }
@@ -1109,10 +1071,6 @@ func (f *LogFetcher) processBatch(ctx context.Context, start, end int64, crit fi
11091071 <- f .dbReadSemaphore
11101072 continue
11111073 }
1112- if err := f .ensureHeightAvailable (ctx , block .Block .Height ); err != nil {
1113- <- f .dbReadSemaphore
1114- continue
1115- }
11161074
11171075 // Use LoadOrStore to create/get cache entry atomically
11181076 entry := loadOrStoreCacheEntry (& f .cacheCreationMutex , f .globalBlockCache , height , block )
0 commit comments