-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.py
More file actions
129 lines (101 loc) · 2.78 KB
/
helpers.py
File metadata and controls
129 lines (101 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import secrets
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Transaction, Block, Address, Output
from app.utils import utcnow, to_timestamp
async def create_transaction(
session: AsyncSession,
currencies: list[str] | None = None,
size: int = 1024,
height: int = 1,
locktime: int = 0,
version: int = 1,
amount: dict[str, float] | None = None,
blockhash: str | None = None,
addresses: list[str] | None = None,
coinbase: bool = False,
) -> Transaction:
if currencies is None:
currencies = ["MBC"]
if amount is None:
amount = {"MBC": 10.0}
now = utcnow()
transaction = Transaction(
currencies=currencies,
txid=secrets.token_hex(32),
blockhash=blockhash or secrets.token_hex(32),
created=now,
timestamp=to_timestamp(now),
size=size,
height=height,
locktime=locktime,
version=version,
amount=amount,
coinbase=coinbase,
addresses=addresses or [secrets.token_hex(32), secrets.token_hex(32)],
)
session.add(transaction)
await session.commit()
return transaction
async def create_block(
session: AsyncSession,
blockhash: str = None,
height: int = 1,
movements: dict = None,
transactions: list[str] = None,
prev_blockhash: str = None,
) -> Block:
if movements is None:
movements = {}
if transactions is None:
transactions = []
now = utcnow()
block = Block(
blockhash=blockhash or secrets.token_hex(32),
transactions=transactions,
height=height,
movements=movements,
created=now,
timestamp=to_timestamp(now),
prev_blockhash=prev_blockhash,
)
session.add(block)
await session.commit()
return block
async def create_address(session: AsyncSession) -> Address:
address = Address(
address=secrets.token_hex(32),
)
session.add(address)
await session.commit()
return address
async def create_output(
session: AsyncSession,
currency: str = "MBC",
shortcut: str = "aabbccddeeff0011223344",
blockhash: str = None,
address: str = None,
txid: str = None,
amount: float = 10.0,
timelock: int = 0,
type: str = "new_token",
spent: bool = False,
index: int = 1,
):
output = Output(
currency=currency,
shortcut=shortcut,
blockhash=blockhash or secrets.token_hex(32),
address=address or secrets.token_hex(32),
txid=txid or secrets.token_hex(32),
amount=amount, # type: ignore
timelock=timelock,
type=type,
meta={},
spent=spent,
script="",
asm="",
index=index,
)
session.add(output)
await session.commit()
return output