Skip to content

Commit d3d1a0b

Browse files
authored
[HZ-5404] Set for Asyncio (#789)
Straightforward port of Set, and its tests to asyncio
1 parent 94e769f commit d3d1a0b

4 files changed

Lines changed: 475 additions & 0 deletions

File tree

hazelcast/internal/asyncio_client.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
ProxyManager,
2929
QUEUE_SERVICE,
3030
REPLICATED_MAP_SERVICE,
31+
SET_SERVICE,
3132
VECTOR_SERVICE,
3233
)
3334
from hazelcast.internal.asyncio_proxy.list import List
3435
from hazelcast.internal.asyncio_proxy.map import Map
3536
from hazelcast.internal.asyncio_proxy.multi_map import MultiMap
3637
from hazelcast.internal.asyncio_proxy.queue import Queue
3738
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
39+
from hazelcast.internal.asyncio_proxy.set import Set
3840
from hazelcast.internal.asyncio_reactor import AsyncioReactor
3941
from hazelcast.serialization import SerializationServiceV1
4042
from hazelcast.internal.asyncio_statistics import Statistics
@@ -298,6 +300,17 @@ async def get_queue(self, name: str) -> Queue[KeyType]:
298300
"""
299301
return await self._proxy_manager.get_or_create(QUEUE_SERVICE, name)
300302

303+
async def get_set(self, name: str) -> Set[KeyType]:
304+
"""Returns the distributed set instance with the specified name.
305+
306+
Args:
307+
name: Name of the distributed set.
308+
309+
Returns:
310+
Distributed set instance with the specified name.
311+
"""
312+
return await self._proxy_manager.get_or_create(SET_SERVICE, name)
313+
301314
async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueType]:
302315
"""Returns the distributed ReplicatedMap instance with the specified
303316
name.

