From 3a0aa9d04feed96e8e5414d7f012299750d11a70 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 09:26:49 -0600 Subject: [PATCH 1/9] fix: Misc cleanup, override db vars from env Some long overdue cleanup: - Allow env vars to override db settings to reduce likelihood of sensitive values getting committeed to source control - Make abstract classes ABC - Initialize course config mutables in init - Fix test warning about POST being async mocked --- default_config.yaml | 8 ++++++- xapi_db_load/backends/base_async_backend.py | 23 ++++++-------------- xapi_db_load/course_configs.py | 24 +++++++++++---------- xapi_db_load/main.py | 19 +++++++++++++++- xapi_db_load/tests/test_backends.py | 3 +-- xapi_db_load/xapi/xapi_common.py | 22 +++++++++++-------- 6 files changed, 59 insertions(+), 40 deletions(-) diff --git a/default_config.yaml b/default_config.yaml index ac3ec72..6988c95 100644 --- a/default_config.yaml +++ b/default_config.yaml @@ -1,4 +1,10 @@ -# This default configuration should generally work as long as the logs director is writable. +# This default configuration should generally work as long as the logs directory is writable. +# +# SECRETS: Do not commit credentials to source control. Sensitive config values can be +# overridden via environment variables (env vars take precedence over values in this file): +# XAPI_DB_LOAD_CLICKHOUSE_PASSWORD -> db_password +# XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY -> s3_secret +# XAPI_DB_LOAD_RALPH_PASSWORD -> ralph_password # CSV backend configuration # ######################### diff --git a/xapi_db_load/backends/base_async_backend.py b/xapi_db_load/backends/base_async_backend.py index 1b35412..d75bd29 100644 --- a/xapi_db_load/backends/base_async_backend.py +++ b/xapi_db_load/backends/base_async_backend.py @@ -3,6 +3,7 @@ """ import asyncio +from abc import abstractmethod from logging import Logger from typing import Dict, List @@ -33,11 +34,9 @@ def __init__( def __repr__(self) -> str: return f"{self.__class__.__name__}: {self.config.get('db_host', 'No ClickHouse configured')}" + @abstractmethod def get_test_data_tasks(self) -> List[Waiter]: - """ - Return the tasks to be run to generate test data. - """ - raise NotImplementedError("get_test_data_tasks not implemented") + """Return the tasks to be run to generate test data.""" def get_backend_summary(self) -> Dict: """ @@ -162,23 +161,15 @@ def __init__(self, config: Dict, logger: Logger, event_generator: EventGenerator # queue until there is more room. self.queue = asyncio.Queue(maxsize=20) + @abstractmethod async def _populate_queue(self): - """ - Subclasses override this to enque work for their particular data task. - """ - raise NotImplementedError( - f"_get_batch_data not implemented for {self.task_name}" - ) + """Subclasses override this to enqueue work for their particular data task.""" + @abstractmethod async def _process_queue_item( self, worker_id: int, batch_id: int, batch: List | int ): - """ - Subclasses override this to create workers that process their data for a batch. - """ - raise NotImplementedError( - f"_process_queue_item not implemented for {self.task_name}" - ) + """Subclasses override this to process a single batch item from the queue.""" async def _batch_worker(self, worker_id): """ diff --git a/xapi_db_load/course_configs.py b/xapi_db_load/course_configs.py index ff81528..bb29a65 100644 --- a/xapi_db_load/course_configs.py +++ b/xapi_db_load/course_configs.py @@ -64,17 +64,19 @@ class RandomCourse: Holds "known objects" and configuration values for a fake course. """ - items_in_course = 0 - chapter_ids: List[str] = [] - sequential_ids: List[str] = [] - vertical_ids: List[str] = [] - problem_ids: List[str] = [] - video_ids: List[str] = [] - forum_post_ids: List[str] = [] - actors: list[EnrolledActor] = [] - all_tags: list = [] - start_date: datetime.datetime | None = None - end_date: datetime.datetime | None = None + def __init__(self): + """Initialize per-instance mutable state.""" + self.items_in_course: int = 0 + self.chapter_ids: List[str] = [] + self.sequential_ids: List[str] = [] + self.vertical_ids: List[str] = [] + self.problem_ids: List[str] = [] + self.video_ids: List[str] = [] + self.forum_post_ids: List[str] = [] + self.actors: list[EnrolledActor] = [] + self.all_tags: list = [] + self.start_date: datetime.datetime | None = None + self.end_date: datetime.datetime | None = None async def populate( self, diff --git a/xapi_db_load/main.py b/xapi_db_load/main.py index da7a1b8..56fe8a9 100644 --- a/xapi_db_load/main.py +++ b/xapi_db_load/main.py @@ -13,16 +13,33 @@ from xapi_db_load.ui.text_ui import TextUI +_ENV_VAR_OVERRIDES = { + "XAPI_DB_LOAD_CLICKHOUSE_PASSWORD": "db_password", + "XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY": "s3_secret", + "XAPI_DB_LOAD_RALPH_PASSWORD": "ralph_password", +} + + def get_config(config_file: str) -> dict: """ - Wrap around config loading. + Load YAML config and apply environment variable overrides for secrets. We override this in tests so that we can use temp dirs for logs etc. + Environment variables take precedence over values in the config file: + XAPI_DB_LOAD_CLICKHOUSE_PASSWORD -> db_password + XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY -> s3_secret + XAPI_DB_LOAD_RALPH_PASSWORD -> ralph_password """ with open(config_file, "r") as y: conf = yaml.safe_load(y) conf["config_file"] = config_file + + for env_var, config_key in _ENV_VAR_OVERRIDES.items(): + value = os.environ.get(env_var) + if value is not None: + conf[config_key] = value + return conf diff --git a/xapi_db_load/tests/test_backends.py b/xapi_db_load/tests/test_backends.py index abbee14..9911b2b 100644 --- a/xapi_db_load/tests/test_backends.py +++ b/xapi_db_load/tests/test_backends.py @@ -182,7 +182,7 @@ def test_vector_backend(mock_get_logger, _, tmp_path): assert "Run duration was" in result.output -@patch("xapi_db_load.backends.ralph.requests", new_callable=AsyncMock) +@patch("xapi_db_load.backends.ralph.requests", new_callable=MagicMock) @patch( "xapi_db_load.backends.base_async_backend.clickhouse_connect", new_callable=AsyncMock, @@ -192,7 +192,6 @@ def test_ralph_backend(mock_requests, _, tmp_path): Run a test through the Ralph backend, currently this just checks that the output indicates success. """ - mock_requests.post = MagicMock() test_path = "xapi_db_load/tests/fixtures/small_ralph_config.yaml" runner = CliRunner() diff --git a/xapi_db_load/xapi/xapi_common.py b/xapi_db_load/xapi/xapi_common.py index 2208d7e..b926ef9 100644 --- a/xapi_db_load/xapi/xapi_common.py +++ b/xapi_db_load/xapi/xapi_common.py @@ -2,20 +2,24 @@ Base class for all fake xAPI events. """ +from abc import ABC, abstractmethod -class XAPIBase: + +class XAPIBase(ABC): """ - Base class to handle some common functionality. + Abstract base class for all fake xAPI event types. - Should be turned into a proper ABC when we have a chance. + Subclasses must declare class-level ``verb`` and ``verb_display`` strings + and implement :meth:`get_data`. """ - verb = None - verb_display = None + verb: str + verb_display: str def __init__(self, load_generator): - if not self.verb: - raise NotImplementedError( - f"XAPIBase is abstract, add your verb in subclass {type(self)}." - ) + """Initialize with the parent EventGenerator instance.""" self.parent_load_generator = load_generator + + @abstractmethod + def get_data(self) -> dict: + """Return a dict with event_id, verb, actor_id, emission_time, and event JSON.""" From 9ddff70a63b491940c273ea48ca512e446d345ae Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 09:45:46 -0600 Subject: [PATCH 2/9] fix: Use native ch paramaterized insert Previously we string concatenated inserts, this is probably faster but definitely tidier. --- xapi_db_load/backends/clickhouse.py | 279 +++++++++++++--------------- 1 file changed, 131 insertions(+), 148 deletions(-) diff --git a/xapi_db_load/backends/clickhouse.py b/xapi_db_load/backends/clickhouse.py index 8d2119f..75deb6e 100644 --- a/xapi_db_load/backends/clickhouse.py +++ b/xapi_db_load/backends/clickhouse.py @@ -47,35 +47,32 @@ class XAPILakeClickhouseAsync(QueueBackend): Abstract implementation for ClickHouse async. """ - async def _insert_list_sql_retry( - self, data_list: List, table: str, database: str = "event_sink" + async def _insert_rows( + self, + table: str, + rows: List[tuple], + column_names: List[str] | str = "*", + database: str = "event_sink", ): - """ - Wrap up inserts that join values lists. - """ - sql = f""" - INSERT INTO {database}.{table} - VALUES {",".join(data_list)} - """ - - await self._insert_sql_with_retry(sql) - - async def _insert_sql_with_retry(self, sql: str): - """ - Wrap insert commands with a single retry. - """ + """Insert rows via clickhouse-connect's parameterized insert.""" try: if not self.client: await self.set_client() assert self.client - await self.client.command(sql) - except (OperationalError, DatabaseError) as e: + await self.client.insert( + table, rows, column_names=column_names, database=database + ) + except OperationalError as e: self.logger.error(e) - self.logger.error(sql) - self.logger.error("ClickHouse Error, retrying once.") + self.logger.error("ClickHouse OperationalError, retrying once.") await self.set_client() assert self.client - await self.client.command(sql) + await self.client.insert( + table, rows, column_names=column_names, database=database + ) + except DatabaseError as e: + self.logger.error("ClickHouse DatabaseError: %s", e) + raise class InsertXAPIEvents(XAPILakeClickhouseAsync): @@ -125,28 +122,24 @@ async def _process_queue_item(self, worker_id: int, batch_id: int, batch: Any): self.logger.debug(f" {self.task_name} worker batch {batch} inserted") self.update_completed_task_count(increment_by=1) - async def _do_insert(self, out_data: List): + async def _do_insert(self, out_data: List[tuple]): """ Performs the actual insert of a batch of events to ClickHouse. """ - vals = ",".join(out_data) - sql = f""" - INSERT INTO xapi.xapi_events_all ( - event_id, - emission_time, - event - ) - VALUES {vals} - """ - await self._insert_sql_with_retry(sql) + await self._insert_rows( + "xapi_events_all", + out_data, + column_names=["event_id", "emission_time", "event"], + database="xapi", + ) - def _format_row(self, row): + def _format_row(self, row: dict) -> tuple: """ Format a row of data for ClickHouse insert. - This is broken out so it can be overridden in the Ralph bakend. + This is broken out so it can be overridden in the Ralph backend. """ - return f"('{row['event_id']}', '{row['emission_time']}', '{row['event']}')" + return (row["event_id"], row["emission_time"], row["event"]) class InsertInitialEnrollments(InsertXAPIEvents): @@ -247,25 +240,25 @@ async def _run_task(self): c = course.serialize_course_data_for_event_sink() dump_id = str(uuid.uuid4()) dump_time = datetime.now(UTC) + out_data.append( + ( + c["org"], + c["course_key"], + c["display_name"], + c["course_start"], + c["course_end"], + c["enrollment_start"], + c["enrollment_end"], + c["self_paced"], + c["course_data_json"], + c["created"], + c["modified"], + dump_id, + dump_time, + ) + ) - out = f"""( - '{c["org"]}', - '{c["course_key"]}', - '{c["display_name"]}', - '{c["course_start"]}', - '{c["course_end"]}', - '{c["enrollment_start"]}', - '{c["enrollment_end"]}', - '{c["self_paced"]}', - '{c["course_data_json"]}', - '{c["created"]}', - '{c["modified"]}', - '{dump_id}', - '{dump_time}' - )""" - out_data.append(out) - - await self._insert_list_sql_retry(out_data, "course_overviews") + await self._insert_rows("course_overviews", out_data) self.update_completed_task_count(increment_by=1) @@ -291,27 +284,24 @@ async def _run_task(self): dump_id = str(uuid.uuid4()) dump_time = datetime.now(UTC) for b in blocks: - try: - out = f"""( - '{b["org"]}', - '{b["course_key"]}', - '{b["location"]}', - '{b["display_name"]}', - '{b["xblock_data_json"]}', - '{b["order"]}', - '{b["edited_on"]}', - '{dump_id}', - '{dump_time}' - )""" - out_data.append(out) - except Exception: - self.logger.info(b) - raise + out_data.append( + ( + b["org"], + b["course_key"], + b["location"], + b["display_name"], + b["xblock_data_json"], + b["order"], + b["edited_on"], + dump_id, + dump_time, + ) + ) self.logger.debug( f" {self.task_name} starting insert for course {course_count}" ) - await self._insert_list_sql_retry(out_data, "course_blocks") + await self._insert_rows("course_blocks", out_data) self.update_completed_task_count(increment_by=1) @@ -338,22 +328,21 @@ async def _run_task(self): row_id = 0 for obj_tag in object_tags: row_id += 1 + obj_tag_out_data.append( + ( + row_id, + obj_tag["object_id"], + obj_tag["taxonomy_id"], + obj_tag["tag_id"], + obj_tag["value"], + "fake export id", + obj_tag["hierarchy"], + dump_id, + dump_time, + ) + ) - out_tag = f"""( - {row_id}, - '{obj_tag["object_id"]}', - {obj_tag["taxonomy_id"]}, - {obj_tag["tag_id"]}, - '{obj_tag["value"]}', - 'fake export id', - '{obj_tag["hierarchy"]}', - '{dump_id}', - '{dump_time}' - )""" - - obj_tag_out_data.append(out_tag) - - await self._insert_list_sql_retry(obj_tag_out_data, "object_tag") + await self._insert_rows("object_tag", obj_tag_out_data) self.update_completed_task_count(increment_by=1) @@ -374,17 +363,10 @@ async def _run_task(self): id = 0 for taxonomy in taxonomies.keys(): id += 1 - out = f"""( - {id}, - '{taxonomy}', - '{dump_id}', - '{dump_time}' - ) - """ - out_data.append(out) + out_data.append((id, taxonomy, dump_id, dump_time)) self.update_completed_task_count(increment_by=1) - await self._insert_list_sql_retry(out_data, "taxonomy") + await self._insert_rows("taxonomy", out_data) class InsertTags(XAPILakeClickhouseAsync): @@ -404,20 +386,20 @@ async def _run_task(self): tag_out_data = [] for tag in tags: - out_tag = f"""( - {tag["tag_id"]}, - {tag["taxonomy_id"]}, - {tag["parent_int_id"] or 0}, - '{tag["value"]}', - '{tag["id"]}', - '{tag["hierarchy"]}', - '{dump_id}', - '{dump_time}' - )""" - - tag_out_data.append(out_tag) - - await self._insert_list_sql_retry(tag_out_data, "tag") + tag_out_data.append( + ( + tag["tag_id"], + tag["taxonomy_id"], + tag["parent_int_id"] or 0, + tag["value"], + tag["id"], + tag["hierarchy"], + dump_id, + dump_time, + ) + ) + + await self._insert_rows("tag", tag_out_data) self.update_completed_task_count(increment_by=1) @@ -440,28 +422,29 @@ async def _run_task(self): actor_cnt += 1 dump_id = str(uuid.uuid4()) dump_time = datetime.now(UTC) - id_row = f"""( - '{actor.id}', - 'xapi', - '{actor.username}', - '{actor.user_id}', - '{dump_id}', - '{dump_time}' - )""" - out_external_id.append(id_row) + out_external_id.append( + ( + actor.id, + "xapi", + actor.username, + actor.user_id, + dump_id, + dump_time, + ) + ) if len(out_external_id) == self.config["batch_size"]: if actor_cnt % 100 == 0: self.logger.debug( f" {self.task_name} starting insert for external ids batch {actor_cnt}" ) - await self._insert_list_sql_retry(out_external_id, "external_id") + await self._insert_rows("external_id", out_external_id) self.update_completed_task_count(increment_by=len(out_external_id)) out_external_id = [] # Catch any stragglers from the last batch if len(out_external_id): - await self._insert_list_sql_retry(out_external_id, "external_id") + await self._insert_rows("external_id", out_external_id) self.update_completed_task_count(increment_by=len(out_external_id)) @@ -491,40 +474,40 @@ async def _run_task(self): dump_time = datetime.now(UTC) # This first column is usually the MySQL row pk, we just - # user this for now to have a unique id. - profile_row = f"""( - '{actor.user_id}', - '{actor.user_id}', - '{actor.name}', - '{actor.username}', - '{actor.username}@aspects.invalid', - '{actor.meta}', - '{actor.courseware}', - '{actor.language}', - '{actor.location}', - '{actor.year_of_birth}', - '{actor.gender}', - '{actor.level_of_education}', - '{actor.mailing_address}', - '{actor.city}', - '{actor.country}', - '{actor.state}', - '{actor.goals}', - '{actor.bio}', - '{actor.profile_image_uploaded_at}', - '{actor.phone_number}', - '{dump_id}', - '{dump_time}' - )""" - - out_profile.append(profile_row) + # use this for now to have a unique id. + out_profile.append( + ( + actor.user_id, + actor.user_id, + actor.name, + actor.username, + f"{actor.username}@aspects.invalid", + actor.meta, + actor.courseware, + actor.language, + actor.location, + actor.year_of_birth, + actor.gender, + actor.level_of_education, + actor.mailing_address, + actor.city, + actor.country, + actor.state, + actor.goals, + actor.bio, + actor.profile_image_uploaded_at, + actor.phone_number, + dump_id, + dump_time, + ) + ) if len(out_profile) == self.config["batch_size"]: - await self._insert_list_sql_retry(out_profile, "user_profile") + await self._insert_rows("user_profile", out_profile) self.update_completed_task_count(increment_by=len(out_profile)) out_profile = [] # Catch any stragglers from the last batch if len(out_profile): - await self._insert_list_sql_retry(out_profile, "user_profile") + await self._insert_rows("user_profile", out_profile) self.update_completed_task_count(increment_by=len(out_profile)) From b16f325fa5fd414d3defc72df52449b72aea91c0 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 10:16:35 -0600 Subject: [PATCH 3/9] feat: Add mypy, more type hints Claude Opus 4.7 was used to add missing type hints --- Makefile | 1 + requirements/quality.in | 1 + setup.cfg | 10 +++++ xapi_db_load/async_app.py | 4 ++ xapi_db_load/course_configs.py | 7 +++- xapi_db_load/generate_load_async.py | 14 +++---- xapi_db_load/tests/test_backends.py | 8 +++- xapi_db_load/ui/load_ui.py | 5 ++- xapi_db_load/ui/text_ui.py | 2 +- xapi_db_load/waiter.py | 34 +++++++++++------ xapi_db_load/xapi/xapi_common.py | 2 +- xapi_db_load/xapi/xapi_forum.py | 18 ++++++--- xapi_db_load/xapi/xapi_grade.py | 53 ++++++++++++++------------ xapi_db_load/xapi/xapi_hint_answer.py | 16 ++++++-- xapi_db_load/xapi/xapi_navigation.py | 17 ++++++--- xapi_db_load/xapi/xapi_problem.py | 18 ++++++--- xapi_db_load/xapi/xapi_registration.py | 13 +++++-- xapi_db_load/xapi/xapi_video.py | 11 +++++- 18 files changed, 158 insertions(+), 76 deletions(-) diff --git a/Makefile b/Makefile index 5e75685..60f6f2b 100644 --- a/Makefile +++ b/Makefile @@ -52,6 +52,7 @@ quality: ## check coding style with pycodestyle and pylint pylint xapi_db_load *.py pycodestyle xapi_db_load *.py pydocstyle xapi_db_load *.py + mypy xapi_db_load isort --check-only --diff --recursive xapi_db_load *.py test_settings.py python setup.py bdist_wheel twine check dist/* diff --git a/requirements/quality.in b/requirements/quality.in index d477386..c256965 100644 --- a/requirements/quality.in +++ b/requirements/quality.in @@ -5,6 +5,7 @@ edx-lint # edX pylint rules and plugins isort # to standardize order of imports +mypy # Static type checker pycodestyle # PEP 8 compliance validation pydocstyle # PEP 257 compliance validation twine # Utility for publishing Python packages on PyPI. diff --git a/setup.cfg b/setup.cfg index d782599..b2de6ea 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,3 +8,13 @@ skip= [wheel] universal = 1 + +[mypy] +python_version = 3.12 +ignore_missing_imports = True +check_untyped_defs = False +warn_unused_ignores = True +exclude = (^docs/|^build/|^dist/) + +[mypy-xapi_db_load.tests.*] +ignore_errors = True diff --git a/xapi_db_load/async_app.py b/xapi_db_load/async_app.py index d623ef0..08c9e37 100644 --- a/xapi_db_load/async_app.py +++ b/xapi_db_load/async_app.py @@ -5,9 +5,13 @@ import logging import os import sys +from typing import TYPE_CHECKING from xapi_db_load.runner import Runner +if TYPE_CHECKING: + from xapi_db_load.ui.text_ui import TextUI + class App: """ diff --git a/xapi_db_load/course_configs.py b/xapi_db_load/course_configs.py index bb29a65..635c66b 100644 --- a/xapi_db_load/course_configs.py +++ b/xapi_db_load/course_configs.py @@ -127,7 +127,7 @@ def __repr__(self) -> str: {self.course_config} """ - def configure(self): + def configure(self) -> None: """ Set up the fake course configuration such as course length, start and end dates, and size. """ @@ -171,7 +171,9 @@ def configure(self): ): self.items_in_course += self.course_config[config] - def get_random_emission_time(self, actor=None): + def get_random_emission_time( + self, actor: "EnrolledActor | None" = None + ) -> datetime.datetime: """ Randomizes an emission time for events that falls within the course start and end dates. """ @@ -381,6 +383,7 @@ def serialize_block_data_for_event_sink(self) -> List[Dict]: return course_structure def serialize_object_tag_data_for_event_sink(self) -> List[Dict]: + """Return list of object_tag dicts for the event-sink schema.""" object_tags = [] cnt = 0 for block_type in ( diff --git a/xapi_db_load/generate_load_async.py b/xapi_db_load/generate_load_async.py index 6622c4a..65564ba 100644 --- a/xapi_db_load/generate_load_async.py +++ b/xapi_db_load/generate_load_async.py @@ -93,7 +93,7 @@ def __init__( self.end_date = config["end_date"] self._validate_config() - def _validate_config(self): + def _validate_config(self) -> None: """ Make sure the given values make sense. """ @@ -114,7 +114,7 @@ def _validate_config(self): f"Course size {s} wants more actors than are configured in num_actors." ) - async def run_task(self): + async def run_task(self) -> None: """ We override run_task here instead of _run_task because this is the setup task everyone else is waiting for! @@ -133,7 +133,7 @@ async def run_task(self): self.setup_complete = True self.finished = True - async def run_db_load_task(self): + async def run_db_load_task(self) -> None: """ When we are just loading the database with existing data there is nothing to do. """ @@ -141,14 +141,14 @@ async def run_db_load_task(self): self.setup_complete = True self.finished = True - def setup_orgs(self): + def setup_orgs(self) -> None: """ Create some random organizations based on the config. """ for i in range(self.config["num_organizations"]): self.orgs.append(f"Org{i}") - async def setup_courses(self): + async def setup_courses(self) -> None: """ Pre-create a number of courses based on the config. """ @@ -205,7 +205,7 @@ async def setup_courses(self): if curr_num == num_courses: break - def setup_actors(self): + def setup_actors(self) -> None: """ Create all known actors. @@ -234,7 +234,7 @@ def _get_hierarchy(tag_hierarchy, start_parent_id): hierarchy.reverse() return hierarchy - def setup_taxonomies_tags(self): + def setup_taxonomies_tags(self) -> None: """ Load a sample set of tags and format them for use. """ diff --git a/xapi_db_load/tests/test_backends.py b/xapi_db_load/tests/test_backends.py index 9911b2b..499b25c 100644 --- a/xapi_db_load/tests/test_backends.py +++ b/xapi_db_load/tests/test_backends.py @@ -168,8 +168,12 @@ def test_vector_backend(mock_get_logger, _, tmp_path): # Quick test to make sure that what's being stored is at least parseable for s in (test_str_1, test_str_2): try: - statement = re.match(msg_regex, s).groups()[0] - json.loads(statement) + match = re.match(msg_regex, s) + if match is not None and match.groups(): + statement = match.groups()[0] + json.loads(statement) + else: + raise ValueError("No match found") except Exception as e: print(e) print("Exception! Regex testing: ") diff --git a/xapi_db_load/ui/load_ui.py b/xapi_db_load/ui/load_ui.py index 2569088..d7b44a6 100644 --- a/xapi_db_load/ui/load_ui.py +++ b/xapi_db_load/ui/load_ui.py @@ -133,7 +133,8 @@ async def update_status(self): await asyncio.sleep(1) def keypress(self, size, key): - if key == "up": - App.get_shared_instance().ui.main_display.frame.focus_position = "header" + ui = App.get_shared_instance().ui + if key == "up" and ui is not None: + ui.main_display.frame.focus_position = "header" return super(LoadData, self).keypress(size, key) diff --git a/xapi_db_load/ui/text_ui.py b/xapi_db_load/ui/text_ui.py index 34e9823..3e520dd 100644 --- a/xapi_db_load/ui/text_ui.py +++ b/xapi_db_load/ui/text_ui.py @@ -15,7 +15,7 @@ class TextUI: config: dict app: App palette: list - screen: urwid.raw_display.Screen + screen: "urwid.raw_display.Screen" # type: ignore[name-defined] main_display: Main.MainDisplay aio_loop: asyncio.AbstractEventLoop loop: urwid.MainLoop diff --git a/xapi_db_load/waiter.py b/xapi_db_load/waiter.py index 38d178c..01c1917 100644 --- a/xapi_db_load/waiter.py +++ b/xapi_db_load/waiter.py @@ -1,7 +1,10 @@ import asyncio from logging import Logger from threading import Lock -from typing import Dict +from typing import TYPE_CHECKING, Any, Dict + +if TYPE_CHECKING: + from xapi_db_load.generate_load_async import EventGenerator class Waiter: @@ -9,7 +12,14 @@ class Waiter: Base class for all tasks, handles bookeeping and boilerplate task management. """ - def __init__(self, config: Dict, logger: Logger, event_generator: "EventGenerator"): + task_name: str = "" + + def __init__( + self, + config: Dict, + logger: Logger, + event_generator: "EventGenerator", + ) -> None: self.config = config self.event_generator = event_generator self.logger = logger @@ -25,22 +35,22 @@ def __init__(self, config: Dict, logger: Logger, event_generator: "EventGenerato if not self.task_name: self.task_name = f"[Unnamed task] {type(self)}" - def get_complete(self): + def get_complete(self) -> float: return self.complete_pct - def reset(self): + def reset(self) -> None: self.complete_pct = 0.0 self.total_task_count = 0 self.completed_task_count = 0 self.finished = False - async def _run_task(self): + async def _run_task(self) -> Any: raise NotImplementedError("Subclasses must implement this method") - async def _run_db_load_task(self): + async def _run_db_load_task(self) -> Any: raise NotImplementedError("Subclasses must implement this method") - def update_complete_pct(self): + def update_complete_pct(self) -> None: """ Threadsafe update of the completion percentage. """ @@ -52,7 +62,7 @@ def update_complete_pct(self): except Exception as e: self.logger.error(f"Error in update_complete_pct {e}") - def update_total_task_count(self, increment_by: int): + def update_total_task_count(self, increment_by: int) -> None: """ Threadsafe update of the toal task count, necessary when a task has several subtasks. """ @@ -61,7 +71,7 @@ def update_total_task_count(self, increment_by: int): self.update_complete_pct() - def update_completed_task_count(self, increment_by: int): + def update_completed_task_count(self, increment_by: int) -> None: """ Threadsafe update of the task counter, necessary when a task has several subtasks. """ @@ -70,14 +80,14 @@ def update_completed_task_count(self, increment_by: int): self.update_complete_pct() - def finish(self): + def finish(self) -> None: """ Mark this task as done, and force completion to 100% in case of rounding errors. """ self.finished = True self.complete_pct = 1.0 - async def run_task(self): + async def run_task(self) -> Any: """ This top level task gets called when generating data and wraps the actual task to be run in a loop that checks if the event generator has completed setup. @@ -95,7 +105,7 @@ async def run_task(self): self.logger.info("Setup not complete? Waiting...") await asyncio.sleep(1) - async def run_db_load_task(self): + async def run_db_load_task(self) -> Any: """ This top level task gets called when loading existing data into the database and wraps the actual task to be run. This may not be necessary since the event generator should not need diff --git a/xapi_db_load/xapi/xapi_common.py b/xapi_db_load/xapi/xapi_common.py index b926ef9..7d7570f 100644 --- a/xapi_db_load/xapi/xapi_common.py +++ b/xapi_db_load/xapi/xapi_common.py @@ -21,5 +21,5 @@ def __init__(self, load_generator): self.parent_load_generator = load_generator @abstractmethod - def get_data(self) -> dict: + def get_data(self, *args, **kwargs) -> dict: """Return a dict with event_id, verb, actor_id, emission_time, and event JSON.""" diff --git a/xapi_db_load/xapi/xapi_forum.py b/xapi_db_load/xapi/xapi_forum.py index 59c4ad2..3bbe4a4 100644 --- a/xapi_db_load/xapi/xapi_forum.py +++ b/xapi_db_load/xapi/xapi_forum.py @@ -1,6 +1,7 @@ """ Fake xAPI statements for various forum events. """ + import json from uuid import uuid4 @@ -12,7 +13,7 @@ class BaseForum(XAPIBase): Base xAPI class for forum events. """ - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -40,7 +41,14 @@ def get_data(self): "event": e, } - def get_randomized_event(self, event_id, account, course, post_id, create_time): + def get_randomized_event( + self, + event_id: str, + account: str, + course, + post_id: str, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. @@ -69,15 +77,15 @@ def get_randomized_event(self, event_id, account, course, post_id, create_time): "extensions": { "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", "https://w3id.org/xapi/openedx/extensions/session-id": "054c9ddcb76d2096f862e66bda3bc308", - "https://w3id.org/xapi/acrossx/extensions/type": "discussion" - } + "https://w3id.org/xapi/acrossx/extensions/type": "discussion", + }, }, "object": { "definition": { "type": "http://id.tincanapi.com/activitytype/discussion" }, "id": post_id, - "objectType": "Activity" + "objectType": "Activity", }, "timestamp": create_time.isoformat(), "verb": {"display": {"en": self.verb_display}, "id": self.verb}, diff --git a/xapi_db_load/xapi/xapi_grade.py b/xapi_db_load/xapi/xapi_grade.py index 296b5ae..f5babd9 100644 --- a/xapi_db_load/xapi/xapi_grade.py +++ b/xapi_db_load/xapi/xapi_grade.py @@ -1,6 +1,7 @@ """ Fake xAPI statements for various grading events. """ + import json import random from uuid import uuid4 @@ -16,7 +17,7 @@ class FirstTimePassed(XAPIBase): verb = "http://adlnet.gov/expapi/verbs/passed" verb_display = "passed" - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -38,7 +39,13 @@ def get_data(self): "event": e, } - def get_randomized_event(self, event_id, account, course, create_time): + def get_randomized_event( + self, + event_id: str, + account: str, + course, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. """ @@ -51,7 +58,7 @@ def get_randomized_event(self, event_id, account, course, create_time): "context": { "extensions": { "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", - "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5", } }, "object": { @@ -80,7 +87,7 @@ class GradeCalculated(XAPIBase): verb_display = "earned" object_type = None - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -101,7 +108,13 @@ def get_data(self): "event": e, } - def get_randomized_event(self, event_id, actor_id, course, emission_time): + def get_randomized_event( + self, + event_id: str, + actor_id: str, + course, + emission_time, + ) -> str: """ Given the inputs, return an xAPI statement for a grade_calculated event. """ @@ -112,24 +125,16 @@ def get_randomized_event(self, event_id, actor_id, course, emission_time): "scaled": scaled_score, "raw": raw_score, "min": 0.0, - "max": max_score + "max": max_score, } event = { "actor": { - "account": { - "homePage": "http://localhost:18000", - "name": actor_id - }, - "objectType": "Agent" + "account": {"homePage": "http://localhost:18000", "name": actor_id}, + "objectType": "Agent", }, "id": event_id, - "verb": { - "id": self.verb, - "display": { - "en": self.verb_display - } - }, + "verb": {"id": self.verb, "display": {"en": self.verb_display}}, "context": { "contextActivities": { "parent": [ @@ -137,10 +142,8 @@ def get_randomized_event(self, event_id, actor_id, course, emission_time): "id": course.course_url, "objectType": "Activity", "definition": { - "name": { - "en-US": "Demonstration Course" - }, - "type": "http://adlnet.gov/expapi/activities/course" + "name": {"en-US": "Demonstration Course"}, + "type": "http://adlnet.gov/expapi/activities/course", }, } ] @@ -168,8 +171,8 @@ def get_randomized_event(self, event_id, actor_id, course, emission_time): "score": score_obj, "extensions": { "http://www.tincanapi.co.uk/activitytypes/grade_classification": grade_classification - } - } + }, + }, } event.update(course_fields) elif self.object_type == "subsection": @@ -179,12 +182,12 @@ def get_randomized_event(self, event_id, actor_id, course, emission_time): "definition": { "type": "http://id.tincanapi.com/activitytype/resource" }, - "objectType": "Activity" + "objectType": "Activity", }, "result": { "score": score_obj, "success": random.choice([True, False]), - } + }, } event.update(subsection_fields) diff --git a/xapi_db_load/xapi/xapi_hint_answer.py b/xapi_db_load/xapi/xapi_hint_answer.py index 5c3043d..f8eb1f8 100644 --- a/xapi_db_load/xapi/xapi_hint_answer.py +++ b/xapi_db_load/xapi/xapi_hint_answer.py @@ -1,6 +1,7 @@ """ Fake xAPI statements for various hint and answer events. """ + import json from uuid import uuid4 @@ -18,7 +19,7 @@ class HintAnswerBase(XAPIBase): # Whether this is a hint or an answer, "hint" or "answer" are valid values type = None - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -43,7 +44,14 @@ def get_data(self): "event": e, } - def get_randomized_event(self, event_id, account, course, problem_id, create_time): + def get_randomized_event( + self, + event_id: str, + account: str, + course, + problem_id: str, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. """ @@ -86,8 +94,8 @@ def get_randomized_event(self, event_id, account, course, problem_id, create_tim }, "extensions": { "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", - "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" - } + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5", + }, }, "timestamp": create_time.isoformat(), "verb": {"display": {"en": self.verb_display}, "id": self.verb}, diff --git a/xapi_db_load/xapi/xapi_navigation.py b/xapi_db_load/xapi/xapi_navigation.py index 483e9e4..4248d1e 100644 --- a/xapi_db_load/xapi/xapi_navigation.py +++ b/xapi_db_load/xapi/xapi_navigation.py @@ -1,6 +1,7 @@ """ Fake xAPI statements for various navigation events. """ + import json from uuid import uuid4 @@ -23,7 +24,7 @@ class BaseNavigation(XAPIBase): # To differentiate between links and other nav events, should be "link" or "nav" type = None - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -52,8 +53,14 @@ def get_data(self): } def get_randomized_event( - self, event_id, account, course, from_loc, to_loc, create_time - ): + self, + event_id: str, + account: str, + course, + from_loc, + to_loc, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. """ @@ -78,8 +85,8 @@ def get_randomized_event( }, "extensions": { "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", - "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" - } + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5", + }, }, "timestamp": create_time.isoformat(), "verb": {"display": {"en": self.verb_display}, "id": self.verb}, diff --git a/xapi_db_load/xapi/xapi_problem.py b/xapi_db_load/xapi/xapi_problem.py index 3527b14..6a99f3d 100644 --- a/xapi_db_load/xapi/xapi_problem.py +++ b/xapi_db_load/xapi/xapi_problem.py @@ -1,6 +1,7 @@ """ Fake xAPI statements for various problem_check events. """ + import json import random from uuid import uuid4 @@ -17,7 +18,7 @@ class BaseProblemCheck(XAPIBase): problem_type = None # "browser" or "server" - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -44,8 +45,13 @@ def get_data(self): } def get_randomized_event( - self, event_id, account, course_locator, problem_id, create_time - ): + self, + event_id: str, + account: str, + course_locator: str, + problem_id: str, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. """ @@ -77,13 +83,15 @@ def get_randomized_event( "scaled": scaled_score, "raw": raw_score, "min": 0.0, - "max": max_score + "max": max_score, } server_object = { "object": { "definition": { - "extensions": {"http://id.tincanapi.com/extension/attempt-id": attempts}, + "extensions": { + "http://id.tincanapi.com/extension/attempt-id": attempts + }, "description": { "en-US": "Add the question text, or prompt, here. This text is required." }, diff --git a/xapi_db_load/xapi/xapi_registration.py b/xapi_db_load/xapi/xapi_registration.py index 948b957..9dde561 100644 --- a/xapi_db_load/xapi/xapi_registration.py +++ b/xapi_db_load/xapi/xapi_registration.py @@ -1,6 +1,7 @@ """ Fake xAPI statements for various registration events. """ + import json from random import choice from uuid import uuid4 @@ -13,7 +14,7 @@ class BaseRegistration(XAPIBase): Base xAPI class for registration events. """ - def get_data(self, course=None, enrolled_actor=None): + def get_data(self, course=None, enrolled_actor=None) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -44,7 +45,13 @@ def get_data(self, course=None, enrolled_actor=None): "event": e, } - def get_randomized_event(self, event_id, account, course_locator, create_time): + def get_randomized_event( + self, + event_id: str, + account: str, + course_locator: str, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. """ @@ -58,7 +65,7 @@ def get_randomized_event(self, event_id, account, course_locator, create_time): "context": { "extensions": { "https://w3id.org/xapi/openedx/extension/transformer-version": "event-routing-backends@7.0.1", - "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5" + "https://w3id.org/xapi/openedx/extensions/session-id": "e4858858443cd99828206e294587dac5", } }, "object": { diff --git a/xapi_db_load/xapi/xapi_video.py b/xapi_db_load/xapi/xapi_video.py index 3f0b4aa..d24acf6 100644 --- a/xapi_db_load/xapi/xapi_video.py +++ b/xapi_db_load/xapi/xapi_video.py @@ -19,7 +19,7 @@ class BaseVideo(XAPIBase): has_event_time = False has_time_from_to = False - def get_data(self): + def get_data(self) -> dict: """ Generate and return the event dict, including xAPI statement as "event". """ @@ -45,7 +45,14 @@ def get_data(self): "event": e, } - def get_randomized_event(self, event_id, account, course, video_id, create_time): + def get_randomized_event( + self, + event_id: str, + account: str, + course, + video_id: str, + create_time, + ) -> str: """ Given the inputs, return an xAPI statement. """ From f0c3131a714cf3a8c86b38fa212de5624d8d8ed9 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 10:29:01 -0600 Subject: [PATCH 4/9] style: Break out magic number constants Also makes lms_url configurable, which is a first step towards using real course content for these tests. --- default_config.yaml | 6 +++++ xapi_db_load/constants.py | 35 ++++++++++++++++++++++++++ xapi_db_load/course_configs.py | 14 +++++++---- xapi_db_load/generate_load_async.py | 11 ++++++-- xapi_db_load/main.py | 5 +++- xapi_db_load/xapi/xapi_forum.py | 2 +- xapi_db_load/xapi/xapi_grade.py | 4 +-- xapi_db_load/xapi/xapi_hint_answer.py | 2 +- xapi_db_load/xapi/xapi_navigation.py | 8 ++++-- xapi_db_load/xapi/xapi_problem.py | 7 +++++- xapi_db_load/xapi/xapi_registration.py | 7 +++++- xapi_db_load/xapi/xapi_video.py | 13 ++++++---- 12 files changed, 93 insertions(+), 21 deletions(-) create mode 100644 xapi_db_load/constants.py diff --git a/default_config.yaml b/default_config.yaml index 6988c95..1470bbc 100644 --- a/default_config.yaml +++ b/default_config.yaml @@ -17,6 +17,12 @@ csv_load_from_s3_after: false # Run options log_dir: logs + +# Base URL used as the LMS "homePage" / course URL prefix in generated xAPI +# statements. Override this to match a real environment when you need the +# emitted events to point at a specific host. +lms_url: http://localhost:18000 + num_xapi_batches: 300 batch_size: 1000 diff --git a/xapi_db_load/constants.py b/xapi_db_load/constants.py new file mode 100644 index 0000000..0e5d36c --- /dev/null +++ b/xapi_db_load/constants.py @@ -0,0 +1,35 @@ +""" +Shared constants for the xapi-db-load package. + +Centralizing these values makes the tool easier to reconfigure and avoids +"magic" literals scattered through event generators and course config code. +""" + +# Default LMS base URL used in generated xAPI statements when not overridden +# via config. Override by setting ``lms_url`` in the YAML config file. +DEFAULT_LMS_URL = "http://localhost:18000" + +# xAPI statement version emitted on every event. +XAPI_VERSION = "1.0.3" + +# Human-readable display name used for the synthetic "Demonstration Course" +# in xAPI activity definitions. +DEMO_COURSE_NAME = "Demonstration Course" + +# Hardcoded transformer / session identifiers from the original fixtures. +# These match values produced by event-routing-backends and are referenced +# in downstream regexes/tests, so they intentionally do not vary per event. +TRANSFORMER_VERSION = "event-routing-backends@7.0.1" +SESSION_ID_PLACEHOLDER = "e4858858443cd99828206e294587dac5" + +# Default length (in seconds) used for synthetic video events. +DEFAULT_VIDEO_LENGTH_SECONDS = 195.0 + +# Lengths used when truncating UUIDs into short, human-readable identifiers. +UUID_SHORT_LENGTH = 8 +COURSE_ID_SHORT_LENGTH = 6 + +# Range of "course runs" generated per logical course (inclusive of low, +# exclusive of high — passed straight to ``random.randrange``). +MIN_COURSE_RUNS = 1 +MAX_COURSE_RUNS = 5 diff --git a/xapi_db_load/course_configs.py b/xapi_db_load/course_configs.py index 635c66b..8b12bd8 100644 --- a/xapi_db_load/course_configs.py +++ b/xapi_db_load/course_configs.py @@ -10,6 +10,8 @@ from random import choice, randrange from typing import Dict, List, NamedTuple +from xapi_db_load.constants import DEFAULT_LMS_URL, UUID_SHORT_LENGTH + class Actor: """ @@ -90,6 +92,7 @@ async def populate( course_config_name: str, course_size_makeup: dict, tags: list[str], + lms_url: str = DEFAULT_LMS_URL, ) -> "RandomCourse": self.course_uuid = course_uuid self.course_run = course_run @@ -98,8 +101,9 @@ async def populate( # to be able to catch all course runs in those queries. self.course_name = f"{self.course_uuid} ({course_config_name})" self.org = org + self.lms_url = lms_url self.course_id = f"course-v1:{org}+{self.course_uuid}+{self.course_run}" - self.course_url = f"http://localhost:18000/course/{self.course_id}" + self.course_url = f"{self.lms_url}/course/{self.course_id}" delta = datetime.timedelta(days=course_length) self.start_date = self._random_datetime( @@ -229,8 +233,8 @@ def get_video_id(self) -> str: return choice(self.video_ids) def _generate_random_block_type_id(self, block_type: str) -> str: - block_uuid = str(uuid.uuid4())[:8] - return f"http://localhost:18000/xblock/block-v1:{self.course_id}+type@{block_type}+block@{block_uuid}" + block_uuid = str(uuid.uuid4())[:UUID_SHORT_LENGTH] + return f"{self.lms_url}/xblock/block-v1:{self.course_id}+type@{block_type}+block@{block_uuid}" def get_problem_id(self) -> str: """ @@ -251,8 +255,8 @@ def get_random_forum_post_id(self) -> str: return choice(self.forum_post_ids) def _generate_random_forum_post_id(self) -> str: - thread_id = str(uuid.uuid4())[:8] - return f"http://localhost:18000/api/discussion/v1/threads/{thread_id}" + thread_id = str(uuid.uuid4())[:UUID_SHORT_LENGTH] + return f"{self.lms_url}/api/discussion/v1/threads/{thread_id}" def get_random_nav_location(self) -> str: """ diff --git a/xapi_db_load/generate_load_async.py b/xapi_db_load/generate_load_async.py index 65564ba..193580c 100644 --- a/xapi_db_load/generate_load_async.py +++ b/xapi_db_load/generate_load_async.py @@ -11,6 +11,12 @@ from random import choice, choices from typing import Dict, Generator, List +from xapi_db_load.constants import ( + COURSE_ID_SHORT_LENGTH, + DEFAULT_LMS_URL, + MAX_COURSE_RUNS, + MIN_COURSE_RUNS, +) from xapi_db_load.course_configs import Actor, RandomCourse from xapi_db_load.fixtures.music_tags import MUSIC_TAGS from xapi_db_load.waiter import Waiter @@ -177,8 +183,8 @@ async def setup_courses(self) -> None: ] org = choice(self.orgs) actors = choices(self.actors, k=course_config_makeup["actors"]) - runs = random.randrange(1, 5) - course_id = str(uuid.uuid4())[:6] + runs = random.randrange(MIN_COURSE_RUNS, MAX_COURSE_RUNS) + course_id = str(uuid.uuid4())[:COURSE_ID_SHORT_LENGTH] # Create 1-5 of the same course size / makeup / name # but different course runs. @@ -194,6 +200,7 @@ async def setup_courses(self) -> None: course_config_name, course_config_makeup, self.tags, + lms_url=self.config.get("lms_url", DEFAULT_LMS_URL), ) self.courses.append(course) diff --git a/xapi_db_load/main.py b/xapi_db_load/main.py index 56fe8a9..ea0c425 100644 --- a/xapi_db_load/main.py +++ b/xapi_db_load/main.py @@ -10,9 +10,9 @@ import uvloop import yaml +from xapi_db_load.constants import DEFAULT_LMS_URL from xapi_db_load.ui.text_ui import TextUI - _ENV_VAR_OVERRIDES = { "XAPI_DB_LOAD_CLICKHOUSE_PASSWORD": "db_password", "XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY": "s3_secret", @@ -40,6 +40,9 @@ def get_config(config_file: str) -> dict: if value is not None: conf[config_key] = value + # Apply defaults for optional config keys. + conf.setdefault("lms_url", DEFAULT_LMS_URL) + return conf diff --git a/xapi_db_load/xapi/xapi_forum.py b/xapi_db_load/xapi/xapi_forum.py index 3bbe4a4..94eb66e 100644 --- a/xapi_db_load/xapi/xapi_forum.py +++ b/xapi_db_load/xapi/xapi_forum.py @@ -59,7 +59,7 @@ def get_randomized_event( "id": event_id, "actor": { "objectType": "Agent", - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": {"homePage": course.lms_url, "name": account}, }, "context": { "contextActivities": { diff --git a/xapi_db_load/xapi/xapi_grade.py b/xapi_db_load/xapi/xapi_grade.py index f5babd9..87bdd57 100644 --- a/xapi_db_load/xapi/xapi_grade.py +++ b/xapi_db_load/xapi/xapi_grade.py @@ -52,7 +52,7 @@ def get_randomized_event( event = { "id": event_id, "actor": { - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": {"homePage": course.lms_url, "name": account}, "objectType": "Agent", }, "context": { @@ -130,7 +130,7 @@ def get_randomized_event( event = { "actor": { - "account": {"homePage": "http://localhost:18000", "name": actor_id}, + "account": {"homePage": course.lms_url, "name": actor_id}, "objectType": "Agent", }, "id": event_id, diff --git a/xapi_db_load/xapi/xapi_hint_answer.py b/xapi_db_load/xapi/xapi_hint_answer.py index f8eb1f8..45b96bd 100644 --- a/xapi_db_load/xapi/xapi_hint_answer.py +++ b/xapi_db_load/xapi/xapi_hint_answer.py @@ -76,7 +76,7 @@ def get_randomized_event( event = { "id": event_id, "actor": { - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": {"homePage": course.lms_url, "name": account}, "objectType": "Agent", }, "context": { diff --git a/xapi_db_load/xapi/xapi_navigation.py b/xapi_db_load/xapi/xapi_navigation.py index 4248d1e..809e0d2 100644 --- a/xapi_db_load/xapi/xapi_navigation.py +++ b/xapi_db_load/xapi/xapi_navigation.py @@ -67,7 +67,7 @@ def get_randomized_event( event = { "id": event_id, "actor": { - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": {"homePage": course.lms_url, "name": account}, "objectType": "Agent", }, "context": { @@ -101,7 +101,11 @@ def get_randomized_event( "definition": { "type": "http://adlnet.gov/expapi/activities/link" }, - "id": "http://localhost:18000/courses/course-v1:edX+DemoX+Demo_Course/jump_to/block-v1:edX+DemoX+Demo_Course+type@sequential+block@6ab9c442501d472c8ed200e367b4edfa", # pylint: disable=line-too-long + "id": ( + f"{course.lms_url}/courses/course-v1:edX+DemoX+Demo_Course" + "/jump_to/block-v1:edX+DemoX+Demo_Course+type@sequential" + "+block@6ab9c442501d472c8ed200e367b4edfa" + ), "objectType": "Activity", } } diff --git a/xapi_db_load/xapi/xapi_problem.py b/xapi_db_load/xapi/xapi_problem.py index 6a99f3d..e0b0934 100644 --- a/xapi_db_load/xapi/xapi_problem.py +++ b/xapi_db_load/xapi/xapi_problem.py @@ -112,7 +112,12 @@ def get_randomized_event( "id": event_id, "actor": { "objectType": "Agent", - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": { + "homePage": self.parent_load_generator.config.get( + "lms_url", "http://localhost:18000" + ), + "name": account, + }, }, "context": { "contextActivities": { diff --git a/xapi_db_load/xapi/xapi_registration.py b/xapi_db_load/xapi/xapi_registration.py index 9dde561..f0ce1d4 100644 --- a/xapi_db_load/xapi/xapi_registration.py +++ b/xapi_db_load/xapi/xapi_registration.py @@ -60,7 +60,12 @@ def get_randomized_event( "id": event_id, "actor": { "objectType": "Agent", - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": { + "homePage": self.parent_load_generator.config.get( + "lms_url", "http://localhost:18000" + ), + "name": account, + }, }, "context": { "extensions": { diff --git a/xapi_db_load/xapi/xapi_video.py b/xapi_db_load/xapi/xapi_video.py index d24acf6..386b072 100644 --- a/xapi_db_load/xapi/xapi_video.py +++ b/xapi_db_load/xapi/xapi_video.py @@ -6,6 +6,8 @@ from random import randrange from uuid import uuid4 +from xapi_db_load.constants import DEFAULT_VIDEO_LENGTH_SECONDS + from .xapi_common import XAPIBase @@ -56,21 +58,22 @@ def get_randomized_event( """ Given the inputs, return an xAPI statement. """ - video_length = 195.0 + video_length = DEFAULT_VIDEO_LENGTH_SECONDS + max_offset = int(DEFAULT_VIDEO_LENGTH_SECONDS) video_event_time = video_event_time_to = video_event_time_from = None if self.has_event_time: - video_event_time = float(randrange(0, 195)) + video_event_time = float(randrange(0, max_offset)) if self.has_time_from_to: - video_event_time_from = float(randrange(0, 195)) - video_event_time_to = float(randrange(0, 195)) + video_event_time_from = float(randrange(0, max_offset)) + video_event_time_to = float(randrange(0, max_offset)) event = { "id": event_id, "actor": { "objectType": "Agent", - "account": {"homePage": "http://localhost:18000", "name": account}, + "account": {"homePage": course.lms_url, "name": account}, }, "context": { "contextActivities": { From 31433129130fc140c0468f11ae5f080f523c707d Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 10:39:50 -0600 Subject: [PATCH 5/9] docs: Add missing docstrings Also fixes some random urwid typing issues First pass new docstrings were created by Claude Opus 4.7 --- tox.ini | 7 ++++++- xapi_db_load/async_app.py | 2 ++ xapi_db_load/backends/chdb.py | 1 + xapi_db_load/course_configs.py | 1 + xapi_db_load/generate_load_async.py | 2 ++ xapi_db_load/runner.py | 7 +++++++ xapi_db_load/ui/__init__.py | 1 + xapi_db_load/ui/config_ui.py | 6 ++++-- xapi_db_load/ui/load_ui.py | 7 +++++++ xapi_db_load/ui/log_ui.py | 10 ++++++++-- xapi_db_load/ui/main_ui.py | 16 ++++++++++++++-- xapi_db_load/ui/text_ui.py | 2 ++ xapi_db_load/ui/themes.py | 2 ++ xapi_db_load/waiter.py | 4 ++++ 14 files changed, 61 insertions(+), 7 deletions(-) diff --git a/tox.ini b/tox.ini index 39bc0cd..af0180f 100644 --- a/tox.ini +++ b/tox.ini @@ -31,8 +31,13 @@ ignore = E501 ; D412 = No blank lines allowed between a section header and its content (numpy style) ; D413 = Missing blank line after last section (numpy style) ; D414 = Section has no content (numpy style) +; D202 = No blank lines allowed after function docstring (conflicts with isort/black) +; D205 = 1 blank line required between summary line and description (cosmetic only) +; D400 = First line should end with a period (cosmetic only) +; D401 = First line should be in imperative mood (heuristic is unreliable) +; D415 = First line should end with a period, question mark, or exclamation point ; E501 = Line too long, this is handled in pylint -ignore = D101,D105,D107,D200,D203,D212,D215,D404,D405,D406,D407,D408,D409,D410,D411,D412,D413,D414 +ignore = D101,D105,D107,D200,D202,D203,D205,D212,D215,D400,D401,D404,D405,D406,D407,D408,D409,D410,D411,D412,D413,D414,D415 [pytest] diff --git a/xapi_db_load/async_app.py b/xapi_db_load/async_app.py index 08c9e37..3c5ffd5 100644 --- a/xapi_db_load/async_app.py +++ b/xapi_db_load/async_app.py @@ -74,11 +74,13 @@ def set_main_loop(self, loop): self.main_loop = loop def draw_screen(self): + """Trigger a UI redraw if a main loop is currently attached.""" if self.main_loop: self.main_loop.draw_screen() @staticmethod def get_shared_instance() -> "App": + """Return the process-wide singleton ``App`` instance.""" assert App._shared_instance, "App not initialized" return App._shared_instance diff --git a/xapi_db_load/backends/chdb.py b/xapi_db_load/backends/chdb.py index 98cbf50..11f52a0 100644 --- a/xapi_db_load/backends/chdb.py +++ b/xapi_db_load/backends/chdb.py @@ -932,6 +932,7 @@ def __init__( self.registered_tasks = [] def reset(self): + """Reset progress counters and per-run state (errors, shutdown flag).""" super().reset() self.error_count = 0 self.shutting_down = False diff --git a/xapi_db_load/course_configs.py b/xapi_db_load/course_configs.py index 8b12bd8..92000cf 100644 --- a/xapi_db_load/course_configs.py +++ b/xapi_db_load/course_configs.py @@ -94,6 +94,7 @@ async def populate( tags: list[str], lms_url: str = DEFAULT_LMS_URL, ) -> "RandomCourse": + """Populate this course with randomized blocks, dates, actors, and tags.""" self.course_uuid = course_uuid self.course_run = course_run # It's important that the course name stay the same between runs diff --git a/xapi_db_load/generate_load_async.py b/xapi_db_load/generate_load_async.py index 193580c..540b31a 100644 --- a/xapi_db_load/generate_load_async.py +++ b/xapi_db_load/generate_load_async.py @@ -277,9 +277,11 @@ def setup_taxonomies_tags(self) -> None: self.tags.append(tag) def get_random_event_count(self) -> int: + """Return the total number of random xAPI events that will be generated.""" return self.config["batch_size"] * self.config["num_xapi_batches"] def get_batch_events_iter(self) -> Generator[str, None, None]: + """Yield batch events pre-formatted as SQL VALUES tuples.""" for v in self.get_batch_events(): yield f"('{v['event_id']}', '{v['emission_time']}', '{v['event']}')" diff --git a/xapi_db_load/runner.py b/xapi_db_load/runner.py index 0d102d6..c11fd29 100644 --- a/xapi_db_load/runner.py +++ b/xapi_db_load/runner.py @@ -1,3 +1,10 @@ +""" +Top-level ``Runner`` that wires together backends, tasks, and the event generator. + +The ``Runner`` selects the appropriate backend based on config, registers tasks, +and drives the async task loop that performs the data generation / load. +""" + import asyncio import datetime from logging import Logger diff --git a/xapi_db_load/ui/__init__.py b/xapi_db_load/ui/__init__.py index e69de29..fa0367f 100644 --- a/xapi_db_load/ui/__init__.py +++ b/xapi_db_load/ui/__init__.py @@ -0,0 +1 @@ +"""Urwid-based terminal UI components for the xapi-db-load tool.""" diff --git a/xapi_db_load/ui/config_ui.py b/xapi_db_load/ui/config_ui.py index bc4f6d9..65b255f 100644 --- a/xapi_db_load/ui/config_ui.py +++ b/xapi_db_load/ui/config_ui.py @@ -14,9 +14,11 @@ def __init__(self, app: App): @property def config_widget(self): + """Return the underlying ``ConfigWidget``.""" return self.widget def show(self): + """Create the ``ConfigWidget`` if it doesn't already exist.""" if self.widget is None: self.widget = ConfigWidget(self.app) @@ -25,8 +27,8 @@ class ConfigWidget(urwid.WidgetWrap): def __init__(self, app: App): self.app = app self.config_text = urwid.Text(self._get_config_contents()) - self.config = urwid.Scrollable(self.config_text) - self.config_scrollbar = urwid.ScrollBar(self.config) + self.config = urwid.Scrollable(self.config_text) # type: ignore[arg-type] + self.config_scrollbar = urwid.ScrollBar(self.config) # type: ignore[arg-type] self.config.set_scrollpos(0) super().__init__(self.config_scrollbar) diff --git a/xapi_db_load/ui/load_ui.py b/xapi_db_load/ui/load_ui.py index d7b44a6..af2b64c 100644 --- a/xapi_db_load/ui/load_ui.py +++ b/xapi_db_load/ui/load_ui.py @@ -1,3 +1,5 @@ +"""Urwid widgets for the "Load" tab: triggers data generation and shows progress.""" + import asyncio import urwid @@ -15,6 +17,7 @@ def __init__(self, app): self.widget = LoadData(self.app) def show(self): + """Create the ``LoadData`` widget if it doesn't already exist.""" if self.widget is None: self.widget = LoadData(self.app) @@ -82,6 +85,7 @@ def __init__(self, app): super().__init__(self.widget) def go_pressed(self, button): + """Handler for the "Create Test Data" button: starts a full data-generation run.""" self.app.log("Go pressed") if self.app.runner.running: self.app.log("Already running") @@ -95,6 +99,7 @@ def go_pressed(self, button): asyncio.create_task(self.app.runner.run()) def load_pressed(self, button): + """Handler for the "Load from Object Storage Only" button.""" # TODO: This can be combined with go_pressed using user data self.app.log("Load pressed") if self.app.runner.running: @@ -109,6 +114,7 @@ def load_pressed(self, button): asyncio.create_task(self.app.runner.run(load_db_only=True)) async def update_status(self): + """Poll task progress and refresh the progress bars until the run finishes.""" while True: assert len(self.to_do_widgets) == len(self.app.runner.test_data_tasks) for i in range(len(self.to_do_widgets)): @@ -133,6 +139,7 @@ async def update_status(self): await asyncio.sleep(1) def keypress(self, size, key): + """Move focus to the header on Up arrow; otherwise delegate to the base widget.""" ui = App.get_shared_instance().ui if key == "up" and ui is not None: ui.main_display.frame.focus_position = "header" diff --git a/xapi_db_load/ui/log_ui.py b/xapi_db_load/ui/log_ui.py index 8f9c78e..915b138 100644 --- a/xapi_db_load/ui/log_ui.py +++ b/xapi_db_load/ui/log_ui.py @@ -1,3 +1,5 @@ +"""Urwid widgets implementing the log-tail view of the application's log file.""" + import asyncio from collections import deque @@ -5,15 +7,19 @@ class LogDisplay: + """Lazy container for the log-tail widget; instantiates it on first show.""" + def __init__(self, app): self.app = app self.widget = None @property def log_term(self): + """Return the underlying ``LogTail`` widget (or ``None`` if not yet shown).""" return self.widget def show(self): + """Create the ``LogTail`` widget if it doesn't already exist.""" if self.widget is None: self.widget = LogTail(self.app) @@ -22,10 +28,10 @@ class LogTail(urwid.WidgetWrap): def __init__(self, app): self.app = app self.log_tail = urwid.Text("Loading...") - self.log = urwid.Scrollable(self.log_tail) + self.log = urwid.Scrollable(self.log_tail) # type: ignore[arg-type] # Start scrolled to the bottom self.log.set_scrollpos(-1) - self.log_scrollbar = urwid.ScrollBar(self.log) + self.log_scrollbar = urwid.ScrollBar(self.log) # type: ignore[arg-type] asyncio.create_task(self.update()) super().__init__(self.log_scrollbar) diff --git a/xapi_db_load/ui/main_ui.py b/xapi_db_load/ui/main_ui.py index 7561be0..4885b1e 100644 --- a/xapi_db_load/ui/main_ui.py +++ b/xapi_db_load/ui/main_ui.py @@ -1,3 +1,7 @@ +"""Urwid widgets implementing the main tab-style display and menu.""" + +from typing import Union + import urwid from urwid import Widget @@ -6,6 +10,8 @@ from xapi_db_load.ui.load_ui import LoadDisplay from xapi_db_load.ui.log_ui import LogDisplay +SubDisplay = Union[LoadDisplay, ConfigDisplay, LogDisplay] + class SubDisplays: """ @@ -17,9 +23,10 @@ def __init__(self, app): self.load_display = LoadDisplay(self.app) self.config_display = ConfigDisplay(self.app) self.log_display = LogDisplay(self.app) - self.active_display = self.load_display + self.active_display: SubDisplay = self.load_display def active(self): + """Return the currently-active sub-display object.""" return self.active_display @@ -47,6 +54,7 @@ def __init__( super().__init__(body, header, footer) def mouse_event(self, size, event, button, col, row, focus): + """Track current focus on mouse events and delegate to the parent frame.""" current_focus = self.delegate.widget.get_focus_widgets()[-1] self.current_focus = current_focus return super(MainFrame, self).mouse_event(size, event, button, col, row, focus) @@ -66,8 +74,10 @@ def __init__(self, ui, app): self.menu_display = MenuDisplay(self.app, self) self.sub_displays = SubDisplays(self.app) + active_widget = self.sub_displays.active().widget + assert active_widget is not None self.frame = MainFrame( - self.sub_displays.active().widget, + active_widget, header=self.menu_display.widget, footer=urwid.AttrMap(urwid.Text(""), "shortcutbar"), delegate=self, @@ -103,9 +113,11 @@ def show_log(self, user_data): self.update_active_sub_display() def request_redraw(self, extra_delay=0.0): + """Schedule a screen redraw via the urwid main-loop alarm.""" self.app.ui.loop.set_alarm_in(0.25 + extra_delay, self.redraw_now) def redraw_now(self, sender=None, data=None): + """Clear and force redraw of the screen immediately.""" self.app.ui.loop.screen.clear() def quit(self, sender=None): diff --git a/xapi_db_load/ui/text_ui.py b/xapi_db_load/ui/text_ui.py index 3e520dd..d0a80c8 100644 --- a/xapi_db_load/ui/text_ui.py +++ b/xapi_db_load/ui/text_ui.py @@ -1,3 +1,5 @@ +"""Top-level urwid ``TextUI`` that owns the screen, palette, and asyncio main loop.""" + import asyncio import urwid diff --git a/xapi_db_load/ui/themes.py b/xapi_db_load/ui/themes.py index c6dac54..a1d8243 100644 --- a/xapi_db_load/ui/themes.py +++ b/xapi_db_load/ui/themes.py @@ -1,3 +1,5 @@ +"""Color palette definitions for the urwid UI.""" + THEMES = { "default": [ # Style name # 16-color style # Monochrome style # 88, 256 and true-color style diff --git a/xapi_db_load/waiter.py b/xapi_db_load/waiter.py index 01c1917..8bd73b6 100644 --- a/xapi_db_load/waiter.py +++ b/xapi_db_load/waiter.py @@ -1,3 +1,5 @@ +"""Base ``Waiter`` task class with shared progress bookkeeping for all backend tasks.""" + import asyncio from logging import Logger from threading import Lock @@ -36,9 +38,11 @@ def __init__( self.task_name = f"[Unnamed task] {type(self)}" def get_complete(self) -> float: + """Return the current completion percentage (0.0 - 1.0).""" return self.complete_pct def reset(self) -> None: + """Reset progress counters so the task can be re-run.""" self.complete_pct = 0.0 self.total_task_count = 0 self.completed_task_count = 0 From 32e8b4c0ca3ea928399298a270b668ea7e1296b9 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 11:20:23 -0600 Subject: [PATCH 6/9] test: Add some tests for uncovered areas Uncovered & fixed some bugs with reusing classes that don't happen in normal use but are good to fix. Claude Opus 4.7 handled the bulk of the new tests --- .gitignore | 1 + xapi_db_load/fixtures/music_tags.py | 5 +- xapi_db_load/generate_load_async.py | 11 +- xapi_db_load/tests/test_config.py | 81 +++++++++++ xapi_db_load/tests/test_event_generator.py | 115 ++++++++++++++++ xapi_db_load/tests/test_waiter.py | 90 +++++++++++++ xapi_db_load/tests/test_xapi_events.py | 150 +++++++++++++++++++++ 7 files changed, 447 insertions(+), 6 deletions(-) create mode 100644 xapi_db_load/tests/test_config.py create mode 100644 xapi_db_load/tests/test_event_generator.py create mode 100644 xapi_db_load/tests/test_waiter.py create mode 100644 xapi_db_load/tests/test_xapi_events.py diff --git a/.gitignore b/.gitignore index 9888632..66bac49 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ dist/ .dir-locals.el private_configs/ logs/ +tmp.* diff --git a/xapi_db_load/fixtures/music_tags.py b/xapi_db_load/fixtures/music_tags.py index 9d53c2f..4e7684e 100644 --- a/xapi_db_load/fixtures/music_tags.py +++ b/xapi_db_load/fixtures/music_tags.py @@ -1,6 +1,7 @@ """ This is a pythonized version the testing CSV we use for tags in edx-platform. """ + import csv from io import StringIO @@ -36,4 +37,6 @@ PIANO,Piano,CHORD, """) -MUSIC_TAGS = csv.DictReader(MUSIC_TAGS_CSV) +# Materialize into a list so the fixture can be consumed multiple times +# without having to do another read. This happens in tests, for instance. +MUSIC_TAGS = list(csv.DictReader(MUSIC_TAGS_CSV)) diff --git a/xapi_db_load/generate_load_async.py b/xapi_db_load/generate_load_async.py index 540b31a..e3e5413 100644 --- a/xapi_db_load/generate_load_async.py +++ b/xapi_db_load/generate_load_async.py @@ -83,11 +83,6 @@ class EventGenerator(Waiter): Generates a batch of random xAPI events based on the EVENT_WEIGHTS proportions. """ - actors: List[Actor] = [] - courses: List[RandomCourse] = [] - orgs: List[str] = [] - taxonomies: Dict = {} - tags: List = [] setup_complete: bool = False task_name: str = "Setup" @@ -95,6 +90,12 @@ def __init__( self, config: Dict, logger: Logger, event_generator: "EventGenerator|None" ): super().__init__(config, logger, self) + self.actors: List[Actor] = [] + self.courses: List[RandomCourse] = [] + self.orgs: List[str] = [] + self.taxonomies: Dict = {} + self.tags: List = [] + self.setup_complete = False self.start_date = config["start_date"] self.end_date = config["end_date"] self._validate_config() diff --git a/xapi_db_load/tests/test_config.py b/xapi_db_load/tests/test_config.py new file mode 100644 index 0000000..0f226cb --- /dev/null +++ b/xapi_db_load/tests/test_config.py @@ -0,0 +1,81 @@ +""" +Tests for ``xapi_db_load.main.get_config``: env-var overrides and defaults. +""" + +import textwrap + +import pytest + +from xapi_db_load.constants import DEFAULT_LMS_URL +from xapi_db_load.main import get_config + +# (env var name, config key it should populate) +ENV_VAR_OVERRIDES = [ + ("XAPI_DB_LOAD_CLICKHOUSE_PASSWORD", "db_password"), + ("XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY", "s3_secret"), + ("XAPI_DB_LOAD_RALPH_PASSWORD", "ralph_password"), +] + + +@pytest.fixture +def config_file(tmp_path): + """Write a minimal YAML config to a temp file and return its path.""" + path = tmp_path / "config.yaml" + path.write_text( + textwrap.dedent( + """ + backend: csv + db_password: from_file + s3_secret: from_file + ralph_password: from_file + """ + ).strip() + ) + return str(path) + + +@pytest.mark.parametrize( + "env_var,config_key", + ENV_VAR_OVERRIDES, + ids=[ck for _, ck in ENV_VAR_OVERRIDES], +) +def test_env_var_overrides_config_value(monkeypatch, config_file, env_var, config_key): + """When the env var is set, it takes precedence over the YAML value.""" + monkeypatch.setenv(env_var, "from_env") + conf = get_config(config_file) + assert conf[config_key] == "from_env" + + +@pytest.mark.parametrize( + "env_var,config_key", + ENV_VAR_OVERRIDES, + ids=[ck for _, ck in ENV_VAR_OVERRIDES], +) +def test_unset_env_var_falls_back_to_file( + monkeypatch, config_file, env_var, config_key +): + """When the env var is unset, the YAML value is kept.""" + monkeypatch.delenv(env_var, raising=False) + conf = get_config(config_file) + assert conf[config_key] == "from_file" + + +def test_lms_url_default_applied(config_file): + """If ``lms_url`` is absent from the config, the default is filled in.""" + conf = get_config(config_file) + assert conf["lms_url"] == DEFAULT_LMS_URL + + +def test_lms_url_from_config_is_preserved(tmp_path): + """An explicit ``lms_url`` in the file overrides the default.""" + path = tmp_path / "config.yaml" + path.write_text("backend: csv\nlms_url: https://my.lms.example.com\n") + + conf = get_config(str(path)) + assert conf["lms_url"] == "https://my.lms.example.com" + + +def test_config_file_path_is_recorded(config_file): + """``get_config`` stamps the source path into the returned dict.""" + conf = get_config(config_file) + assert conf["config_file"] == config_file diff --git a/xapi_db_load/tests/test_event_generator.py b/xapi_db_load/tests/test_event_generator.py new file mode 100644 index 0000000..0923948 --- /dev/null +++ b/xapi_db_load/tests/test_event_generator.py @@ -0,0 +1,115 @@ +""" +Validation tests for ``EventGenerator._validate_config`` and basic setup. +""" + +import datetime +import logging + +import pytest + +from xapi_db_load.generate_load_async import EventGenerator + + +def _base_config(**overrides) -> dict: + """Return a valid config dict that can be tweaked per-test via overrides.""" + base = { + "lms_url": "http://localhost:18000", + "start_date": datetime.date(2023, 1, 1), + "end_date": datetime.date(2023, 6, 1), + "course_length_days": 30, + "num_organizations": 1, + "num_actors": 5, + "num_course_sizes": {"small": 1}, + "num_course_publishes": 1, + "course_size_makeup": { + "small": { + "actors": 2, + "problems": 1, + "videos": 1, + "chapters": 1, + "sequences": 1, + "verticals": 1, + "forum_posts": 1, + } + }, + "batch_size": 1, + "num_xapi_batches": 1, + "num_actor_profile_changes": 1, + } + base.update(overrides) + return base + + +def _make_generator(config: dict) -> EventGenerator: + return EventGenerator(config, logging.getLogger("test"), None) + + +def test_valid_config_does_not_raise(): + """A correctly-formed config produces an EventGenerator without error.""" + _make_generator(_base_config()) + + +@pytest.mark.parametrize( + "overrides,expected_substring", + [ + pytest.param( + { + "start_date": datetime.date(2023, 6, 1), + "end_date": datetime.date(2023, 1, 1), + }, + "Start date must be before end date", + id="start_after_end", + ), + pytest.param( + { + "start_date": datetime.date(2023, 1, 1), + "end_date": datetime.date(2023, 1, 1), + }, + "Start date must be before end date", + id="start_equals_end", + ), + pytest.param( + { + "start_date": datetime.date(2023, 1, 1), + "end_date": datetime.date(2023, 1, 15), + "course_length_days": 30, + }, + "longer than course_length_days", + id="window_shorter_than_course", + ), + pytest.param( + { + "num_actors": 2, + "course_size_makeup": { + "small": { + "actors": 5, + "problems": 1, + "videos": 1, + "chapters": 1, + "sequences": 1, + "verticals": 1, + "forum_posts": 1, + } + }, + }, + "wants more actors", + id="actors_exceed_num_actors", + ), + ], +) +def test_invalid_config_raises(overrides, expected_substring): + """Each invalid config combination raises ``ValueError`` with a helpful message.""" + with pytest.raises(ValueError, match=expected_substring): + _make_generator(_base_config(**overrides)) + + +def test_instances_have_isolated_state(): + """Two generators must not share ``actors``/``courses``/``orgs`` collections.""" + a = _make_generator(_base_config()) + b = _make_generator(_base_config()) + + a.actors.append("sentinel") # type: ignore[arg-type] + a.orgs.append("sentinel") + + assert b.actors == [] + assert b.orgs == [] diff --git a/xapi_db_load/tests/test_waiter.py b/xapi_db_load/tests/test_waiter.py new file mode 100644 index 0000000..5eb6868 --- /dev/null +++ b/xapi_db_load/tests/test_waiter.py @@ -0,0 +1,90 @@ +""" +Unit tests for the ``Waiter`` base class used by every backend task. +""" + +import logging +from unittest.mock import MagicMock + +import pytest + +from xapi_db_load.waiter import Waiter + + +class _TestWaiter(Waiter): + """Concrete subclass that sets ``task_name`` so ``__init__`` succeeds.""" + + task_name = "test" + + +@pytest.fixture +def waiter() -> Waiter: + """Return a ready-to-use ``_TestWaiter`` with a stub EventGenerator.""" + event_generator = MagicMock() + event_generator.setup_complete = True + return _TestWaiter({}, logging.getLogger("test"), event_generator) + + +def test_initial_state(waiter): + """Counters and percentages start at zero, and ``finished`` is False.""" + assert waiter.complete_pct == 0.0 + assert waiter.total_task_count == 0 + assert waiter.completed_task_count == 0 + assert waiter.finished is False + assert waiter.get_complete() == 0.0 + + +def test_update_counts_drive_percentage(waiter): + """Updating total then completed counts produces the correct percentage.""" + waiter.update_total_task_count(10) + waiter.update_completed_task_count(3) + + assert waiter.total_task_count == 10 + assert waiter.completed_task_count == 3 + assert waiter.get_complete() == pytest.approx(0.3) + + +def test_update_complete_pct_handles_zero_total(waiter, caplog): + """``update_complete_pct`` swallows ZeroDivisionError and logs an error.""" + # total stays at 0 - the next call should not raise + waiter.completed_task_count = 1 + with caplog.at_level(logging.ERROR, logger="test"): + waiter.update_complete_pct() + + # Percentage is unchanged from its initial value, and an error was logged. + assert waiter.complete_pct == 0.0 + assert any("update_complete_pct" in r.message for r in caplog.records) + + +def test_finish_marks_complete(waiter): + """``finish()`` sets ``finished`` and forces percentage to 1.0.""" + waiter.update_total_task_count(5) + waiter.update_completed_task_count(2) + waiter.finish() + + assert waiter.finished is True + assert waiter.complete_pct == 1.0 + + +def test_reset_clears_state(waiter): + """``reset()`` returns the task to its initial state for re-runs.""" + waiter.update_total_task_count(10) + waiter.update_completed_task_count(5) + waiter.finish() + + waiter.reset() + + assert waiter.complete_pct == 0.0 + assert waiter.total_task_count == 0 + assert waiter.completed_task_count == 0 + assert waiter.finished is False + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "method_name", + ["_run_task", "_run_db_load_task"], +) +async def test_abstract_methods_raise(waiter, method_name): + """The default async hooks raise ``NotImplementedError`` until subclassed.""" + with pytest.raises(NotImplementedError): + await getattr(waiter, method_name)() diff --git a/xapi_db_load/tests/test_xapi_events.py b/xapi_db_load/tests/test_xapi_events.py new file mode 100644 index 0000000..cd5b848 --- /dev/null +++ b/xapi_db_load/tests/test_xapi_events.py @@ -0,0 +1,150 @@ +""" +Shape and content tests for every concrete xAPI event class. + +These tests guard against regressions in the dict layout returned by +``get_data()``, validate that the embedded ``event`` field is valid JSON, +and verify that the configurable ``lms_url`` propagates through to the +generated statements (i.e. nothing is silently hardcoded back to localhost). +""" + +import asyncio +import datetime +import json +import logging + +import pytest + +from xapi_db_load.generate_load_async import EventGenerator +from xapi_db_load.xapi.xapi_forum import PostCreated +from xapi_db_load.xapi.xapi_grade import ( + CourseGradeCalculated, + FirstTimePassed, + SubsectionGradeCalculated, +) +from xapi_db_load.xapi.xapi_hint_answer import ShowAnswer, ShowHint +from xapi_db_load.xapi.xapi_navigation import ( + LinkClicked, + NextNavigation, + PreviousNavigation, + TabSelectedNavigation, +) +from xapi_db_load.xapi.xapi_problem import BrowserProblemCheck, ServerProblemCheck +from xapi_db_load.xapi.xapi_registration import Registered, Unregistered +from xapi_db_load.xapi.xapi_video import ( + CompletedVideo, + LoadedVideo, + PausedVideo, + PlayedVideo, + PositionChangedVideo, + StoppedVideo, + TranscriptDisabled, + TranscriptEnabled, +) + +# Custom LMS URL so we can assert it propagates through the whole stack +# instead of accidentally falling back to the hardcoded default. +TEST_LMS_URL = "https://lms.test.example.com" + +# Every concrete event class the load tool can emit. +ALL_EVENT_CLASSES = [ + # Video + LoadedVideo, + PlayedVideo, + PausedVideo, + StoppedVideo, + CompletedVideo, + PositionChangedVideo, + TranscriptEnabled, + TranscriptDisabled, + # Problem + BrowserProblemCheck, + ServerProblemCheck, + # Navigation + NextNavigation, + PreviousNavigation, + TabSelectedNavigation, + LinkClicked, + # Hint / Answer + ShowHint, + ShowAnswer, + # Forum + PostCreated, + # Grade + FirstTimePassed, + CourseGradeCalculated, + SubsectionGradeCalculated, + # Registration + Registered, + Unregistered, +] + + +def _make_config() -> dict: + """Return a minimal but valid EventGenerator config.""" + return { + "lms_url": TEST_LMS_URL, + "start_date": datetime.date(2023, 1, 1), + "end_date": datetime.date(2023, 12, 31), + "course_length_days": 30, + "num_organizations": 2, + "num_actors": 5, + "num_course_sizes": {"small": 2}, + "num_course_publishes": 1, + "course_size_makeup": { + "small": { + "actors": 2, + "problems": 3, + "videos": 2, + "chapters": 2, + "sequences": 2, + "verticals": 2, + "forum_posts": 2, + } + }, + "batch_size": 10, + "num_xapi_batches": 1, + "num_actor_profile_changes": 1, + } + + +@pytest.fixture(scope="module") +def event_generator() -> EventGenerator: + """Build a fully-populated EventGenerator usable by every event class.""" + gen = EventGenerator(_make_config(), logging.getLogger("test"), None) + asyncio.run(gen.run_task()) + return gen + + +REQUIRED_KEYS = {"event_id", "verb", "actor_id", "emission_time", "event"} + + +@pytest.mark.parametrize("event_class", ALL_EVENT_CLASSES, ids=lambda c: c.__name__) +def test_event_get_data_shape(event_generator, event_class): + """Every event class returns a dict with the expected keys and a JSON event.""" + data = event_class(event_generator).get_data() + + assert REQUIRED_KEYS.issubset(data.keys()), ( + f"{event_class.__name__} missing required keys: " + f"{REQUIRED_KEYS - set(data.keys())}" + ) + assert data["verb"] == event_class.verb + assert isinstance(data["actor_id"], str) and data["actor_id"] + assert isinstance(data["event_id"], str) and data["event_id"] + + # The "event" field is a JSON-encoded xAPI statement. + statement = json.loads(data["event"]) + assert statement["id"] == data["event_id"] + assert statement["verb"]["id"] == event_class.verb + + +@pytest.mark.parametrize("event_class", ALL_EVENT_CLASSES, ids=lambda c: c.__name__) +def test_event_respects_configured_lms_url(event_generator, event_class): + """The configured ``lms_url`` must appear in the generated actor.account.homePage.""" + data = event_class(event_generator).get_data() + statement = json.loads(data["event"]) + + home_page = statement["actor"]["account"]["homePage"] + assert home_page == TEST_LMS_URL, ( + f"{event_class.__name__} generated homePage={home_page!r}, " + f"expected {TEST_LMS_URL!r}" + ) From b39f2120543adf4cd8292030c3eb8d664217ddaf Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 11:41:52 -0600 Subject: [PATCH 7/9] fix: Clean up exception handling, raise in more places Previously some legitimate errors could be swallowed, this tightens things up and clarifies some logging. --- xapi_db_load/async_app.py | 9 +++++-- xapi_db_load/backends/ralph.py | 40 ++++++++++++++++++++++++++----- xapi_db_load/tests/test_waiter.py | 15 ++++++++---- xapi_db_load/waiter.py | 12 ++++++---- 4 files changed, 60 insertions(+), 16 deletions(-) diff --git a/xapi_db_load/async_app.py b/xapi_db_load/async_app.py index 3c5ffd5..2ff847c 100644 --- a/xapi_db_load/async_app.py +++ b/xapi_db_load/async_app.py @@ -60,8 +60,13 @@ def _setup_logger(self): file_handler.setFormatter(formatter) self.logger.setLevel(logging.DEBUG) self.logger.addHandler(file_handler) - except Exception: - self.logger.warning("Unable to open log file.") + except OSError as exc: + # File logging is a nice-to-have; failing to open the log file + # (permissions, disk full, missing directory, etc.) should not abort + # the run, especially in non-UI mode where stdout logging still works. + self.logger.warning( + "Unable to open log file %r: %s", self.logfile_path, exc + ) self.log("Logging set up") diff --git a/xapi_db_load/backends/ralph.py b/xapi_db_load/backends/ralph.py index fc9874f..cf80e82 100644 --- a/xapi_db_load/backends/ralph.py +++ b/xapi_db_load/backends/ralph.py @@ -58,12 +58,22 @@ class InsertXAPIEventsRalph(InsertXAPIEvents): Ralph to do the xAPI the insertion. """ + # Default timeout (seconds) applied to Ralph POST requests when the config + # does not specify one. Generous so legitimately-slow Ralph servers are not + # cut off, but finite so a hung endpoint does not stall the whole run. + # Set ``lrs_request_timeout: null`` in config to restore unbounded waits. + DEFAULT_LRS_REQUEST_TIMEOUT = 120 + def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator): super().__init__(config, logger, event_generator) self.lrs_url = config["lrs_url"] self.lrs_username = config["lrs_username"] self.lrs_password = config["lrs_password"] + # ``None`` is a valid value for ``requests`` and means "wait forever". + self.lrs_request_timeout = config.get( + "lrs_request_timeout", self.DEFAULT_LRS_REQUEST_TIMEOUT + ) def _format_row(self, row: dict): """ @@ -74,13 +84,31 @@ def _format_row(self, row: dict): async def _do_insert(self, out_data: List): """ POST a batch of rows to Ralph instead of inserting directly to ClickHouse. + + A timeout is applied (configurable via ``lrs_request_timeout``) so a + hung Ralph endpoint cannot stall the run indefinitely. ``HTTPError`` and + connection-level failures are logged with the offending payload and + re-raised so the worker's existing retry/error-count handling applies. """ - resp = requests.post( - self.lrs_url, - auth=(self.lrs_username, self.lrs_password), - json=out_data, - headers={"Content-Type": "application/json"}, - ) + try: + resp = requests.post( + self.lrs_url, + auth=(self.lrs_username, self.lrs_password), + json=out_data, + headers={"Content-Type": "application/json"}, + timeout=self.lrs_request_timeout, + ) + except requests.RequestException as exc: + # Connection / timeout / SSL / etc. log the payload + # and re-raise so the worker loop records the failure. + self.logger.error( + "Ralph POST to %s failed before a response was received.", + self.lrs_url, + exc_info=exc, + ) + self.logger.error(json.dumps(out_data)) + raise + try: resp.raise_for_status() except requests.HTTPError: diff --git a/xapi_db_load/tests/test_waiter.py b/xapi_db_load/tests/test_waiter.py index 5eb6868..0544871 100644 --- a/xapi_db_load/tests/test_waiter.py +++ b/xapi_db_load/tests/test_waiter.py @@ -44,15 +44,22 @@ def test_update_counts_drive_percentage(waiter): def test_update_complete_pct_handles_zero_total(waiter, caplog): - """``update_complete_pct`` swallows ZeroDivisionError and logs an error.""" + """``update_complete_pct`` swallows ZeroDivisionError and leaves pct unchanged.""" # total stays at 0 - the next call should not raise waiter.completed_task_count = 1 - with caplog.at_level(logging.ERROR, logger="test"): + with caplog.at_level(logging.DEBUG, logger="test"): waiter.update_complete_pct() - # Percentage is unchanged from its initial value, and an error was logged. + # Percentage is unchanged from its initial value. assert waiter.complete_pct == 0.0 - assert any("update_complete_pct" in r.message for r in caplog.records) + + +def test_update_complete_pct_propagates_real_errors(waiter): + """Unexpected errors should propagate rather than being silently swallowed.""" + waiter.total_task_count = 10 + waiter.completed_task_count = "not a number" # type: ignore[assignment] + with pytest.raises(TypeError): + waiter.update_complete_pct() def test_finish_marks_complete(waiter): diff --git a/xapi_db_load/waiter.py b/xapi_db_load/waiter.py index 8bd73b6..cd406b5 100644 --- a/xapi_db_load/waiter.py +++ b/xapi_db_load/waiter.py @@ -61,10 +61,14 @@ def update_complete_pct(self) -> None: with self.complete_pct_lock: try: self.complete_pct = self.completed_task_count / self.total_task_count - # Eat all errors here since this can be happening in a long running thread that would - # otherwise die and leave the application hanging in the case of a div by 0 etc. - except Exception as e: - self.logger.error(f"Error in update_complete_pct {e}") + # Guard against div-by-zero when ``total_task_count`` has not yet + # been initialised by a producer task. Any other exception here indicates a + # real bug and should propagate. + except ZeroDivisionError: + self.logger.debug( + "update_complete_pct called before total_task_count was set; " + "leaving complete_pct unchanged." + ) def update_total_task_count(self, increment_by: int) -> None: """ From d296f48541dc3d54add32954eaa65464833b83f4 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 11:52:53 -0600 Subject: [PATCH 8/9] feat: Make logs rotate We've run into issues in the past with logs growing to fill huge amounts of disk, this limits that. --- default_config.yaml | 7 +++++ xapi_db_load/async_app.py | 65 ++++++++++++++++++++++++++++++--------- xapi_db_load/main.py | 2 +- 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/default_config.yaml b/default_config.yaml index 1470bbc..77d82ff 100644 --- a/default_config.yaml +++ b/default_config.yaml @@ -18,6 +18,13 @@ csv_load_from_s3_after: false # Run options log_dir: logs +# Maximum size of db_load.log before it rotates, in bytes. +# Set to 0 to disable rotation. +log_max_bytes: 10485760 # 10 MB + +# Number of rotated log backups to retain (e.g. db_load.log.1 ... .5). +log_backup_count: 5 + # Base URL used as the LMS "homePage" / course URL prefix in generated xAPI # statements. Override this to match a real environment when you need the # emitted events to point at a specific host. diff --git a/xapi_db_load/async_app.py b/xapi_db_load/async_app.py index 2ff847c..5dda8bc 100644 --- a/xapi_db_load/async_app.py +++ b/xapi_db_load/async_app.py @@ -3,6 +3,7 @@ """ import logging +import logging.handlers import os import sys from typing import TYPE_CHECKING @@ -32,33 +33,45 @@ def __init__(self, config, ui: "TextUI | None" = None): self.logfile_path = os.path.join(self.logfile_path, "db_load.log") # If we are in UI mode, this lets us communicate with the UI self.ui = ui - self._setup_logger() self.config = config + self._setup_logger() # The Runner coordinates the async tasks from the configured backend self.runner = Runner(self.config, self.logger) + # Default size (bytes) at which the rotating log file rolls over. + # Override via the ``log_max_bytes`` config key. Set to ``0`` to disable + # rotation (1 big file, unbounded growth). + DEFAULT_LOG_MAX_BYTES = 10 * 1024 * 1024 # 10 MB + + # Default number of rotated logs to retain. Override via + # ``log_backup_count``. Ignored if rotation is disabled. + DEFAULT_LOG_BACKUP_COUNT = 5 + def _setup_logger(self): """ - Logging is a little complicated, we always want to log to a file but also to stdout if we - are not in UI mode. + Configure file and (optionally) stdout logging. + + The logger always captures DEBUG to the file (with rotation) and INFO + to stdout when not running under the urwid UI. Per-handler levels are + set explicitly so adding a handler later cannot accidentally raise the + root level for existing handlers. """ self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.DEBUG) if not self.ui: stream_handler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter("%(asctime)s - %(message)s") - stream_handler.setFormatter(formatter) - self.logger.setLevel(logging.INFO) + stream_handler.setLevel(logging.INFO) + stream_handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s")) self.logger.addHandler(stream_handler) try: - file_handler = logging.FileHandler(self.logfile_path) - formatter = logging.Formatter( - "%(asctime)s %(name)-12s %(levelname)-8s %(message)s" + file_handler = self._build_file_handler() + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter( + logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s") ) - file_handler.setFormatter(formatter) - self.logger.setLevel(logging.DEBUG) self.logger.addHandler(file_handler) except OSError as exc: # File logging is a nice-to-have; failing to open the log file @@ -70,6 +83,27 @@ def _setup_logger(self): self.log("Logging set up") + def _build_file_handler(self) -> logging.Handler: + """ + Return the file handler to use, honoring rotation config. + + ``log_max_bytes = 0`` disables rotation and returns a plain + ``FileHandler`` for users who prefer the pre-rotation behavior. + """ + max_bytes = self.config.get("log_max_bytes", self.DEFAULT_LOG_MAX_BYTES) + backup_count = self.config.get( + "log_backup_count", self.DEFAULT_LOG_BACKUP_COUNT + ) + + if not max_bytes: + return logging.FileHandler(self.logfile_path) + + return logging.handlers.RotatingFileHandler( + self.logfile_path, + maxBytes=max_bytes, + backupCount=backup_count, + ) + def set_main_loop(self, loop): """ Set the main loop. @@ -91,9 +125,10 @@ def get_shared_instance() -> "App": def log(self, message): """ - Convenience method to log a message and print if necessary. + Log an info-level message. + + In non-UI mode the stdout ``StreamHandler`` attached to ``self.logger`` + already prints to the terminal, so we do not also call ``print()`` -- + doing so would emit every message twice. """ self.logger.info(message) - - if not self.ui: - print(message) diff --git a/xapi_db_load/main.py b/xapi_db_load/main.py index ea0c425..094fb08 100644 --- a/xapi_db_load/main.py +++ b/xapi_db_load/main.py @@ -102,7 +102,7 @@ def load_db(config_file: str, load_db_only: bool): config = get_config(config_file) app = App(config) asyncio.run(app.runner.run(load_db_only)) - print(f"Total duration: {datetime.datetime.now() - start}") + app.log(f"Total duration: {datetime.datetime.now() - start}") exit(0) From 976778474993265eaece57a4ebbd720aa7e4206f Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Mon, 18 May 2026 15:39:19 -0600 Subject: [PATCH 9/9] docs: Update readme, add Ralph pw env override --- README.rst | 118 ++++++++++++++++++++++++++++-- xapi_db_load/main.py | 4 +- xapi_db_load/tests/test_config.py | 4 +- 3 files changed, 116 insertions(+), 10 deletions(-) diff --git a/README.rst b/README.rst index 32f1a76..b20fe47 100644 --- a/README.rst +++ b/README.rst @@ -26,6 +26,44 @@ Please add any issues you find here: https://github.com/openedx/xapi-db-load/iss Data can be generated using the following backends: +Backend comparison +------------------ + +.. list-table:: + :header-rows: 1 + :widths: 12 18 18 18 34 + + * - Backend + - Recommended scale + - Speed + - Output + - Best for + * - ``clickhouse`` + - Up to ~10K xAPI events + - Slow + - Direct ClickHouse inserts + - Smoke tests, configuration and permission checks + * - ``ralph`` + - Up to ~1M xAPI events + - Slowest + - HTTP POST to Ralph LRS + - Exercising the full Aspects / Ralph integration path + * - ``vector`` + - Up to ~10M xAPI events + - Medium + - Log statements consumed by Vector + - Testing a Vector-based pipeline into ClickHouse + * - ``csv`` + - Up to 100M xAPI events + - Fast + - Gzipped CSV files (local or block storage) + - Reusable fixtures, readable output, medium-to-large performance tests + * - ``chdb`` + - 100M+ xAPI events + - Fastest + - lz4 ClickHouse Native files on block storage + - Reusable fixtures, readable output, very large scale tests + clickhouse ---------- This backend issues batched insert statements directly against the configured @@ -42,6 +80,14 @@ clickhouse backend. It is useful for testing Ralph configuration, integration, and permissions. This is the slowest method, but exercises the largest surface area of the Aspects project. +vector +------ +This backend emits xAPI statements through a dedicated ``xapi_tracking`` +Python logger so that a co-located `Vector `_ agent can +read them and forward them to ClickHouse. All non-xAPI data (courses, blocks, +enrollments, etc.) is still written using the direct ``clickhouse`` backend. +Use this backend when validating a Vector-based ingestion pipeline. + csv --- This backend generates a single gzipped CSV file for each type of data and @@ -59,8 +105,8 @@ configured `start_date` and `end_date`. chdb ---- -This backend generates lz4 compressed ClickHouse Native files in S3 using the -CHDB in-process ClickHouse engine and can optionally load the files to +This backend generates lz4 compressed ClickHouse Native files in block storage +using the CHDB in-process ClickHouse engine, and can optionally load the files to a ClickHouse service directly after creation or at a later time using the ``--load_db_only`` option. The generated files are partitioned differently per data type to parallelize data writing and loading. This is the fastest engine for @@ -103,6 +149,37 @@ To try out the new UI mode: +Secrets and environment variable overrides +------------------------------------------ +Sensitive credentials should not be committed to source control. The following +environment variables override their corresponding config keys at load time +(env vars take precedence over values in the YAML file): + +.. list-table:: + :header-rows: 1 + :widths: 45 25 30 + + * - Environment variable + - Config key it overrides + - Used by + * - ``XAPI_DB_LOAD_CLICKHOUSE_PASSWORD`` + - ``db_password`` + - All ClickHouse-backed runs + * - ``XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY`` + - ``s3_secret`` + - ``csv`` (S3 destination), ``chdb`` + * - ``XAPI_DB_LOAD_RALPH_PASSWORD`` + - ``lrs_password`` + - ``ralph`` + +A typical pattern is to keep all non-secret keys in YAML and provide the +secrets via the shell, your CI secret store, or a ``.env`` file:: + + export XAPI_DB_LOAD_CLICKHOUSE_PASSWORD=... + export XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY=... + export XAPI_DB_LOAD_RALPH_PASSWORD=... + xapi-db-load load-db --config_file my_config.yaml + Configuration Format -------------------- There are a number of different configuration options for tuning the output. @@ -117,6 +194,20 @@ test:: # Location where timing logs will be saved log_dir: logs + # Maximum size of db_load.log before it rotates, in bytes. + # Defaults to 10 MB. Set to 0 to disable rotation and keep a single + # unbounded log file (the pre-rotation behavior). + log_max_bytes: 10485760 + + # Number of rotated log backups to retain (db_load.log.1 ... .5 by default). + log_backup_count: 5 + + # Base URL used as the LMS "homePage" / course URL prefix in every + # generated xAPI statement. Defaults to http://localhost:18000. Set this + # to match a real environment when you need the emitted events to point + # at a specific host. + lms_url: http://localhost:18000 + # xAPI statements will be generated in batches, the total number of # statements is ``num_xapi_batches * batch_size``. The batch size is the number # of xAPI statements sent to the backend (Ralph POST, ClickHouse insert, etc.) @@ -208,26 +299,41 @@ Ralph / ClickHouse Backend ^^^^^^^^^^^^^^^^^^^^^^^^^^ Variables necessary to send xAPI statements via Ralph:: - backend: ralph_clickhouse + backend: ralph lrs_url: http://ralph.tutor-nightly-local.orb.local/xAPI/statements lrs_username: ralph lrs_password: secret + # Optional: per-request timeout (seconds) applied to every Ralph POST. + # Defaults to 120. Set to ``null`` for unbounded waits (pre-timeout + # behavior). A finite value prevents a hung Ralph endpoint from stalling + # the entire run. + lrs_request_timeout: 120 + # This also requires all of the ClickHouse backend variables! +Vector Backend +^^^^^^^^^^^^^^ +The ``vector`` backend reuses the ``clickhouse`` connection variables for the +non-xAPI data and emits xAPI statements through the ``xapi_tracking`` logger +for Vector to consume:: + + backend: vector + # ... plus all the ClickHouse backend variables above + CSV Backend, Local Files ^^^^^^^^^^^^^^^^^^^^^^^^ Generates gzipped CSV files to a local directory:: - backend: csv_file + backend: csv csv_output_destination: logs/ CSV Backend, S3 Compatible Destination ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Generates gzipped CSV files to remote location:: - backend: csv_file + backend: csv # This can be anything smart-open can handle (ex. a local directory or # an S3 bucket etc.) but importing to ClickHouse using this tool only # supports S3 or compatible services like MinIO right now. @@ -244,7 +350,7 @@ CSV Backend, S3 Compatible Destination, Load to ClickHouse Generates gzipped CSV files to a remote location, then automatically loads them to ClickHouse:: - backend: csv_file + backend: csv # csv_output_destination can be anything smart_open can handle, a local # directory or an S3 bucket etc., but importing to ClickHouse using this # tool only supports S3 or compatible services (ex: MinIO) right now diff --git a/xapi_db_load/main.py b/xapi_db_load/main.py index 094fb08..277a5dc 100644 --- a/xapi_db_load/main.py +++ b/xapi_db_load/main.py @@ -16,7 +16,7 @@ _ENV_VAR_OVERRIDES = { "XAPI_DB_LOAD_CLICKHOUSE_PASSWORD": "db_password", "XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY": "s3_secret", - "XAPI_DB_LOAD_RALPH_PASSWORD": "ralph_password", + "XAPI_DB_LOAD_RALPH_PASSWORD": "lrs_password", } @@ -28,7 +28,7 @@ def get_config(config_file: str) -> dict: Environment variables take precedence over values in the config file: XAPI_DB_LOAD_CLICKHOUSE_PASSWORD -> db_password XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY -> s3_secret - XAPI_DB_LOAD_RALPH_PASSWORD -> ralph_password + XAPI_DB_LOAD_RALPH_PASSWORD -> lrs_password """ with open(config_file, "r") as y: conf = yaml.safe_load(y) diff --git a/xapi_db_load/tests/test_config.py b/xapi_db_load/tests/test_config.py index 0f226cb..b0e3c9a 100644 --- a/xapi_db_load/tests/test_config.py +++ b/xapi_db_load/tests/test_config.py @@ -13,7 +13,7 @@ ENV_VAR_OVERRIDES = [ ("XAPI_DB_LOAD_CLICKHOUSE_PASSWORD", "db_password"), ("XAPI_DB_LOAD_AWS_SECRET_ACCESS_KEY", "s3_secret"), - ("XAPI_DB_LOAD_RALPH_PASSWORD", "ralph_password"), + ("XAPI_DB_LOAD_RALPH_PASSWORD", "lrs_password"), ] @@ -27,7 +27,7 @@ def config_file(tmp_path): backend: csv db_password: from_file s3_secret: from_file - ralph_password: from_file + lrs_password: from_file """ ).strip() )