Skip to content

Commit 20e07d5

Browse files
committed
Make a result type
1 parent 754455c commit 20e07d5

9 files changed

Lines changed: 63 additions & 72 deletions

File tree

qio/consumer.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
from typing import Any
66

77
from .invocation import Invocation
8+
from .invocation import InvocationCompleted
89
from .invocation import InvocationContinued
9-
from .invocation import InvocationErrored
1010
from .invocation import InvocationResumed
1111
from .invocation import InvocationStarted
12-
from .invocation import InvocationSucceeded
1312
from .invocation import InvocationSuspended
1413
from .invocation import InvocationThrew
1514
from .invocation import LocalInvocationContinued
1615
from .invocation import LocalInvocationSuspended
1716
from .invocation import LocalInvocationThrew
1817
from .message import Message
1918
from .receiver import Receiver
19+
from .result import Err
20+
from .result import Ok
2021
from .stream import Stream
2122
from .suspension import Suspension
2223

@@ -87,10 +88,12 @@ def resume(self, invocation: Invocation):
8788

8889
def succeed(self, invocation: Invocation, value: Any):
8990
"""Signal that the invocation has succeeded."""
90-
self.__stream.publish(InvocationSucceeded(id=invocation.id, value=value))
91+
self.__stream.publish(InvocationCompleted(id=invocation.id, result=Ok(value)))
9192
self.__receiver.finish(self.__invocations.pop(invocation))
9293

9394
def error(self, invocation: Invocation, exception: Exception):
9495
"""Signal that the invocation has errored."""
95-
self.__stream.publish(InvocationErrored(id=invocation.id, exception=exception))
96+
self.__stream.publish(
97+
InvocationCompleted(id=invocation.id, result=Err(exception))
98+
)
9699
self.__receiver.finish(self.__invocations.pop(invocation))

qio/continuation.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,21 @@
66

77
from .id import random_id
88
from .invocation import Invocation
9+
from .result import Err
10+
from .result import Ok
11+
from .result import Result
912

1013

1114
@dataclass(eq=False, kw_only=True)
1215
class Continuation[T: Callable[..., Any] = Callable[..., Any]]:
1316
id: str = field(default_factory=random_id)
1417
invocation: Invocation
1518
generator: Generator[Invocation, Any, Any]
16-
17-
18-
@dataclass(eq=False, kw_only=True)
19-
class SendContinuation(Continuation):
20-
value: Any
21-
22-
def send(self) -> Any:
23-
return self.generator.send(self.value)
24-
25-
26-
@dataclass(eq=False, kw_only=True)
27-
class ThrowContinuation(Continuation):
28-
exception: Exception
29-
30-
def throw(self) -> Any:
31-
return self.generator.throw(self.exception)
19+
result: Result[Any, BaseException]
20+
21+
def resume(self) -> Any:
22+
match self.result:
23+
case Ok(value):
24+
return self.generator.send(value)
25+
case Err(exception):
26+
return self.generator.throw(exception)

qio/invocation.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
from .id import random_id
1414
from .suspension import Suspension
1515
from .suspension import SuspensionCompleted
16-
from .suspension import SuspensionErrored
1716
from .suspension import SuspensionSubmitted
18-
from .suspension import SuspensionSucceeded
1917

2018

2119
@dataclass(eq=False, kw_only=True)
@@ -135,11 +133,3 @@ class InvocationResumed(Event): ...
135133

136134
@dataclass(eq=False, kw_only=True)
137135
class InvocationCompleted(SuspensionCompleted): ...
138-
139-
140-
@dataclass(eq=False, kw_only=True)
141-
class InvocationSucceeded(SuspensionSucceeded): ...
142-
143-
144-
@dataclass(eq=False, kw_only=True)
145-
class InvocationErrored(SuspensionErrored): ...

qio/monitor.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@
44
from textual.widgets import Footer
55
from textual.widgets import Header
66

