|
| 1 | +import os |
| 2 | +import requests |
| 3 | +import time |
| 4 | +from github import Github, Auth |
| 5 | +from requests.adapters import HTTPAdapter |
| 6 | +from urllib3.util.retry import Retry |
| 7 | +import logging |
| 8 | +import sys |
| 9 | + |
| 10 | +# Replace with your actual organization name |
| 11 | +TOPIC = os.getenv('TOPIC_TO_UNCOVER', '') |
| 12 | +ORGANIZATION = os.getenv('GITHUB_ORGANIZATION', '') |
| 13 | + |
| 14 | +MAX_RETRIES = 5 |
| 15 | +RETRY_BACKOFF_FACTOR = 2 |
| 16 | + |
| 17 | + |
| 18 | +def main(): |
| 19 | + logging.basicConfig( |
| 20 | + level=logging.INFO, |
| 21 | + format='%(asctime)s - %(levelname)s - %(message)s', |
| 22 | + stream=sys.stdout |
| 23 | + ) |
| 24 | + logger = logging.getLogger(__name__) |
| 25 | + |
| 26 | + assets_deactivated = AssetesDeactivate(logger=logger) |
| 27 | + assets_deactivated.run(TOPIC, ORGANIZATION) |
| 28 | + |
| 29 | + |
| 30 | +class AssetesDeactivate: |
| 31 | + def __init__(self, logger): |
| 32 | + self.logger = logger |
| 33 | + |
| 34 | + def run(self, topic, org): |
| 35 | + self.logger.info( |
| 36 | + f"Fetching repositories with the {topic} topic in org {org}...") |
| 37 | + if not org: |
| 38 | + self.logger.error( |
| 39 | + "GITHUB_ORGANIZATION environment variable is not set.") |
| 40 | + exit(1) |
| 41 | + if not topic: |
| 42 | + self.logger.error( |
| 43 | + "TOPIC_TO_UNCOVER environment variable is not set.") |
| 44 | + exit(1) |
| 45 | + relevant_repos = self.get_topic_repos(topic, org) |
| 46 | + auth_token = self.jit_authentication() |
| 47 | + |
| 48 | + # Check if token is valid for only 1 hour |
| 49 | + failed_repos = [] |
| 50 | + for repo in relevant_repos: |
| 51 | + try: |
| 52 | + self.jit_deactivate_asset(repo, auth_token) |
| 53 | + except Exception as e: |
| 54 | + self.logger.error( |
| 55 | + f"Failed to deactivate asset for repository: {repo.name}. Error message: {str(e)}") |
| 56 | + failed_repos.append(repo.name) |
| 57 | + if failed_repos: |
| 58 | + self.logger.warn( |
| 59 | + f"Failed to deactivate assets for the following repositories: {failed_repos}") |
| 60 | + |
| 61 | + def get_topic_repos(self, topic, org): |
| 62 | + github_token = os.getenv('GITHUB_TOKEN') |
| 63 | + if github_token == "" or github_token is None: |
| 64 | + self.logger.error("GITHUB_TOKEN environment variable is not set.") |
| 65 | + exit(1) |
| 66 | + |
| 67 | + auth = Auth.Token(github_token) |
| 68 | + github_client = Github(auth=auth) |
| 69 | + |
| 70 | + repos = github_client.get_organization(org).get_repos() |
| 71 | + # Can take a while, 10-20 seconds |
| 72 | + |
| 73 | + filterd_repos = [] |
| 74 | + for repo in repos: |
| 75 | + if topic in repo.topics: |
| 76 | + filterd_repos.append(repo) |
| 77 | + self.logger.info( |
| 78 | + f"Found {len(filterd_repos)} repositories with the {topic} topic in org {org}.") |
| 79 | + return filterd_repos |
| 80 | + |
| 81 | + def jit_authentication(self): |
| 82 | + """Authenticate with the JIT API and retrieve the access token.""" |
| 83 | + auth_url = "https://api.jit.io/authentication/login" |
| 84 | + auth_payload = { |
| 85 | + "clientId": os.getenv('JIT_CLIENT_ID'), |
| 86 | + "secret": os.getenv('JIT_SECRET') |
| 87 | + } |
| 88 | + auth_headers = { |
| 89 | + "accept": "application/json", |
| 90 | + "content-type": "application/json" |
| 91 | + } |
| 92 | + |
| 93 | + self.logger.info( |
| 94 | + f"Authenticating with JIT API using client ID: {auth_payload['clientId']}") |
| 95 | + response = self.send_request( |
| 96 | + url=auth_url, method="POST", headers=auth_headers, json=auth_payload) |
| 97 | + |
| 98 | + if response.status_code == 200: |
| 99 | + token = response.json().get('accessToken') |
| 100 | + self.logger.info("Authentication successful.") |
| 101 | + return token |
| 102 | + else: |
| 103 | + self.logger.error("Authentication failed. Exiting.") |
| 104 | + exit(1) |
| 105 | + |
| 106 | + def jit_deactivate_asset(self, repo, token): |
| 107 | + repo_name = repo.name |
| 108 | + self.logger.info(f"Processing repository: {repo_name}") |
| 109 | + |
| 110 | + asset_url = f"https://api.jit.io/asset/type/repo/vendor/github/owner/{repo.owner.login}/name/{repo_name}" |
| 111 | + asset_headers = { |
| 112 | + "accept": "application/json", |
| 113 | + "authorization": f"Bearer {token}" |
| 114 | + } |
| 115 | + |
| 116 | + self.logger.info(f"Fetching asset for repository: {repo_name}") |
| 117 | + asset_response = self.send_request( |
| 118 | + url=asset_url, headers=asset_headers) |
| 119 | + self.logger.info( |
| 120 | + f"Asset response status for {repo_name}: {asset_response.status_code}") |
| 121 | + |
| 122 | + if asset_response.status_code != 200: |
| 123 | + self.logger.error( |
| 124 | + f"Failed to fetch asset for repository {repo_name}!") |
| 125 | + return |
| 126 | + asset = asset_response.json() |
| 127 | + |
| 128 | + if asset.get('is_covered') is False: |
| 129 | + self.logger.info( |
| 130 | + f"Asset is already deactivated for repository: {repo_name}. Skipping...") |
| 131 | + return |
| 132 | + self.deactivate_asset(updated_asset=asset, token=token) |
| 133 | + self.logger.info(f"Asset deactivated for repository: {repo_name}") |
| 134 | + |
| 135 | + def deactivate_asset(self, updated_asset, token): |
| 136 | + update_url = f"https://api.jit.io/asset/asset/{updated_asset['asset_id']}" |
| 137 | + asset_headers = { |
| 138 | + "accept": "application/json", |
| 139 | + "authorization": f"Bearer {token}" |
| 140 | + } |
| 141 | + fields_to_update = { |
| 142 | + # "is_active": False, |
| 143 | + "is_covered": False |
| 144 | + } |
| 145 | + |
| 146 | + self.logger.info( |
| 147 | + f"Updating asset: {updated_asset['asset_id']} with data: {fields_to_update}") |
| 148 | + update_response = self.send_request( |
| 149 | + url=update_url, method="PATCH", headers=asset_headers, json=fields_to_update) |
| 150 | + self.logger.info( |
| 151 | + f"Update response status: {update_response.status_code}") |
| 152 | + |
| 153 | + def retry_request(func): |
| 154 | + """Decorator to retry a request in case of failure.""" |
| 155 | + |
| 156 | + def wrapper(self, *args, **kwargs): |
| 157 | + retries = MAX_RETRIES |
| 158 | + backoff_factor = RETRY_BACKOFF_FACTOR |
| 159 | + response = None |
| 160 | + for attempt in range(retries): |
| 161 | + response = func(*args, **kwargs) |
| 162 | + if response.status_code == 200: |
| 163 | + return response |
| 164 | + elif response.status_code == 401: |
| 165 | + self.logger.warn( |
| 166 | + "Unauthorized. The token might be expired. Re-authenticating...") |
| 167 | + # Re-authenticate and update the token |
| 168 | + main.auth_token = self.jit_authentication() |
| 169 | + kwargs['headers']['authorization'] = f"Bearer {main.auth_token}" |
| 170 | + elif response.status_code == 429: |
| 171 | + wait_time = (attempt + 1) * backoff_factor |
| 172 | + self.logger.warn( |
| 173 | + f"Rate limit hit. Retrying in {wait_time} seconds...") |
| 174 | + time.sleep(wait_time) |
| 175 | + elif response.status_code == 403: |
| 176 | + self.logger.error("Access is forbidden.") |
| 177 | + if "rate limit" in response.text.lower(): |
| 178 | + wait_time = (attempt + 1) * backoff_factor |
| 179 | + self.logger.warn( |
| 180 | + f"Rate limit possibly hit. Retrying in {wait_time} seconds...") |
| 181 | + time.sleep(wait_time) |
| 182 | + else: |
| 183 | + self.logger.error( |
| 184 | + "Access is permanently forbidden. Exiting.") |
| 185 | + exit(1) |
| 186 | + else: |
| 187 | + self.logger.warn( |
| 188 | + f"Request failed with status code {response.status_code}. Retrying...") |
| 189 | + wait_time = (attempt + 1) * backoff_factor |
| 190 | + time.sleep(wait_time) |
| 191 | + |
| 192 | + self.logger.error( |
| 193 | + f"Max retries reached. Request failed for URL: {args[0]}") |
| 194 | + return response |
| 195 | + return wrapper |
| 196 | + |
| 197 | + @ retry_request |
| 198 | + def send_request(url, method="GET", headers=None, json=None, params=None): |
| 199 | + session = requests.Session() |
| 200 | + retry = Retry(total=5, backoff_factor=1, |
| 201 | + status_forcelist=[429, 500, 502, 503, 504]) |
| 202 | + adapter = HTTPAdapter(max_retries=retry) |
| 203 | + session.mount('https://', adapter) |
| 204 | + session.mount('http://', adapter) |
| 205 | + |
| 206 | + if method == "GET": |
| 207 | + return session.get(url, headers=headers, params=params) |
| 208 | + elif method == "PATCH": |
| 209 | + return session.patch(url, headers=headers, json=json) |
| 210 | + elif method == "POST": |
| 211 | + return session.post(url, headers=headers, json=json) |
| 212 | + |
| 213 | + |
| 214 | +if __name__ == "__main__": |
| 215 | + main() |
0 commit comments