Skip to content

Commit 19c9f14

Browse files
author
unknown
committed
separate sub importer, notify partial finished per subscription
1 parent f5683d2 commit 19c9f14

2 files changed

Lines changed: 77 additions & 26 deletions

File tree

src/galaxy/api/plugin.py

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
)
1616
from galaxy.task_manager import TaskManager
1717

18-
1918
logger = logging.getLogger(__name__)
2019

2120

@@ -34,16 +33,15 @@ def dict_factory(elements):
3433

3534
class Importer:
3635
def __init__(
37-
self,
38-
task_manger,
39-
name,
40-
get,
41-
prepare_context,
42-
notification_success,
43-
notification_failure,
44-
notification_finished,
45-
complete,
46-
is_generator=False
36+
self,
37+
task_manger,
38+
name,
39+
get,
40+
prepare_context,
41+
notification_success,
42+
notification_failure,
43+
notification_finished,
44+
complete,
4745
):
4846
self._task_manager = task_manger
4947
self._name = name
@@ -55,16 +53,11 @@ def __init__(
5553
self._complete = complete
5654

5755
self._import_in_progress = False
58-
self._is_generator = is_generator
5956

6057
async def _import_element(self, id_, context_):
6158
try:
62-
if self._is_generator:
63-
async for element in self._get(id_, context_):
64-
self._notification_success(id_, element)
65-
else:
66-
element = await self._get(id_, context_)
67-
self._notification_success(id_, element)
59+
element = await self._get(id_, context_)
60+
self._notification_success(id_, element)
6861
except ApplicationError as error:
6962
self._notification_failure(id_, error)
7063
except asyncio.CancelledError:
@@ -101,6 +94,44 @@ async def start(self, ids):
10194
raise
10295

10396

97+
class SubscriptionGamesImporter(Importer):
98+
def __init__(
99+
self,
100+
task_manger,
101+
name,
102+
get,
103+
prepare_context,
104+
notification_success,
105+
notification_failure,
106+
notification_finished,
107+
notification_partial_finished,
108+
complete
109+
):
110+
super(SubscriptionGamesImporter, self).__init__(task_manger,
111+
name,
112+
get,
113+
prepare_context,
114+
notification_success,
115+
notification_failure,
116+
notification_finished,
117+
complete)
118+
self._notification_partial_finished = notification_partial_finished
119+
120+
async def _import_element(self, id_, context_):
121+
try:
122+
async for element in self._get(id_, context_):
123+
self._notification_success(id_, element)
124+
except ApplicationError as error:
125+
self._notification_failure(id_, error)
126+
except asyncio.CancelledError:
127+
pass
128+
except Exception:
129+
logger.exception("Unexpected exception raised in %s importer", self._name)
130+
self._notification_failure(id_, UnknownError())
131+
finally:
132+
self._notification_partial_finished()
133+
134+
104135
class Plugin:
105136
"""Use and override methods of this class to create a new platform integration."""
106137

@@ -183,16 +214,16 @@ def __init__(self, platform, version, reader, writer, handshake_token):
183214
self._local_size_import_finished,
184215
self.local_size_import_complete
185216
)
186-
self._subscription_games_importer = Importer(
217+
self._subscription_games_importer = SubscriptionGamesImporter(
187218
self._external_task_manager,
188219
"subscription games",
189220
self.get_subscription_games,
190221
self.prepare_subscription_games_context,
191222
self._subscription_games_import_success,
192223
self._subscription_games_import_failure,
193224
self._subscription_games_import_finished,
194-
self.subscription_games_import_complete,
195-
is_generator=True
225+
self._subscriptions_games_partial_import_finished,
226+
self.subscription_games_import_complete
196227
)
197228

198229
# internal
@@ -297,7 +328,8 @@ def _detect_feature(self, feature: Feature, methods: List[str]):
297328
if self._implements(methods):
298329
self._features.add(feature)
299330

300-
def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, sensitive_params=False):
331+
def _register_method(self, name, handler, result_name=None, internal=False, immediate=False,
332+
sensitive_params=False):
301333
def wrap_result(result):
302334
if result_name:
303335
result = {
@@ -671,7 +703,8 @@ def _local_size_import_failure(self, game_id: str, error: ApplicationError) -> N
671703
def _local_size_import_finished(self) -> None:
672704
self._connection.send_notification("local_size_import_finished", None)
673705

674-
def _subscription_games_import_success(self, subscription_name: str, subscription_games: Optional[List[SubscriptionGame]]) -> None:
706+
def _subscription_games_import_success(self, subscription_name: str,
707+
subscription_games: Optional[List[SubscriptionGame]]) -> None:
675708
self._connection.send_notification(
676709
"subscription_games_import_success",
677710
{
@@ -689,6 +722,9 @@ def _subscription_games_import_failure(self, subscription_name: str, error: Appl
689722
}
690723
)
691724

725+
def _subscriptions_games_partial_import_finished(self) -> None:
726+
self._connection.send_notification("subscription_games_partial_import_finished", None)
727+
692728
def _subscription_games_import_finished(self) -> None:
693729
self._connection.send_notification("subscription_games_import_finished", None)
694730

@@ -773,7 +809,7 @@ async def authenticate(self, stored_credentials=None):
773809
raise NotImplementedError()
774810

775811
async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \
776-
-> Union[NextStep, Authentication]:
812+
-> Union[NextStep, Authentication]:
777813
"""This method is called if we return :class:`~galaxy.api.types.NextStep` from :meth:`.authenticate`
778814
or :meth:`.pass_login_credentials`.
779815
This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on.
@@ -1117,7 +1153,8 @@ async def prepare_subscription_games_context(self, subscription_names: List[str]
11171153
"""
11181154
return None
11191155

1120-
async def get_subscription_games(self, subscription_name: str, context: Any) -> AsyncGenerator[List[SubscriptionGame], None]:
1156+
async def get_subscription_games(self, subscription_name: str, context: Any) -> AsyncGenerator[
1157+
List[SubscriptionGame], None]:
11211158
"""Override this method to provide SubscriptionGames for a given subscription.
11221159
This method should `yield` a list of SubscriptionGames -> yield [sub_games]
11231160
@@ -1193,7 +1230,6 @@ async def coroutine():
11931230
writer.close()
11941231
await writer.wait_closed()
11951232

1196-
11971233
try:
11981234
if sys.platform == "win32":
11991235
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

tests/test_subscriptions.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ async def sub_games():
140140
]
141141
}
142142
},
143+
{
144+
'jsonrpc': '2.0',
145+
'method':
146+
'subscription_games_partial_import_finished', 'params': None
147+
},
143148
{
144149
"jsonrpc": "2.0",
145150
"method": "subscription_games_import_finished",
@@ -183,6 +188,11 @@ async def sub_games():
183188
"subscription_games": None
184189
}
185190
},
191+
{
192+
'jsonrpc': '2.0',
193+
'method':
194+
'subscription_games_partial_import_finished', 'params': None
195+
},
186196
{
187197
"jsonrpc": "2.0",
188198
"method": "subscription_games_import_finished",
@@ -229,6 +239,11 @@ async def test_get_subscription_games_error(exception, code, message, plugin, re
229239
}
230240
}
231241
},
242+
{
243+
'jsonrpc': '2.0',
244+
'method':
245+
'subscription_games_partial_import_finished', 'params': None
246+
},
232247
{
233248
"jsonrpc": "2.0",
234249
"method": "subscription_games_import_finished",

0 commit comments

Comments
 (0)