-
Notifications
You must be signed in to change notification settings - Fork 125
Expand file tree
/
Copy pathbidding.py
More file actions
88 lines (75 loc) · 2.33 KB
/
bidding.py
File metadata and controls
88 lines (75 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from typing import Annotated
from fastapi import APIRouter, Depends
from lato import Application
from api.dependencies import get_application
from api.models.bidding import BiddingResponse, PlaceBidRequest
from modules.bidding.application.command import PlaceBidCommand, RetractBidCommand
from modules.bidding.application.query.get_bidding_details import GetBiddingDetails
router = APIRouter()
"""
Inspired by https://developer.ebay.com/api-docs/buy/offer/types/api:Bidding
"""
@router.get("/bidding/{listing_id}", tags=["bidding"], response_model=BiddingResponse)
async def get_bidding_details_of_listing(
listing_id, app: Annotated[Application, Depends(get_application)]
):
"""
Shows listing details
"""
query = GetBiddingDetails(listing_id=listing_id)
result = await app.execute_async(query)
return BiddingResponse(
listing_id=result.id,
auction_end_date=result.ends_at,
bids=result.bids,
)
@router.post(
"/bidding/{listing_id}/place_bid", tags=["bidding"], response_model=BiddingResponse
)
async def place_bid(
listing_id,
request_body: PlaceBidRequest,
app: Annotated[Application, Depends(get_application)],
):
"""
Places a bid on a listing
"""
# TODO: get bidder from current user
command = PlaceBidCommand(
listing_id=listing_id,
bidder_id=request_body.bidder_id,
amount=request_body.amount,
)
await app.execute_async(command)
# execute_async, or execute?
query = GetBiddingDetails(listing_id=listing_id)
result = await app.execute_async(query)
return BiddingResponse(
listing_id=result.id,
auction_end_date=result.ends_at,
bids=result.bids,
)
@router.post(
"/bidding/{listing_id}/retract_bid",
tags=["bidding"],
response_model=BiddingResponse,
)
async def retract_bid(
listing_id, app: Annotated[Application, Depends(get_application)]
):
"""
Retracts a bid from a listing
"""
command = RetractBidCommand(
listing_id=listing_id,
bidder_id="",
)
app.execute(command)
query = GetBiddingDetails(listing_id=listing_id)
query_result = app.execute_query(query)
payload = query_result.payload
return BiddingResponse(
listing_id=str(payload.id),
auction_end_date=payload.ends_at,
bids=payload.bids,
)