Skip to content

Commit 16c3532

Browse files
authored
fix: add rpc backoff, stop miles processing without tx gas details (#911)
* minor querying fix * fix: stop processing without gas info, rpc call backoff
1 parent 75dc63b commit 16c3532

2 files changed

Lines changed: 62 additions & 31 deletions

File tree

tools/fastswap-miles/miles.go

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -252,26 +252,30 @@ WHERE processed = false
252252
erc20FastRPCSet := batchCheckFastRPC(cfg.Logger, cfg.DB, allErc20Hashes)
253253

254254
for token, batch := range batches {
255-
gasCosts := make([]*big.Int, len(batch.Txs))
256-
bidCosts := make([]*big.Int, len(batch.Txs))
257255
totalOriginalGasCost := big.NewInt(0)
258256
totalOriginalBidCost := big.NewInt(0)
259257

260-
var skippedRows []erc20Row
258+
// First pass: separate rows into ready, pending-bid, and not-in-fastrpc.
259+
// Pending-bid rows are excluded from the batch so they retry next cycle.
260+
var readyTxs []erc20Row
261+
var readyGasCosts []*big.Int
262+
var readyBidCosts []*big.Int
261263

262-
for i, r := range batch.Txs {
264+
for _, r := range batch.Txs {
263265
bidCostWei := getBidCost(erc20BidMap, r.txHash)
264266
if bidCostWei.Sign() == 0 {
265267
if erc20FastRPCSet[strings.ToLower(r.txHash)] {
266268
cfg.Logger.Info("erc20 tx in FastRPC but bid not indexed yet, will retry",
267269
slog.String("tx", r.txHash), slog.String("user", r.user))
268-
} else {
269-
cfg.Logger.Info("erc20 tx not in FastRPC, skipping with 0 miles",
270-
slog.String("tx", r.txHash), slog.String("user", r.user))
271-
skippedRows = append(skippedRows, r)
270+
continue // genuinely skip — will retry next cycle
271+
}
272+
cfg.Logger.Info("erc20 tx not in FastRPC, skipping with 0 miles",
273+
slog.String("tx", r.txHash), slog.String("user", r.user))
274+
surplusWei, _ := new(big.Int).SetString(r.surplus, 10)
275+
if !cfg.DryRun {
276+
markProcessed(cfg.DB, r.txHash, weiToEth(surplusWei), 0, 0, "0")
272277
}
273-
gasCosts[i] = big.NewInt(0)
274-
bidCosts[i] = big.NewInt(0)
278+
processed++
275279
continue
276280
}
277281

@@ -284,25 +288,29 @@ WHERE processed = false
284288
}
285289
}
286290

287-
gasCosts[i] = gasCostWei
288-
bidCosts[i] = bidCostWei
291+
readyTxs = append(readyTxs, r)
292+
readyGasCosts = append(readyGasCosts, gasCostWei)
293+
readyBidCosts = append(readyBidCosts, bidCostWei)
289294

290295
totalOriginalGasCost.Add(totalOriginalGasCost, gasCostWei)
291296
totalOriginalBidCost.Add(totalOriginalBidCost, bidCostWei)
292297
}
293298

294-
for _, r := range skippedRows {
299+
if len(readyTxs) == 0 {
300+
continue
301+
}
302+
303+
// Recalculate TotalSum for only the ready rows
304+
readyTotalSum := big.NewInt(0)
305+
for _, r := range readyTxs {
295306
surplusWei, _ := new(big.Int).SetString(r.surplus, 10)
296-
if !cfg.DryRun {
297-
markProcessed(cfg.DB, r.txHash, weiToEth(surplusWei), 0, 0, "0")
298-
}
299-
processed++
307+
readyTotalSum.Add(readyTotalSum, surplusWei)
300308
}
301309

302310
reqBody := barterRequest{
303311
Source: token,
304312
Target: cfg.WETH.Hex(),
305-
SellAmount: batch.TotalSum.String(),
313+
SellAmount: readyTotalSum.String(),
306314
Recipient: cfg.ExecutorAddr.Hex(),
307315
Origin: cfg.ExecutorAddr.Hex(),
308316
MinReturnFraction: 0.98,
@@ -351,14 +359,14 @@ WHERE processed = false
351359

352360
if cfg.DryRun {
353361
cfg.Logger.Info("simulated sweep",
354-
slog.String("amount", batch.TotalSum.String()),
362+
slog.String("amount", readyTotalSum.String()),
355363
slog.String("token", token),
356364
slog.Float64("return_eth", weiToEth(expectedEthReturn)),
357365
slog.Float64("gas_eth", weiToEth(expectedGasCost)))
358366
actualEthReturn = expectedEthReturn
359367
actualSwapGasCost = expectedGasCost
360368
} else {
361-
actualEthReturn, actualSwapGasCost, err = submitFastSwapSweep(ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, cfg.ExecutorAddr, common.HexToAddress(token), batch.TotalSum, cfg.FastSwapURL, cfg.FundsRecipient, cfg.SettlementAddr, barterResp, cfg.MaxGasGwei)
369+
actualEthReturn, actualSwapGasCost, err = submitFastSwapSweep(ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, cfg.ExecutorAddr, common.HexToAddress(token), readyTotalSum, cfg.FastSwapURL, cfg.FundsRecipient, cfg.SettlementAddr, barterResp, cfg.MaxGasGwei)
362370
if err != nil {
363371
cfg.Logger.Error("failed to sweep token", slog.String("token", token), slog.Any("error", err))
364372
continue
@@ -369,17 +377,17 @@ WHERE processed = false
369377
slog.Float64("gas_eth", weiToEth(actualSwapGasCost)))
370378
}
371379

372-
for i, r := range batch.Txs {
380+
for i, r := range readyTxs {
373381
surplusWei, _ := new(big.Int).SetString(r.surplus, 10)
374382

375383
txGrossEth := new(big.Int).Mul(actualEthReturn, surplusWei)
376-
txGrossEth.Div(txGrossEth, batch.TotalSum)
384+
txGrossEth.Div(txGrossEth, readyTotalSum)
377385

378386
txOverheadGas := new(big.Int).Mul(actualSwapGasCost, surplusWei)
379-
txOverheadGas.Div(txOverheadGas, batch.TotalSum)
387+
txOverheadGas.Div(txOverheadGas, readyTotalSum)
380388

381-
txNetProfit := new(big.Int).Sub(txGrossEth, gasCosts[i])
382-
txNetProfit.Sub(txNetProfit, bidCosts[i])
389+
txNetProfit := new(big.Int).Sub(txGrossEth, readyGasCosts[i])
390+
txNetProfit.Sub(txNetProfit, readyBidCosts[i])
383391
txNetProfit.Sub(txNetProfit, txOverheadGas)
384392

385393
surplusEth := weiToEth(txGrossEth)
@@ -390,7 +398,7 @@ WHERE processed = false
390398
slog.String("tx", r.txHash), slog.String("user", r.user),
391399
slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth))
392400
if !cfg.DryRun {
393-
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, bidCosts[i].String())
401+
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, readyBidCosts[i].String())
394402
}
395403
processed++
396404
continue
@@ -405,7 +413,7 @@ WHERE processed = false
405413
slog.String("tx", r.txHash), slog.String("user", r.user),
406414
slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth))
407415
if !cfg.DryRun {
408-
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, bidCosts[i].String())
416+
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, readyBidCosts[i].String())
409417
}
410418
processed++
411419
continue
@@ -427,10 +435,11 @@ WHERE processed = false
427435
miles,
428436
)
429437
if err != nil {
430-
cfg.Logger.Error("fuel submit failed (will not retry to avoid duplicate swap)",
438+
cfg.Logger.Error("fuel submit failed, will retry next cycle",
431439
slog.String("tx", r.txHash), slog.Any("error", err))
440+
continue // don't mark processed — retry next cycle
432441
}
433-
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCosts[i].String())
442+
markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String())
434443
processed++
435444
}
436445
}