hazelcast/internal/asyncio_proxy/manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from hazelcast.internal.asyncio_proxy.list import create_list_proxy
55
from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy
66
from hazelcast.internal.asyncio_proxy.queue import create_queue_proxy
7+
from hazelcast.internal.asyncio_proxy.set import create_set_proxy
78
from hazelcast.internal.asyncio_proxy.vector_collection import (
89
VectorCollection,
910
create_vector_collection_proxy,
@@ -20,6 +21,7 @@
2021
MULTI_MAP_SERVICE = "hz:impl:multiMapService"
2122
QUEUE_SERVICE = "hz:impl:queueService"
2223
REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService"
24+
SET_SERVICE = "hz:impl:setService"
2325
VECTOR_SERVICE = "hz:service:vector"
2426

2527
_proxy_init: typing.Dict[
@@ -31,6 +33,7 @@
3133
MULTI_MAP_SERVICE: create_multi_map_proxy,
3234
QUEUE_SERVICE: create_queue_proxy,
3335
REPLICATED_MAP_SERVICE: create_replicated_map_proxy,
36+
SET_SERVICE: create_set_proxy,
3437
VECTOR_SERVICE: create_vector_collection_proxy,
3538
}
3639

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
import typing
2+
3+
from hazelcast.protocol.codec import (
4+
set_add_all_codec,
5+
set_add_codec,
6+
set_add_listener_codec,
7+
set_clear_codec,
8+
set_compare_and_remove_all_codec,
9+
set_compare_and_retain_all_codec,
10+
set_contains_all_codec,
11+
set_contains_codec,
12+
set_get_all_codec,
13+
set_is_empty_codec,
14+
set_remove_codec,
15+
set_remove_listener_codec,
16+
set_size_codec,
17+
)
18+
from hazelcast.internal.asyncio_proxy.base import (
19+
PartitionSpecificProxy,
20+
ItemEvent,
21+
ItemEventType,
22+
)
23+
from hazelcast.types import ItemType
24+
from hazelcast.serialization.compact import SchemaNotReplicatedError
25+
from hazelcast.util import check_not_none, deserialize_list_in_place
26+
27+
28+
class Set(PartitionSpecificProxy, typing.Generic[ItemType]):
29+
"""Concurrent, distributed implementation of Set.
30+
31+
Example:
32+
>>> my_set = await client.get_set("my_set")
33+
>>> print("set.add", await my_set.add("item"))
34+
>>> print("set.size", await my_set.size())
35+
36+
Warning:
37+
Asyncio client set proxy is not thread-safe, do not access it from other threads.
38+
"""
39+
40+
async def add(self, item: ItemType) -> bool:
41+
"""Adds the specified item if it is not exists in this set.
42+
43+
Args:
44+
item: The specified item to be added.
45+
46+
Returns:
47+
``True`` if this set is changed after call, ``False`` otherwise.
48+
"""
49+
check_not_none(item, "Value can't be None")
50+
try:
51+
element_data = self._to_data(item)
52+
except SchemaNotReplicatedError as e:
53+
return await self._send_schema_and_retry(e, self.add, item)
54+
55+
request = set_add_codec.encode_request(self.name, element_data)
56+
return await self._invoke(request, set_add_codec.decode_response)
57+
58+
async def add_all(self, items: typing.Sequence[ItemType]) -> bool:
59+
"""Adds the elements in the specified collection if they're not exist
60+
in this set.
61+
62+
Args:
63+
items: Collection which includes the items to be added.
64+
65+
Returns:
66+
``True`` if this set is changed after call, ``False`` otherwise.
67+
"""
68+
check_not_none(items, "Value can't be None")
69+
try:
70+
data_items = []
71+
for item in items:
72+
check_not_none(item, "Value can't be None")
73+
data_items.append(self._to_data(item))
74+
except SchemaNotReplicatedError as e:
75+
return await self._send_schema_and_retry(e, self.add_all, items)
76+
77+
request = set_add_all_codec.encode_request(self.name, data_items)
78+
return await self._invoke(request, set_add_all_codec.decode_response)
79+
80+
async def add_listener(
81+
self,
82+
include_value: bool = False,
83+
item_added_func: typing.Callable[[ItemEvent[ItemType]], None] = None,
84+
item_removed_func: typing.Callable[[ItemEvent[ItemType]], None] = None,
85+
) -> str:
86+
"""Adds an item listener for this container.
87+
88+
Listener will be notified for all container add/remove events.
89+
90+
Args:
91+
include_value: Whether received events include the updated item or
92+
not.
93+
item_added_func: Function to be called when an item is added to
94+
this set.
95+
item_removed_func: Function to be called when an item is deleted
96+
from this set.
97+
98+
Returns:
99+
A registration id which is used as a key to remove the listener.
100+
"""
101+
request = set_add_listener_codec.encode_request(self.name, include_value, self._is_smart)
102+
103+
def handle_event_item(item_data, uuid, event_type):
104+
item = self._to_object(item_data) if include_value else None
105+
member = self._context.cluster_service.get_member(uuid)
106+
107+
item_event = ItemEvent(self.name, item, event_type, member)
108+
if event_type == ItemEventType.ADDED:
109+
if item_added_func:
110+
item_added_func(item_event)
111+
else:
112+
if item_removed_func:
113+
item_removed_func(item_event)
114+
115+
return await self._register_listener(
116+
request,
117+
lambda r: set_add_listener_codec.decode_response(r),
118+
lambda reg_id: set_remove_listener_codec.encode_request(self.name, reg_id),
119+
lambda m: set_add_listener_codec.handle(m, handle_event_item),
120+
)
121+
122+
async def clear(self) -> None:
123+
"""Clears the set. Set will be empty with this call."""
124+
request = set_clear_codec.encode_request(self.name)
125+
return await self._invoke(request)
126+
127+
async def contains(self, item: ItemType) -> bool:
128+
"""Determines whether this set contains the specified item or not.
129+
130+
Args:
131+
item: The specified item to be searched.
132+
133+
Returns:
134+
``True`` if the specified item exists in this set, ``False``
135+
otherwise.
136+
"""
137+
check_not_none(item, "Value can't be None")
138+
try:
139+
item_data = self._to_data(item)
140+
except SchemaNotReplicatedError as e:
141+
return await self._send_schema_and_retry(e, self.contains, item)
142+
143+
request = set_contains_codec.encode_request(self.name, item_data)
144+
return await self._invoke(request, set_contains_codec.decode_response)
145+
146+
async def contains_all(self, items: typing.Sequence[ItemType]) -> bool:
147+
"""Determines whether this set contains all items in the specified
148+
collection or not.
149+
150+
Args:
151+
items: The specified collection which includes the items to be
152+
searched.
153+
154+
Returns:
155+
``True`` if all the items in the specified collection exist in
156+
this set, ``False`` otherwise.
157+
"""
158+
check_not_none(items, "Value can't be None")
159+
try:
160+
data_items = []
161+
for item in items:
162+
check_not_none(item, "Value can't be None")
163+
data_items.append(self._to_data(item))
164+
except SchemaNotReplicatedError as e:
165+
return await self._send_schema_and_retry(e, self.contains_all, items)
166+
167+
request = set_contains_all_codec.encode_request(self.name, data_items)
168+
return await self._invoke(request, set_contains_all_codec.decode_response)
169+
170+
async def get_all(self) -> typing.List[ItemType]:
171+
"""Returns all the items in the set.
172+
173+
Returns:
174+
List of the items in this set.
175+
"""
176+
177+
def handler(message):
178+
data_list = set_get_all_codec.decode_response(message)
179+
return deserialize_list_in_place(data_list, self._to_object)
180+
181+
request = set_get_all_codec.encode_request(self.name)
182+
return await self._invoke(request, handler)
183+
184+
async def is_empty(self) -> bool:
185+
"""Determines whether this set is empty or not.
186+
187+
Returns:
188+
``True`` if this set is empty, ``False`` otherwise.
189+
"""
190+
request = set_is_empty_codec.encode_request(self.name)
191+
return await self._invoke(request, set_is_empty_codec.decode_response)
192+
193+
async def remove(self, item: ItemType) -> bool:
194+
"""Removes the specified element from the set if it exists.
195+
196+
Args:
197+
item: The specified element to be removed.
198+
199+
Returns:
200+
``True`` if the specified element exists in this set, ``False``
201+
otherwise.
202+
"""
203+
check_not_none(item, "Value can't be None")
204+
try:
205+
item_data = self._to_data(item)
206+
except SchemaNotReplicatedError as e:
207+
return await self._send_schema_and_retry(e, self.remove, item)
208+
209+
request = set_remove_codec.encode_request(self.name, item_data)
210+
return await self._invoke(request, set_remove_codec.decode_response)
211+
212+
async def remove_all(self, items: typing.Sequence[ItemType]) -> bool:
213+
"""Removes all of the elements of the specified collection from this
214+
set.
215+
216+
Args:
217+
items: The specified collection.
218+
219+
Returns:
220+
``True`` if the call changed this set, ``False`` otherwise.
221+
"""
222+
check_not_none(items, "Value can't be None")
223+
try:
224+
data_items = []
225+
for item in items:
226+
check_not_none(item, "Value can't be None")
227+
data_items.append(self._to_data(item))
228+
except SchemaNotReplicatedError as e:
229+
return await self._send_schema_and_retry(e, self.remove_all, items)
230+
231+
request = set_compare_and_remove_all_codec.encode_request(self.name, data_items)
232+
return await self._invoke(request, set_compare_and_remove_all_codec.decode_response)
233+
234+
async def remove_listener(self, registration_id: str) -> bool:
235+
"""Removes the specified item listener.
236+
237+
Returns silently if the specified listener was not added before.
238+
239+
Args:
240+
registration_id: Id of the listener to be deleted.
241+
242+
Returns:
243+
``True`` if the item listener is removed, ``False`` otherwise.
244+
"""
245+
return await self._deregister_listener(registration_id)
246+
247+
async def retain_all(self, items: typing.Sequence[ItemType]) -> bool:
248+
"""Removes the items which are not contained in the specified
249+
collection.
250+
251+
In other words, only the items that are contained in the specified
252+
collection will be retained.
253+
254+
Args:
255+
items: Collection which includes the elements to be retained in
256+
this set.
257+
258+
Returns:
259+
``True`` if this set changed as a result of the call, ``False``
260+
otherwise.
261+
"""
262+
check_not_none(items, "Value can't be None")
263+
try:
264+
data_items = []
265+
for item in items:
266+
check_not_none(item, "Value can't be None")
267+
data_items.append(self._to_data(item))
268+
except SchemaNotReplicatedError as e:
269+
return await self._send_schema_and_retry(e, self.retain_all, items)
270+
271+
request = set_compare_and_retain_all_codec.encode_request(self.name, data_items)
272+
return await self._invoke(request, set_compare_and_retain_all_codec.decode_response)
273+
274+
async def size(self) -> int:
275+
"""Returns the number of items in this set.
276+
277+
Returns:
278+
Number of items in this set.
279+
"""
280+
request = set_size_codec.encode_request(self.name)
281+
return await self._invoke(request, set_size_codec.decode_response)
282+
283+
284+
async def create_set_proxy(service_name, name, context):
285+
return Set(service_name, name, context)

0 commit comments

Comments
 (0)