From e3c6796342ba2212413de5c7af3bfcc2d63cde3e Mon Sep 17 00:00:00 2001 From: PredictiveManish Date: Sat, 6 Dec 2025 22:55:06 +0530 Subject: [PATCH] Fix: Eliminate redundant full table scans in messages and events collection Signed-off-by: PredictiveManish --- collectoss/tasks/github/events.py | 23 ++++++++--------- collectoss/tasks/github/messages.py | 39 +++++++++++++++-------------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/collectoss/tasks/github/events.py b/collectoss/tasks/github/events.py index 24b1e42ff..22358547a 100644 --- a/collectoss/tasks/github/events.py +++ b/collectoss/tasks/github/events.py @@ -117,6 +117,10 @@ def collect(self, repo_git, key_auth, since): self.repo_identifier = f"{owner}/{repo}" event_batch_size = get_batch_size("event") + + # Build mappings once before processing any events + issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id) + pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id) events = [] for event in self._collect_events(repo_git, key_auth, since): @@ -124,11 +128,11 @@ def collect(self, repo_git, key_auth, since): # making this a decent size since process_events retrieves all the issues and prs each time if len(events) >= event_batch_size: - self._process_events(events, repo_id) + self._process_events(events, repo_id, issue_url_to_id_map, pr_url_to_id_map) events.clear() if events: - self._process_events(events, repo_id) + self._process_events(events, repo_id, issue_url_to_id_map, pr_url_to_id_map) def _collect_events(self, repo_git: str, key_auth, since): @@ -146,7 +150,7 @@ def _collect_events(self, repo_git: str, key_auth, since): if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since: return - def _process_events(self, events, repo_id): + def _process_events(self, events, repo_id, issue_url_to_id_map, pr_url_to_id_map): issue_events = [] pr_events = [] @@ -164,19 +168,16 @@ def _process_events(self, events, repo_id): if not_mappable_events: self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") - self._process_issue_events(issue_events, repo_id) - self._process_pr_events(pr_events, repo_id) + self._process_issue_events(issue_events, repo_id, issue_url_to_id_map) + self._process_pr_events(pr_events, repo_id, pr_url_to_id_map) update_issue_closed_cntrbs_by_repo_id(repo_id) - def _process_issue_events(self, issue_events, repo_id): + def _process_issue_events(self, issue_events, repo_id, issue_url_to_id_map): issue_event_dicts = [] contributors = [] - - issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id) - for event in issue_events: event, contributor = self._process_github_event_contributors(event) @@ -203,13 +204,11 @@ def _process_issue_events(self, issue_events, repo_id): self._insert_issue_events(issue_event_dicts) - def _process_pr_events(self, pr_events, repo_id): + def _process_pr_events(self, pr_events, repo_id, pr_url_to_id_map): pr_event_dicts = [] contributors = [] - pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id) - for event in pr_events: event, contributor = self._process_github_event_contributors(event) diff --git a/collectoss/tasks/github/messages.py b/collectoss/tasks/github/messages.py index 342eeb2ca..a954fb4b7 100644 --- a/collectoss/tasks/github/messages.py +++ b/collectoss/tasks/github/messages.py @@ -39,17 +39,30 @@ def collect_github_messages(repo_git: str, full_collection: bool) -> None: core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc) + # Build mappings once before processing any messages + # create mapping from issue url to issue id of current issues + issue_url_to_id_map = {} + issues = db_session.session.query(Issue).filter(Issue.repo_id == repo_id).all() + for issue in issues: + issue_url_to_id_map[issue.issue_url] = issue.issue_id + + # create mapping from pr url to pr id of current pull requests + pr_issue_url_to_id_map = {} + prs = db_session.session.query(PullRequest).filter(PullRequest.repo_id == repo_id).all() + for pr in prs: + pr_issue_url_to_id_map[pr.pr_issue_url] = pr.pull_request_id + if is_repo_small(repo_id): message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name, core_data_last_collected) if message_data: - process_messages(message_data, task_name, repo_id, logger, db_session) + process_messages(message_data, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map) else: logger.info(f"{owner}/{repo} has no messages") else: - process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, db_session, core_data_last_collected) + process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, db_session, core_data_last_collected, issue_url_to_id_map, pr_issue_url_to_id_map) def is_repo_small(repo_id): @@ -82,7 +95,7 @@ def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, tas return list(github_data_access.paginate_resource(url)) -def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, db_session, since) -> None: +def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, db_session, since, issue_url_to_id_map, pr_issue_url_to_id_map) -> None: message_batch_size = get_batch_size("message") @@ -129,16 +142,16 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger skipped_urls += 1 if len(all_data) >= message_batch_size: - process_messages(all_data, task_name, repo_id, logger, db_session) + process_messages(all_data, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map) all_data.clear() if len(all_data) > 0: - process_messages(all_data, task_name, repo_id, logger, db_session) + process_messages(all_data, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map) logger.info(f"{task_name}: Finished. Skipped {skipped_urls} comment URLs due to 404.") -def process_messages(messages, task_name, repo_id, logger, db_session): +def process_messages(messages, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map): tool_version = "2.0" data_source = "Github API" @@ -154,18 +167,6 @@ def process_messages(messages, task_name, repo_id, logger, db_session): if len(messages) == 0: logger.info(f"{task_name}: No messages to process") - # create mapping from issue url to issue id of current issues - issue_url_to_id_map = {} - issues = db_session.session.query(Issue).filter(Issue.repo_id == repo_id).all() - for issue in issues: - issue_url_to_id_map[issue.issue_url] = issue.issue_id - - # create mapping from pr url to pr id of current pull requests - pr_issue_url_to_id_map = {} - prs = db_session.session.query(PullRequest).filter(PullRequest.repo_id == repo_id).all() - for pr in prs: - pr_issue_url_to_id_map[pr.pr_issue_url] = pr.pull_request_id - message_len = len(messages) for index, message in enumerate(messages): @@ -297,4 +298,4 @@ def process_github_comment_contributors(message, tool_source, tool_version, data # This is done by searching all the dicts for the given key that has the specified value def find_dict_in_list_of_dicts(data, key, value): - return next((item for item in data if item[key] == value), None) \ No newline at end of file + return next((item for item in data if item[key] == value), None)