Skip to content

Commit 536eb98

Browse files
committed
refactor: update TaskManager to implement CommandExecutionContext
1 parent ae772ea commit 536eb98

1 file changed

Lines changed: 25 additions & 12 deletions

File tree

openwpm/task_manager.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
from .browser_manager import BrowserManagerHandle
2222
from .command_sequence import CommandSequence
2323
from .errors import CommandExecutionError
24+
from .failure_tracker import FailureTracker
2425
from .js_instrumentation import clean_js_instrumentation_settings
2526
from .mp_logger import MPLogger
2627
from .storage.storage_controller import DataSocket, StorageControllerHandle
2728
from .storage.storage_providers import (
2829
StructuredStorageProvider,
30+
TableName,
2931
UnstructuredStorageProvider,
3032
)
33+
from .types import VisitId
3134
from .utilities.multiprocess_utils import kill_process_and_children
3235
from .utilities.platform_utils import get_configuration_string, get_version
3336
from .utilities.storage_watchdog import StorageLogger
@@ -101,11 +104,7 @@ def __init__(
101104

102105
# Flow control
103106
self.closing = False
104-
self.failure_status: Optional[Dict[str, Any]] = None
105-
self.threadlock = threading.Lock()
106-
self.failure_count = 0
107-
108-
self.failure_limit = manager_params.failure_limit
107+
self.failure_tracker = FailureTracker(manager_params.failure_limit)
109108
# Start logging server thread
110109
self.logging_server = MPLogger(
111110
self.manager_params.log_path,
@@ -331,6 +330,18 @@ def _shutdown_manager(
331330
if hasattr(self, "callback_thread"):
332331
self.callback_thread.join()
333332

333+
# CommandExecutionContext protocol implementation
334+
335+
def store_record(
336+
self, table: TableName, visit_id: VisitId, data: Dict[str, Any]
337+
) -> None:
338+
"""Send a record to the StorageController via DataSocket."""
339+
self.sock.store_record(table, visit_id, data)
340+
341+
def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None:
342+
"""Signal that all data for a visit_id has been sent."""
343+
self.sock.finalize_visit_id(visit_id, success)
344+
334345
def _check_failure_status(self) -> None:
335346
"""Check the status of command failures. Raise exceptions as necessary
336347
@@ -340,25 +351,27 @@ def _check_failure_status(self) -> None:
340351
appropriate steps are taken to gracefully close the infrastructure
341352
"""
342353
self.logger.debug("Checking command failure status indicator...")
343-
if not self.failure_status:
354+
if not self.failure_tracker.has_critical_failure:
344355
return
345356

357+
failure_status = self.failure_tracker.critical_failure
358+
assert failure_status is not None
346359
self.logger.debug("TaskManager failure status set, halting command execution.")
347360
self._shutdown_manager()
348-
if self.failure_status["ErrorType"] == "ExceedCommandFailureLimit":
361+
if failure_status["ErrorType"] == "ExceedCommandFailureLimit":
349362
raise CommandExecutionError(
350363
"TaskManager exceeded maximum consecutive command "
351364
"execution failures.",
352-
self.failure_status["CommandSequence"],
365+
failure_status["CommandSequence"],
353366
)
354-
elif self.failure_status["ErrorType"] == "ExceedLaunchFailureLimit":
367+
elif failure_status["ErrorType"] == "ExceedLaunchFailureLimit":
355368
raise CommandExecutionError(
356369
"TaskManager failed to launch browser within allowable "
357370
"failure limit.",
358-
self.failure_status["CommandSequence"],
371+
failure_status["CommandSequence"],
359372
)
360-
if self.failure_status["ErrorType"] == "CriticalChildException":
361-
_, exc, tb = pickle.loads(self.failure_status["Exception"])
373+
if failure_status["ErrorType"] == "CriticalChildException":
374+
_, exc, tb = pickle.loads(failure_status["Exception"])
362375
raise exc.with_traceback(tb)
363376

364377
# CRAWLER COMMAND CODE

0 commit comments

Comments
 (0)