-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync_receiver_to_catalog.py
More file actions
90 lines (78 loc) · 2.44 KB
/
sync_receiver_to_catalog.py
File metadata and controls
90 lines (78 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from __future__ import annotations

import argparse
import re
from urllib.parse import urlsplit, urlunsplit

from converter.sync import ConverterSyncService, SyncBatchEvent, SyncJob
def _parse_int_at_least(raw: str, minimum: int) -> int:
    """Parse *raw* as an integer and reject values below *minimum*.

    Raises ``argparse.ArgumentTypeError`` so argparse reports a regular usage
    error (and exits) instead of handing a bad value to the sync job.
    """
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
    if value < minimum:
        raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {value}")
    return value


def _positive_int(raw: str) -> int:
    """argparse ``type=`` callback for sizes that must be at least 1."""
    return _parse_int_at_least(raw, 1)


def _non_negative_int(raw: str) -> int:
    """argparse ``type=`` callback for counts where 0 is a meaningful value."""
    return _parse_int_at_least(raw, 0)


def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser for syncing products from the receiver DB into the catalog DB.

    Both DB arguments accept a SQLite path or a PostgreSQL DSN. Fetch and
    write-chunk sizes must be >= 1; ``--max-batches`` must be >= 0, where 0
    means "no limit".

    Returns:
        A configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        description="Sync products from receiver DB into catalog DB",
    )
    parser.add_argument(
        "--receiver-db",
        required=True,
        help="Receiver DB path (SQLite, tests/local) or PostgreSQL DSN",
    )
    parser.add_argument(
        "--catalog-db",
        required=True,
        help="Catalog DB path (SQLite, tests/local) or PostgreSQL DSN",
    )
    parser.add_argument(
        "--parser-name",
        default="fixprice",
        help="Filter by parser_name from receiver run_artifacts",
    )
    # A zero or negative fetch/chunk size has no sensible meaning. Reject it at
    # the CLI boundary rather than letting it reach the batching code, where it
    # would presumably stall or error.
    parser.add_argument(
        "--receiver-fetch-size",
        type=_positive_int,
        default=2000,
        help="Max records per receiver fetch",
    )
    parser.add_argument(
        "--write-chunk-size",
        type=_positive_int,
        default=1000,
        help="Max normalized records per atomic write chunk",
    )
    # 0 is the documented "no limit" sentinel; negative values are meaningless.
    parser.add_argument(
        "--max-batches",
        type=_non_negative_int,
        default=0,
        help="Stop after N batches (0 means no limit)",
    )
    parser.add_argument(
        "--sync-version",
        choices=("v2",),
        default="v2",
        help="Sync engine version",
    )
    return parser
# Matches a ``password=...`` keyword (libpq key/value DSN or URL query
# parameter). The value is either single-quoted or runs to whitespace/'&'.
_PASSWORD_KV_RE = re.compile(
    r"(\bpassword\s*=\s*)('(?:[^'\\]|\\.)*'|[^\s&']+)",
    re.IGNORECASE,
)


def _redact_dsn(dsn: str) -> str:
    """Return *dsn* with any embedded password replaced by ``***``.

    Handles URL-style DSNs (``scheme://user:pass@host/db``) and
    ``password=...`` keyword/query parameters. Plain file paths (e.g. SQLite)
    contain neither and are returned unchanged.
    """
    redacted = _PASSWORD_KV_RE.sub(r"\1***", dsn)
    parts = urlsplit(redacted)
    # rpartition: the host part never contains '@', but an unencoded
    # password might.
    userinfo, at, hostinfo = parts.netloc.rpartition("@")
    if at and ":" in userinfo:
        user = userinfo.split(":", 1)[0]
        redacted = urlunsplit(parts._replace(netloc=f"{user}:***@{hostinfo}"))
    return redacted


def main() -> None:
    """Run a receiver -> catalog sync configured from command-line arguments.

    Prints the effective configuration (with any DB password masked), runs
    ``ConverterSyncService`` with a per-batch progress callback, and prints
    the final batch and record totals.
    """
    args = _build_parser().parse_args()
    # The DB arguments may be PostgreSQL DSNs; never echo credentials to logs.
    print(
        "Starting sync:",
        f"receiver={_redact_dsn(args.receiver_db)}",
        f"catalog={_redact_dsn(args.catalog_db)}",
        f"parser={args.parser_name}",
        f"sync_version={args.sync_version}",
        f"receiver_fetch_size={args.receiver_fetch_size}",
        f"write_chunk_size={args.write_chunk_size}",
        f"max_batches={args.max_batches}",
        flush=True,
    )

    service = ConverterSyncService()

    def _on_batch(event: SyncBatchEvent) -> None:
        # Flush so progress appears promptly even when stdout is redirected
        # to a file or pipe, where Python block-buffers output.
        print(
            f"Batch {event.batch_number}: processed={event.batch_size} total={event.total_processed}",
            flush=True,
        )

    result = service.run(
        SyncJob(
            receiver_db=args.receiver_db,
            catalog_db=args.catalog_db,
            parser_name=args.parser_name,
            receiver_fetch_size=args.receiver_fetch_size,
            write_chunk_size=args.write_chunk_size,
            sync_version=args.sync_version,
            max_batches=args.max_batches,
        ),
        on_batch=_on_batch,
    )
    print(f"Sync finished: batches={result.batches} total_processed={result.total_processed}")
# Entry point: run the sync CLI only when this file is executed as a script,
# not when it is imported as a module.
if "__main__" == __name__:
    main()