-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
119 lines (89 loc) · 2.91 KB
/
__init__.py
File metadata and controls
119 lines (89 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Methods for exchange."""
import logging
from enum import Enum
from random import random
from typing import Optional
from pydantic import Field, PositiveInt
from cyberfusion.RabbitMQConsumer.contracts import (
HandlerBase,
RPCRequestBase,
RPCResponseBase,
RPCResponseData,
)
logger = logging.getLogger(__name__)
class FavouriteFoodEnum(str, Enum):
    """Favourite foods.

    The ``str`` mix-in makes every member compare equal to (and behave as)
    its plain string value, e.g. ``FavouriteFoodEnum.ONION == "onion"``.
    """

    ONION = "onion"
    ORANGE = "orange"
    BANANA = "banana"
class RPCRequestExample(RPCRequestBase):
    """Data part of RPC request.

    Carries the favourite food to judge and an optional percentage
    (defaults to 20) that is used when determining toleration.
    """

    favourite_food: FavouriteFoodEnum = Field(
        description="Human-readable favourite food."
    )
    chance_percentage: PositiveInt = Field(
        description="Chances of favourite food passing.", default=20
    )

    class Config:
        """Config."""

        # Example payloads attached to the generated JSON schema: one relying
        # on the default percentage, one setting it explicitly.
        json_schema_extra = {
            "examples": [
                {"favourite_food": FavouriteFoodEnum.BANANA},
                {
                    "favourite_food": FavouriteFoodEnum.ONION,
                    "chance_percentage": 50,
                },
            ]
        }
class RPCResponseDataExample(RPCResponseData):
    """Data part of RPC response."""

    # Outcome of the toleration check for the requested food.
    tolerable: bool
class RPCResponseExample(RPCResponseBase):
    """Base attributes for RPC response.

    Narrows ``data`` to the toleration payload (or ``None``).
    """

    data: Optional[RPCResponseDataExample]

    class Config:
        """Config."""

        # Example responses attached to the generated JSON schema, one for
        # each possible value of ``tolerable``.
        json_schema_extra = {
            "examples": [
                {
                    "_description": "Not tolerable",
                    "success": True,
                    "message": "Determined toleration",
                    "data": {"tolerable": False},
                },
                {
                    "_description": "Tolerable",
                    "success": True,
                    "message": "Determined toleration",
                    "data": {"tolerable": True},
                },
            ]
        }
def determine_toleration(
    favourite_food: FavouriteFoodEnum, chance_percentage: int
) -> bool:
    """Determine if food is tolerable.

    Had this not been an example, you would probably have done some computation
    to get a result.

    Onion is never tolerable. For any other food the result is random: a
    fresh ``random()`` draw must exceed ``chance_percentage / 100``, so a
    higher ``chance_percentage`` makes ``True`` less likely.

    NOTE(review): the request field describes ``chance_percentage`` as the
    "chances of favourite food passing", which may suggest the opposite
    direction - confirm the intended meaning before relying on it.
    """
    # Compare for equality. Because the enum mixes in ``str``, the previous
    # ``in FavouriteFoodEnum.ONION`` was a substring test against "onion" and
    # would also match values such as "on" or "".
    if favourite_food == FavouriteFoodEnum.ONION:
        return False

    return random() > (chance_percentage / 100.0)
class Handler(HandlerBase):
    """Class to handle RPC requests."""

    @property
    def lock_attribute(self) -> str:
        """Attribute of RPC request, used for locking.

        If the value matches, no two requests may run simultaneously.
        """
        return "favourite_food"

    def __call__(self, request: RPCRequestExample) -> RPCResponseExample:
        """Handle message.

        Determines toleration from the request's favourite food and chance
        percentage, and returns a successful response carrying the result.
        """
        tolerable = determine_toleration(
            request.favourite_food, request.chance_percentage
        )

        return RPCResponseExample(
            success=True,
            message="Determined toleration",
            data=RPCResponseDataExample(tolerable=tolerable),
        )