tools/fastswap-miles/sweep.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,18 @@ func indexBatch(
6060
txHash := ev.Raw.TxHash.Hex()
6161
blockNum := ev.Raw.BlockNumber
6262

63-
receipt, err := client.TransactionReceipt(ctx, ev.Raw.TxHash)
63+
var receipt *types.Receipt
64+
for attempt := 0; attempt < 3; attempt++ {
65+
receipt, err = client.TransactionReceipt(ctx, ev.Raw.TxHash)
66+
if err == nil {
67+
break
68+
}
69+
if strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") {
70+
time.Sleep(time.Duration(attempt+1) * 2 * time.Second)
71+
continue
72+
}
73+
break
74+
}
6475
if err != nil {
6576
logger.Warn("receipt fetch failed, skipping gas cost", slog.String("tx", txHash), slog.Any("error", err))
6677
}
@@ -72,7 +83,18 @@ func indexBatch(
7283
)
7384
}
7485

75-
header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum))
86+
var header *types.Header
87+
for attempt := 0; attempt < 3; attempt++ {
88+
header, err = client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum))
89+
if err == nil {
90+
break
91+
}
92+
if strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") {
93+
time.Sleep(time.Duration(attempt+1) * 2 * time.Second)
94+
continue
95+
}
96+
break
97+
}
7698
if err != nil {
7799
logger.Warn("header fetch failed", slog.Uint64("block", blockNum), slog.Any("error", err))
78100
}

0 commit comments

Comments
 (0)