7+
from .invocation import InvocationCompleted
78
from .invocation import InvocationContinued
8-
from .invocation import InvocationErrored
99
from .invocation import InvocationResumed
1010
from .invocation import InvocationStarted
1111
from .invocation import InvocationSubmitted
12-
from .invocation import InvocationSucceeded
1312
from .invocation import InvocationSuspended
1413
from .invocation import InvocationThrew
1514
from .qio import Qio
1615
from .queue import ShutDown
16+
from .result import Err
17+
from .result import Ok
1718
from .thread import Thread
1819

1920

@@ -34,8 +35,7 @@ def __init__(self):
3435
InvocationContinued,
3536
InvocationThrew,
3637
InvocationResumed,
37-
InvocationSucceeded,
38-
InvocationErrored,
38+
InvocationCompleted,
3939
}
4040
)
4141

@@ -56,8 +56,7 @@ def handle_invocation_event(
5656
| InvocationContinued
5757
| InvocationThrew
5858
| InvocationResumed
59-
| InvocationSucceeded
60-
| InvocationErrored,
59+
| InvocationCompleted,
6160
):
6261
table = self.query_one(DataTable)
6362
match event:
@@ -99,13 +98,13 @@ def handle_invocation_event(
9998
self.__column_keys[2],
10099
"Resumed",
101100
)
102-
case InvocationSucceeded():
101+
case InvocationCompleted(result=Ok()):
103102
table.update_cell(
104103
event.id,
105104
self.__column_keys[2],
106105
"Succeeded",
107106
)
108-
case InvocationErrored():
107+
case InvocationCompleted(result=Err()):
109108
table.update_cell(
110109
event.id,
111110
self.__column_keys[2],

qio/qio.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
from .broker import Broker
1111
from .consumer import Consumer
1212
from .invocation import Invocation
13-
from .invocation import InvocationErrored
13+
from .invocation import InvocationCompleted
1414
from .invocation import InvocationSubmitted
15-
from .invocation import InvocationSucceeded
1615
from .invocation import deserialize
1716
from .invocation import serialize
1817
from .journal import Journal
@@ -21,6 +20,8 @@
2120
from .queue import ShutDown
2221
from .queuespec import QueueSpec
2322
from .registry import ROUTINE_REGISTRY
23+
from .result import Err
24+
from .result import Ok
2425
from .routine import Routine
2526
from .stream import Stream
2627
from .thread import Thread
@@ -119,7 +120,7 @@ def unsubscribe(self, queue: Queue):
119120
@contextmanager
120121
def invocation_handler(self) -> Generator[Future]:
121122
waiting: dict[str, Future] = {}
122-
events = self.subscribe({InvocationSucceeded, InvocationErrored})
123+
events = self.subscribe({InvocationCompleted})
123124

124125
def resolver():
125126
while True:
@@ -129,11 +130,11 @@ def resolver():
129130
break
130131

131132
match event:
132-
case InvocationSucceeded(id=invocation_id, value=value):
133+
case InvocationCompleted(id=invocation_id, result=Ok(value)):
133134
if invocation_id in waiting:
134135
future = waiting.pop(invocation_id)
135136
future.set_result(value)
136-
case InvocationErrored(id=invocation_id, exception=exception):
137+
case InvocationCompleted(id=invocation_id, result=Err(exception)):
137138
if invocation_id in waiting:
138139
future = waiting.pop(invocation_id)
139140
future.set_exception(exception)

qio/result.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
5+
class Ok[T]:
6+
value: T
7+
8+
9+
@dataclass
10+
class Err[E: BaseException]:
11+
error: E
12+
13+
14+
type Result[T, E: BaseException] = Ok[T] | Err[E]

qio/sample_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pytest
55

66
from qio import Qio
7-
from qio.invocation import InvocationSucceeded
7+
from qio.invocation import InvocationCompleted
88
from qio.sample import irregular
99

1010

@@ -15,7 +15,7 @@ def test_integration():
1515

1616
try:
1717
qio.purge(queue="qio")
18-
events = qio.subscribe({InvocationSucceeded})
18+
events = qio.subscribe({InvocationCompleted})
1919
invocation = irregular()
2020
qio.submit(invocation)
2121

qio/suspension.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from .event import Event
1111
from .id import random_id
12+
from .result import Result
1213

1314

1415
@dataclass(eq=False, kw_only=True)
@@ -30,14 +31,5 @@ class SuspensionSubmitted(Event): ...
3031

3132

3233
@dataclass(eq=False, kw_only=True)
33-
class SuspensionCompleted(Event): ...
34-
35-
36-
@dataclass(eq=False, kw_only=True)
37-
class SuspensionSucceeded(SuspensionCompleted):
38-
value: Any = field(repr=False)
39-
40-
41-
@dataclass(eq=False, kw_only=True)
42-
class SuspensionErrored(SuspensionCompleted):
43-
exception: Exception = field(repr=False)
34+
class SuspensionCompleted(Event):
35+
result: Result[Any, BaseException] = field(repr=False)

qio/worker.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@
66
from threading import Timer
77

88
from .continuation import Continuation
9-
from .continuation import SendContinuation
10-
from .continuation import ThrowContinuation
119
from .invocation import Invocation
1210
from .invocation import LocalInvocationSuspended
1311
from .qio import Qio
1412
from .queue import Queue
1513
from .queue import ShutDown
1614
from .queuespec import QueueSpec
15+
from .result import Err
16+
from .result import Ok
1717
from .thread import Thread
1818

1919

2020
class Worker:
2121
def __init__(self, qio: Qio, queuespec: QueueSpec):
2222
self.__qio = qio
2323

24-
self.__tasks = Queue[Invocation | SendContinuation | ThrowContinuation]()
24+
self.__tasks = Queue[Invocation | Continuation]()
2525
self.__consumer = self.__qio.consume(queuespec)
2626
self.__continuer_events = self.__qio.subscribe({LocalInvocationSuspended})
2727

@@ -102,6 +102,7 @@ def listener():
102102
waiting[event.suspension.start()] = Continuation(
103103
invocation=event.invocation,
104104
generator=event.generator,
105+
result=Ok(None),
105106
)
106107
# Replace ``new`` before setting the result
107108
# to avoid short busy wait loops.
@@ -137,10 +138,10 @@ def listener():
137138
)
138139
with suppress(ShutDown):
139140
self.__tasks.put(
140-
ThrowContinuation(
141+
Continuation(
141142
invocation=continuation.invocation,
142143
generator=continuation.generator,
143-
exception=exception,
144+
result=Err(exception),
144145
)
145146
)
146147
else:
@@ -149,10 +150,10 @@ def listener():
149150
)
150151
with suppress(ShutDown):
151152
self.__tasks.put(
152-
SendContinuation(
153+
Continuation(
153154
invocation=continuation.invocation,
154155
generator=continuation.generator,
155-
value=value,
156+
result=Ok(value),
156157
),
157158
)
158159

@@ -180,7 +181,7 @@ def __runner(self):
180181
case Invocation() as invocation:
181182
self.__consumer.start(invocation)
182183
self.__run_invocation(invocation)
183-
case SendContinuation() | ThrowContinuation() as continuation:
184+
case Continuation() as continuation:
184185
self.__consumer.resume(continuation.invocation)
185186
self.__run_continuation(continuation)
186187

@@ -195,22 +196,18 @@ def __run_invocation(self, invocation: Invocation):
195196
if isinstance(result, Awaitable):
196197
generator = result.__await__()
197198
self.__run_continuation(
198-
SendContinuation(
199+
Continuation(
199200
invocation=invocation,
200201
generator=generator,
201-
value=None,
202+
result=Ok(None),
202203
)
203204
)
204205
else:
205206
self.__consumer.succeed(invocation, result)
206207

207-
def __run_continuation(self, continuation: SendContinuation | ThrowContinuation):
208+
def __run_continuation(self, continuation: Continuation):
208209
"""Process a continuation task."""
209-
match continuation:
210-
case SendContinuation():
211-
method = continuation.send
212-
case ThrowContinuation():
213-
method = continuation.throw
210+
method = continuation.resume
214211

215212
try:
216213
suspension = method()

0 commit comments

Comments
 (0)