Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions collectoss/tasks/github/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,22 @@ def collect(self, repo_git, key_auth, since):
self.repo_identifier = f"{owner}/{repo}"

event_batch_size = get_batch_size("event")

# Build mappings once before processing any events
issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id)
pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id)

events = []
for event in self._collect_events(repo_git, key_auth, since):
events.append(event)

# making this a decent size since process_events retrieves all the issues and prs each time
if len(events) >= event_batch_size:
self._process_events(events, repo_id)
self._process_events(events, repo_id, issue_url_to_id_map, pr_url_to_id_map)
events.clear()

if events:
self._process_events(events, repo_id)
self._process_events(events, repo_id, issue_url_to_id_map, pr_url_to_id_map)

def _collect_events(self, repo_git: str, key_auth, since):

Expand All @@ -146,7 +150,7 @@ def _collect_events(self, repo_git: str, key_auth, since):
if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since:
return

def _process_events(self, events, repo_id):
def _process_events(self, events, repo_id, issue_url_to_id_map, pr_url_to_id_map):

issue_events = []
pr_events = []
Expand All @@ -164,19 +168,16 @@ def _process_events(self, events, repo_id):
if not_mappable_events:
self._logger.warning(f"{self.repo_identifier} - {self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}")

self._process_issue_events(issue_events, repo_id)
self._process_pr_events(pr_events, repo_id)
self._process_issue_events(issue_events, repo_id, issue_url_to_id_map)
self._process_pr_events(pr_events, repo_id, pr_url_to_id_map)

update_issue_closed_cntrbs_by_repo_id(repo_id)

def _process_issue_events(self, issue_events, repo_id):
def _process_issue_events(self, issue_events, repo_id, issue_url_to_id_map):

issue_event_dicts = []
contributors = []


issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id)

for event in issue_events:

event, contributor = self._process_github_event_contributors(event)
Expand All @@ -203,13 +204,11 @@ def _process_issue_events(self, issue_events, repo_id):

self._insert_issue_events(issue_event_dicts)

def _process_pr_events(self, pr_events, repo_id):
def _process_pr_events(self, pr_events, repo_id, pr_url_to_id_map):

pr_event_dicts = []
contributors = []

pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id)

for event in pr_events:

event, contributor = self._process_github_event_contributors(event)
Expand Down
39 changes: 20 additions & 19 deletions collectoss/tasks/github/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,30 @@ def collect_github_messages(repo_git: str, full_collection: bool) -> None:
core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)


# Build mappings once before processing any messages
# create mapping from issue url to issue id of current issues
issue_url_to_id_map = {}
issues = db_session.session.query(Issue).filter(Issue.repo_id == repo_id).all()
for issue in issues:
issue_url_to_id_map[issue.issue_url] = issue.issue_id

# create mapping from pr url to pr id of current pull requests
pr_issue_url_to_id_map = {}
prs = db_session.session.query(PullRequest).filter(PullRequest.repo_id == repo_id).all()
for pr in prs:
pr_issue_url_to_id_map[pr.pr_issue_url] = pr.pull_request_id

if is_repo_small(repo_id):
message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name, core_data_last_collected)

if message_data:
process_messages(message_data, task_name, repo_id, logger, db_session)
process_messages(message_data, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map)

else:
logger.info(f"{owner}/{repo} has no messages")

else:
process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, db_session, core_data_last_collected)
process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, db_session, core_data_last_collected, issue_url_to_id_map, pr_issue_url_to_id_map)


def is_repo_small(repo_id):
Expand Down Expand Up @@ -82,7 +95,7 @@ def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, tas
return list(github_data_access.paginate_resource(url))


def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, db_session, since) -> None:
def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, db_session, since, issue_url_to_id_map, pr_issue_url_to_id_map) -> None:

message_batch_size = get_batch_size("message")

Expand Down Expand Up @@ -129,16 +142,16 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger
skipped_urls += 1

if len(all_data) >= message_batch_size:
process_messages(all_data, task_name, repo_id, logger, db_session)
process_messages(all_data, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map)
all_data.clear()

if len(all_data) > 0:
process_messages(all_data, task_name, repo_id, logger, db_session)
process_messages(all_data, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map)

logger.info(f"{task_name}: Finished. Skipped {skipped_urls} comment URLs due to 404.")


def process_messages(messages, task_name, repo_id, logger, db_session):
def process_messages(messages, task_name, repo_id, logger, db_session, issue_url_to_id_map, pr_issue_url_to_id_map):

tool_version = "2.0"
data_source = "Github API"
Expand All @@ -154,18 +167,6 @@ def process_messages(messages, task_name, repo_id, logger, db_session):
if len(messages) == 0:
logger.info(f"{task_name}: No messages to process")

# create mapping from issue url to issue id of current issues
issue_url_to_id_map = {}
issues = db_session.session.query(Issue).filter(Issue.repo_id == repo_id).all()
for issue in issues:
issue_url_to_id_map[issue.issue_url] = issue.issue_id

# create mapping from pr url to pr id of current pull requests
pr_issue_url_to_id_map = {}
prs = db_session.session.query(PullRequest).filter(PullRequest.repo_id == repo_id).all()
for pr in prs:
pr_issue_url_to_id_map[pr.pr_issue_url] = pr.pull_request_id


message_len = len(messages)
for index, message in enumerate(messages):
Expand Down Expand Up @@ -297,4 +298,4 @@ def process_github_comment_contributors(message, tool_source, tool_version, data
# This is done by searching all the dicts for the given key that has the specified value
def find_dict_in_list_of_dicts(data, key, value):

return next((item for item in data if item[key] == value), None)
return next((item for item in data if item[key] == value), None)
Loading