Skip to content

Commit a1751ef

Browse files
committed
feat: log activity during backfill with original timestamps
Add activity logging to the backfill process so historical data shows up in the activity dashboard. Uses the record's createdAt timestamp (or falls back to current time) so entries appear at the correct time in the chart. - Add activityRepo to Backfiller struct - Add LogActivityWithStatus method to set status directly (avoids double DB call) - Log activity with 'success' status after successful record insert - Extract createdAt from record JSON for accurate timestamps - Update all NewBackfiller calls to pass activity repository
1 parent 6ce8ab8 commit a1751ef

3 files changed

Lines changed: 91 additions & 12 deletions

File tree

cmd/hypergoat/main.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ func run() error {
310310
backfillConfig.PLCURL = plcURL
311311
}
312312

313-
actorBackfiller := backfill.NewBackfiller(backfillConfig, recordsRepo, actorsRepo)
313+
backfillActivityRepo := repositories.NewJetstreamActivityRepository(db)
314+
actorBackfiller := backfill.NewBackfiller(backfillConfig, recordsRepo, actorsRepo, backfillActivityRepo)
314315

315316
// Single actor backfill callback
316317
adminHandler.Resolver().SetBackfillCallback(func(ctx context.Context, did string) error {
@@ -341,7 +342,7 @@ func run() error {
341342
// Create a new backfiller with the discovered collections
342343
bfConfig := backfillConfig
343344
bfConfig.Collections = collections
344-
bf := backfill.NewBackfiller(bfConfig, recordsRepo, actorsRepo)
345+
bf := backfill.NewBackfiller(bfConfig, recordsRepo, actorsRepo, backfillActivityRepo)
345346
defer bf.Close()
346347

347348
slog.Info("[backfill] Starting full network backfill", "collections", collections)
@@ -535,7 +536,8 @@ func run() error {
535536
bfConfig.PLCURL = plcURL
536537
}
537538

538-
backfiller := backfill.NewBackfiller(bfConfig, recordsRepo, actorsRepo)
539+
startupActivityRepo := repositories.NewJetstreamActivityRepository(db)
540+
backfiller := backfill.NewBackfiller(bfConfig, recordsRepo, actorsRepo, startupActivityRepo)
539541

540542
// Run backfill in background
541543
go func() {

internal/backfill/backfill.go

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package backfill
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"log/slog"
78
"os"
@@ -111,10 +112,11 @@ func (s *Stats) Duration() time.Duration {
111112

112113
// Backfiller coordinates historical data backfill.
113114
type Backfiller struct {
114-
config Config
115-
client *Client
116-
recordsRepo *repositories.RecordsRepository
117-
actorsRepo *repositories.ActorsRepository
115+
config Config
116+
client *Client
117+
recordsRepo *repositories.RecordsRepository
118+
actorsRepo *repositories.ActorsRepository
119+
activityRepo *repositories.JetstreamActivityRepository
118120

119121
// httpSem is a global semaphore limiting concurrent HTTP requests.
120122
// This prevents overwhelming the network and running out of file descriptors.
@@ -132,6 +134,7 @@ func NewBackfiller(
132134
config Config,
133135
recordsRepo *repositories.RecordsRepository,
134136
actorsRepo *repositories.ActorsRepository,
137+
activityRepo *repositories.JetstreamActivityRepository,
135138
) *Backfiller {
136139
// Create DID resolver with custom PLC URL
137140
didResolver := oauth.NewDIDResolver(
@@ -152,6 +155,7 @@ func NewBackfiller(
152155
client: NewClient(config.RelayURL, config.PLCURL, config.MaxHTTPConcurrent),
153156
recordsRepo: recordsRepo,
154157
actorsRepo: actorsRepo,
158+
activityRepo: activityRepo,
155159
httpSem: make(chan struct{}, config.MaxHTTPConcurrent),
156160
didCache: didCache,
157161
stopCacheCleanup: stopCleanup,
@@ -530,6 +534,17 @@ func (b *Backfiller) processRepo(ctx context.Context, pdsURL string, data *Atpro
530534
} else {
531535
insertedCount = len(filteredRecords)
532536
atomic.AddInt64(&b.stats.RecordsInserted, int64(insertedCount))
537+
538+
// Log activity for each inserted record (with 'success' status since already inserted)
539+
if b.activityRepo != nil {
540+
for _, rec := range filteredRecords {
541+
timestamp := extractCreatedAt(rec.JSON)
542+
_, err := b.activityRepo.LogActivityWithStatus(ctx, timestamp, "create", rec.Collection, rec.DID, rec.JSON, "success")
543+
if err != nil {
544+
slog.Debug("[backfill] Failed to log activity", "uri", rec.URI, "error", err)
545+
}
546+
}
547+
}
533548
}
534549
}
535550
insertMs := time.Since(insertStart).Milliseconds()
@@ -650,6 +665,14 @@ func (b *Backfiller) processRepoLegacy(ctx context.Context, pdsURL string, data
650665
if result == repositories.Inserted {
651666
totalInserted++
652667
atomic.AddInt64(&b.stats.RecordsInserted, 1)
668+
// Log activity for the inserted record
669+
if b.activityRepo != nil {
670+
timestamp := extractCreatedAt(string(rec.Value))
671+
_, err := b.activityRepo.LogActivityWithStatus(ctx, timestamp, "create", collection, data.DID, string(rec.Value), "success")
672+
if err != nil {
673+
slog.Debug("[backfill] Failed to log activity", "uri", rec.URI, "error", err)
674+
}
675+
}
653676
}
654677
}
655678
}
@@ -723,6 +746,17 @@ func (b *Backfiller) BackfillActor(ctx context.Context, did string) (int, error)
723746
if err := b.recordsRepo.BatchInsert(ctx, filteredRecords); err != nil {
724747
return 0, fmt.Errorf("batch insert failed: %w", err)
725748
}
749+
750+
// Log activity for each inserted record
751+
if b.activityRepo != nil {
752+
for _, rec := range filteredRecords {
753+
timestamp := extractCreatedAt(rec.JSON)
754+
_, err := b.activityRepo.LogActivityWithStatus(ctx, timestamp, "create", rec.Collection, rec.DID, rec.JSON, "success")
755+
if err != nil {
756+
slog.Debug("[backfill] Failed to log activity", "uri", rec.URI, "error", err)
757+
}
758+
}
759+
}
726760
}
727761

728762
slog.Info("[backfill] Actor backfill complete (CAR)",
@@ -758,6 +792,14 @@ func (b *Backfiller) backfillActorLegacy(ctx context.Context, data *AtprotoData)
758792

759793
if result == repositories.Inserted {
760794
totalRecords++
795+
// Log activity for the inserted record
796+
if b.activityRepo != nil {
797+
timestamp := extractCreatedAt(string(rec.Value))
798+
_, err := b.activityRepo.LogActivityWithStatus(ctx, timestamp, "create", collection, data.DID, string(rec.Value), "success")
799+
if err != nil {
800+
slog.Debug("[backfill] Failed to log activity", "uri", rec.URI, "error", err)
801+
}
802+
}
761803
}
762804
}
763805
}
@@ -786,3 +828,28 @@ func ParseCollections(s string) []string {
786828
}
787829
return result
788830
}
831+
832+
// extractCreatedAt extracts the createdAt timestamp from a record's JSON.
833+
// Returns the parsed time or the current time if not found/parseable.
834+
func extractCreatedAt(recordJSON string) time.Time {
835+
var data map[string]interface{}
836+
if err := json.Unmarshal([]byte(recordJSON), &data); err != nil {
837+
return time.Now()
838+
}
839+
840+
createdAt, ok := data["createdAt"].(string)
841+
if !ok {
842+
return time.Now()
843+
}
844+
845+
// Try parsing as RFC3339
846+
t, err := time.Parse(time.RFC3339, createdAt)
847+
if err != nil {
848+
// Try without timezone
849+
t, err = time.Parse("2006-01-02T15:04:05", createdAt)
850+
if err != nil {
851+
return time.Now()
852+
}
853+
}
854+
return t
855+
}

internal/database/repositories/jetstream_activity.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,20 @@ func NewJetstreamActivityRepository(db database.Executor) *JetstreamActivityRepo
4040
return &JetstreamActivityRepository{db: db}
4141
}
4242

43-
// LogActivity logs a new activity entry and returns the ID.
43+
// LogActivity logs a new activity entry with 'pending' status and returns the ID.
4444
func (r *JetstreamActivityRepository) LogActivity(
4545
ctx context.Context,
4646
timestamp time.Time,
4747
operation, collection, did, eventJSON string,
48+
) (int64, error) {
49+
return r.LogActivityWithStatus(ctx, timestamp, operation, collection, did, eventJSON, "pending")
50+
}
51+
52+
// LogActivityWithStatus logs a new activity entry with a custom status and returns the ID.
53+
func (r *JetstreamActivityRepository) LogActivityWithStatus(
54+
ctx context.Context,
55+
timestamp time.Time,
56+
operation, collection, did, eventJSON, status string,
4857
) (int64, error) {
4958
var sqlStr string
5059
var timestampStr string
@@ -54,24 +63,25 @@ func (r *JetstreamActivityRepository) LogActivity(
5463
timestampStr = timestamp.Format(time.RFC3339)
5564
sqlStr = fmt.Sprintf(`INSERT INTO jetstream_activity
5665
(timestamp, operation, collection, did, status, event_json)
57-
VALUES (%s, %s, %s, %s, 'pending', %s)
66+
VALUES (%s, %s, %s, %s, %s, %s)
5867
RETURNING id`,
5968
r.db.Placeholder(1), r.db.Placeholder(2), r.db.Placeholder(3),
60-
r.db.Placeholder(4), r.db.Placeholder(5))
69+
r.db.Placeholder(4), r.db.Placeholder(5), r.db.Placeholder(6))
6170
default:
6271
timestampStr = timestamp.Format("2006-01-02 15:04:05")
6372
sqlStr = fmt.Sprintf(`INSERT INTO jetstream_activity
6473
(timestamp, operation, collection, did, status, event_json)
65-
VALUES (%s, %s, %s, %s, 'pending', %s)`,
74+
VALUES (%s, %s, %s, %s, %s, %s)`,
6675
r.db.Placeholder(1), r.db.Placeholder(2), r.db.Placeholder(3),
67-
r.db.Placeholder(4), r.db.Placeholder(5))
76+
r.db.Placeholder(4), r.db.Placeholder(5), r.db.Placeholder(6))
6877
}
6978

7079
params := []database.Value{
7180
database.Text(timestampStr),
7281
database.Text(operation),
7382
database.Text(collection),
7483
database.Text(did),
84+
database.Text(status),
7585
database.Text(eventJSON),
7686
}
7787

0 commit comments

Comments
 (0)