diff --git a/README.md b/README.md index ca1e29f..fd1bbbf 100644 --- a/README.md +++ b/README.md @@ -228,6 +228,98 @@ options: -h, --help show this help message and exit ``` +### Alert Rules Management +Manage alert rules (list, get, create, update, delete, update-status) +```sh +python examples/alert_rules.py -h +usage: alert_rules.py [-h] {list,get,create,update,delete,update-status} ... + +Cisco Secure Access Alert Rules Management CLI + +positional arguments: + {list,get,create,update,delete,update-status} + Available commands + list List all alert rules + get Get a specific alert rule by ID + create Create a new alert rule + update Update an existing alert rule + delete Delete one or more alert rules + update-status Update the status of alert rules + +options: + -h, --help show this help message and exit +``` + +### Alert Integration +Create webhook integrations and associated alert rules end-to-end +```sh +python examples/alert_integration.py +``` + +### Complex Example +Class-based client with idempotent operations for destination lists, network tunnel groups, private resources, and access policies +```sh +python examples/complex_example.py -h +usage: complex_example.py [-h] -o {all,destination-list,network-tunnel-groups,private-resources,access-policy,list-network-tunnel-groups,list-private-resources,identities} + [--ntg-id NTG_ID] [--pr-id PR_ID] [-v] + +Cisco Secure Access API Client - Create and manage resources with idempotent operations. + +options: + -h, --help show this help message and exit + -o, --operation {all,destination-list,network-tunnel-groups,private-resources,access-policy,list-network-tunnel-groups,list-private-resources,identities} + Operation to perform + --ntg-id NTG_ID Network Tunnel Group ID (required for 'access-policy' operation when not running 'all') + --pr-id PR_ID Private Resource ID (required for 'access-policy' operation when not running 'all') + -v, --verbose Enable verbose/debug logging +``` + +### DLP Rule Events +Retrieve DLP rule events (Real-Time, SaaS API, AI Guardrails) with regional endpoint support +```sh +python examples/dlp_rule_events.py -h +usage: dlp_rule_events.py [-h] [--region {us,eu}] {list-realtime,list-saas,list-ai-guardrails,get} ... + +Cisco Secure Access DLP Rule Events Management CLI + +positional arguments: + {list-realtime,list-saas,list-ai-guardrails,get} + Available commands + list-realtime List Real-Time DLP rule events + list-saas List SaaS API DLP rule events + list-ai-guardrails List AI Guardrails DLP rule events + get Get DLP event details by ID + +options: + -h, --help show this help message and exit + --region {us,eu} API region: 'us' (default) or 'eu' +``` + +### Top Identities List +Fetch top identities with pagination, export to JSON/CSV, and optional chart visualization +```sh +python examples/top_identities_list.py -h +usage: top_identities_list.py [-h] [--from FROM] [--to TO] [--identitytypes TYPES] + [--top-n N] [--format {json,csv}] [--output FILE] + [--chart {none,bar,horizontal_bar,line,pie}] + [--chart-output FILE] [--page-delay SECONDS] + +Fetch all top identities from Cisco Secure Access (last 7 days by default). + +options: + -h, --help show this help message and exit + --from FROM Start of time range (default: -7days) + --to TO End of time range (default: now) + --identitytypes TYPES Identity type or comma-delimited list (e.g. 'roaming computers,users') + --top-n N Keep only the top N records after fetching (default: all) + --format {json,csv} Output format: json or csv (default: json) + --output FILE Output file path (- for stdout, default: top_identities.json) + --chart {none,bar,horizontal_bar,line,pie} + Chart type for visualization (default: none) + --chart-output FILE File path to save the chart PNG (default: top_identities_chart.png) + --page-delay SECONDS Seconds to sleep between page batches (default: 0) +``` + ### Key Admin API Management Manage API keys and administrative functions ```sh diff --git a/examples/access_rule_backup_restore.py b/examples/access_rule_backup_restore.py index 35f0cec..fb3279b 100644 --- a/examples/access_rule_backup_restore.py +++ b/examples/access_rule_backup_restore.py @@ -1,4 +1,4 @@ -# Copyright 2025 Cisco Systems, Inc. and its affiliates +# Copyright 2026 Cisco Systems, Inc. and its affiliates # # SPDX-License-Identifier: Apache-2.0 diff --git a/examples/access_token.py b/examples/access_token.py index 308738a..908cc23 100644 --- a/examples/access_token.py +++ b/examples/access_token.py @@ -1,4 +1,4 @@ -# Copyright 2025 Cisco Systems, Inc. and its affiliates +# Copyright 2026 Cisco Systems, Inc. and its affiliates # # SPDX-License-Identifier: Apache-2.0 @@ -27,12 +27,15 @@ import os import base64 +from time import time from secure_access.api.token_api import TokenApi from typing import Optional +import dotenv def generate_access_token( - client_id: Optional[str] = None, client_secret: Optional[str] = None + client_id: Optional[str] = None, client_secret: Optional[str] = None, + save_to_file: bool = False, file_path: str = ".env" ) -> str: """ Generates an OAuth2 access token for Cisco SSE API authentication. @@ -40,6 +43,8 @@ def generate_access_token( Args: client_id (Optional[str]): The client ID to use. If not provided, uses CLIENT_ID env var. client_secret (Optional[str]): The client secret to use. If not provided, uses CLIENT_SECRET env var. + save_to_file (bool): Whether to save the generated token to a file. Defaults to False. + file_path (str): The file path to save the token if save_to_file is True. Defaults to ".env". Returns: str: The access token string. @@ -61,7 +66,65 @@ def generate_access_token( grant_type="client_credentials", _headers={"Authorization": f"Basic {base64_credentials}"}, ) + if save_to_file: + save_access_token(response, file_path) return response.access_token except Exception as e: print(f"An error occurred while creating the access token: {e}") raise Exception("Failed to generate access token") from e + +def save_access_token(response: dict, file_path: str = ".env") -> None: + """ + Saves the access token to a file. + + Args: + response (dict): The response containing the access token and expiration. + file_path (str): The path to the file where the token will be saved. Defaults to '.env'. + """ + try: + dotenv.set_key(file_path, "ACCESS_TOKEN", str(response.access_token)) + dotenv.set_key(file_path, "EXPIRES_IN", str(response.expires_in)) + dotenv.set_key(file_path, "TIMESTAMP", str(time())) + print(f"Access token saved to {file_path}") + except Exception as e: + print(f"An error occurred while saving the access token: {e}") + raise Exception("Failed to save access token") from e + +def is_token_expired(file_path: str = ".env") -> bool: + """ + Checks if the access token is expired based on the saved timestamp and expiration time. + + Args: + file_path (str): The path to the file where the token is saved. Defaults to '.env'. + + Returns: + bool: True if the token is expired, False otherwise. + """ + try: + expires_in = int(dotenv.get_key(file_path, "EXPIRES_IN") or 0) + timestamp = float(dotenv.get_key(file_path, "TIMESTAMP") or 0) + current_time = time() + return current_time >= timestamp + expires_in + except Exception as e: + print(f"An error occurred while checking token expiration: {e}") + raise Exception("Failed to check token expiration") from e + +def get_valid_access_token(file_path: str = ".env") -> str: + """ + Retrieves a valid access token, generating a new one if the current token is expired. + + Args: + file_path (str): The path to the file where the token is saved. Defaults to '.env'. + + Returns: + str: A valid access token string. + """ + try: + if is_token_expired(file_path): + print("Access token is expired. Generating a new one.") + return generate_access_token(save_to_file=True, file_path=file_path) + else: + return dotenv.get_key(file_path, "ACCESS_TOKEN") or "" + except Exception as e: + print(f"An error occurred while retrieving the access token: {e}") + raise Exception("Failed to retrieve access token") from e \ No newline at end of file diff --git a/examples/alert_integration.py b/examples/alert_integration.py new file mode 100644 index 0000000..f630311 --- /dev/null +++ b/examples/alert_integration.py @@ -0,0 +1,104 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Cisco Secure Access API Client - Class-based implementation with idempotent operations. + +This module provides a class-based wrapper around the Cisco Secure Access API, +handling authentication, resource creation with idempotency checks, and proper logging. +""" + +import logging +from typing import Any, List, Tuple, Optional + +from secure_access import ConditionsAlertRule +from secure_access.models import NotificationInfoAlertRule, NotificationTypeAll, SeverityAlert, StatusAlertRule +from access_token import get_valid_access_token +from secure_access.models.create_alert_rule_request import CreateAlertRuleRequest +from secure_access.api.alert_rules_api import AlertRulesApi + +from integrations import CiscoSecureAccessIntegrationClient + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class CiscoSecureAccessAlertIntegrationClient: + """Client for interacting with Cisco Secure Access APIs with idempotent operations.""" + + def __init__(self) -> None: + """Initialize the client with authentication token.""" + try: + self.access_token = get_valid_access_token() + logger.info("Successfully obtained access token") + except Exception as e: + logger.error(f"Failed to obtain access token: {e}") + raise + + def _set_authorization_header(self, api_client: Any) -> None: + """ + Set the Authorization header for an API client. + + Args: + api_client: The API client instance to configure. + """ + api_client.api_client.set_default_header( + "Authorization", f"Bearer {self.access_token}" + ) + + def _build_alert_integration_payload(self, webhook_ids: List[str]) -> CreateAlertRuleRequest: + """Build the payload for creating a push security events integration.""" + return CreateAlertRuleRequest( + description="Alert rule created by CiscoSecureAccessAlertIntegrationClient", + name="Example Alert Rule", + notification_info=[NotificationInfoAlertRule( + webhook_ids=webhook_ids, + type=NotificationTypeAll("webhook"), + )], + rule_type_id=10, + severity=SeverityAlert(1), + status=StatusAlertRule(1), + conditions=ConditionsAlertRule( + match_type="all" + ) + ) + + def create_alert_rule(self, webhook_ids: List[str]) -> None: + """Create a push security events integration with idempotency check.""" + api_client = AlertRulesApi() + self._set_authorization_header(api_client) + + payload = self._build_alert_integration_payload(webhook_ids) + + try: + response = api_client.list_alert_rules_without_preload_content() + if not response.status == 200: + logger.error(f"Failed to retrieve existing alert rules. Status code: {response.status}") + return None + + existing_alert_rules = response.json() + logger.info(f"Retrieved existing alert rules: {existing_alert_rules}") + + for alert_rule in existing_alert_rules: + if alert_rule.get("name") == payload.name: + logger.info(f"Alert rule '{payload.name}' already exists with ID: {alert_rule.get('id')}") + return alert_rule.get("id") + + logger.info(f"Creating new alert rule '{payload.name}'...") + response = api_client.create_alert_rule_without_preload_content(payload) + alert_rule = response.json() + logger.info(f"Successfully created alert rule '{payload.name}': {alert_rule}") + except Exception as e: + logger.error(f"Failed to create alert rule '{payload.name}': {e}") + return None + +if __name__ == "__main__": + client = CiscoSecureAccessAlertIntegrationClient() + client2 = CiscoSecureAccessIntegrationClient() + webhook_id = client2.create_webhook_integration() + client.create_alert_rule([webhook_id]) \ No newline at end of file diff --git a/examples/alert_rules.py b/examples/alert_rules.py new file mode 100644 index 0000000..6b5056d --- /dev/null +++ b/examples/alert_rules.py @@ -0,0 +1,1064 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Alert Rules Management Example for Cisco Secure Access API. + +This module provides a comprehensive example of managing alert rules using +the Cisco Secure Access Python SDK. It demonstrates CRUD operations: +- List all alert rules +- Get a specific alert rule by ID +- Create a new alert rule +- Update an existing alert rule +- Delete alert rules +- Update the status of alert rules + +Usage: + python alert_rules.py list + python alert_rules.py get --id + python alert_rules.py create --name "My Alert" --rule-type-id 10 --severity 1 + python alert_rules.py create --name "My Alert" --rule-type-id 10 --severity 1 \\ + --conditions allAccessRules= --email admin@example.com + python alert_rules.py update --id --name "Updated Alert" --rule-type-id 10 --severity 2 + python alert_rules.py delete --ids + python alert_rules.py update-status --ids --status 1 + +Requirements: + - Set CLIENT_ID and CLIENT_SECRET environment variables + - Ensure all dependencies in requirements.txt are installed + +API Documentation: + - https://developer.cisco.com/docs/cloud-security/list-alert-rules/ + - https://developer.cisco.com/docs/cloud-security/get-alert-rule/ + - https://developer.cisco.com/docs/cloud-security/create-alert-rule/ + - https://developer.cisco.com/docs/cloud-security/update-alert-rule/ + - https://developer.cisco.com/docs/cloud-security/delete-alert-rules/ + - https://developer.cisco.com/docs/cloud-security/update-status-of-alert-rules/ +""" + +import argparse +import json +import logging +import sys +from typing import Any, Dict, List, Optional + +from access_token import get_valid_access_token +from secure_access.api.alert_rules_api import AlertRulesApi +from secure_access.models import ( + AlertRule, + ConditionsAlertRule, + ConditionsAlertRuleRowsInner, + CreateAlertRule201Response, + CreateAlertRuleRequest, + DeleteAlertRules200Response, + DeleteAlertRulesRequest, + NotificationInfoAlertRule, + NotificationTypeAll, + SeverityAlert, + StatusAlertRule, + UpdateAlertRule200Response, + UpdateAlertRuleRequest, + UpdateAlertRulesStatusRequest, + UpdateAlertRulesStatus200Response, +) +from secure_access.exceptions import ApiException + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class AlertRulesClient: + """ + Client for managing Cisco Secure Access Alert Rules. + + Provides methods for CRUD operations on alert rules including: + - Listing all alert rules + - Getting a specific alert rule + - Creating new alert rules + - Updating existing alert rules + - Deleting alert rules + - Updating alert rule status + """ + + def __init__(self) -> None: + """ + Initialize the AlertRulesClient with authentication. + + Raises: + ValueError: If required environment variables are not set. + Exception: If access token generation fails. + """ + try: + self.access_token = get_valid_access_token() + logger.info("Successfully obtained access token") + except Exception as e: + logger.error(f"Failed to obtain access token: {e}") + raise + + def _get_api_client(self) -> AlertRulesApi: + """ + Create and configure an AlertRulesApi client with authorization header. + + Returns: + AlertRulesApi: Configured API client instance. + """ + api_client = AlertRulesApi() + api_client.api_client.set_default_header( + "Authorization", f"Bearer {self.access_token}" + ) + return api_client + + def _build_condition_rows( + self, + conditions_rows: Optional[List[Dict[str, str]]], + rule_type_id: int, + ) -> List[ConditionsAlertRuleRowsInner]: + """ + Build condition rows for an alert rule. + + If conditions_rows is provided, uses those. Otherwise, applies a sensible + default based on the rule_type_id. + + Default condition rows by rule_type_id: + - 10 (Access Rule Changes): allAccessRules + - 1, 2 (Connectivity): allTunnelGroups + - 3-8 (API Anomalies): allApiKeys + - 9 (Data Usage): allDataUsage + - 11-17 (Behavior Analytics): allIdentities + """ + if conditions_rows: + return [ + ConditionsAlertRuleRowsInner(var_field=row["field"], value=row.get("value", "")) + for row in conditions_rows + ] + + # Default condition rows based on rule type + default_fields = { + 10: "allAccessRules", + } + field = default_fields.get(rule_type_id, "allAccessRules") + return [ConditionsAlertRuleRowsInner(var_field=field, value="")] + + # ========================================================================= + # LIST ALERT RULES + # ========================================================================= + def list_alert_rules(self) -> List[AlertRule]: + """ + List all alert rules for the organization. + + Returns: + List[AlertRule]: List of AlertRule model objects. + + Raises: + ApiException: If the API request fails. + + Example: + >>> client = AlertRulesClient() + >>> rules = client.list_alert_rules() + >>> for rule in rules: + ... print(f"Rule: {rule.name} (ID: {rule.id})") + """ + api_client = self._get_api_client() + logger.info("Retrieving all alert rules...") + + try: + alert_rules = api_client.list_alert_rules() + logger.info(f"Successfully retrieved {len(alert_rules)} alert rules") + + for rule in alert_rules: + logger.debug( + f"Rule ID: {rule.id}, " + f"Name: {rule.name}, " + f"Status: {rule.status}, " + f"Severity: {rule.severity}" + ) + + return alert_rules + + except ApiException as e: + logger.error(f"API error listing alert rules: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error listing alert rules: {e}") + raise + + # ========================================================================= + # GET ALERT RULE BY ID + # ========================================================================= + def get_alert_rule_by_id(self, rule_id: int) -> Optional[AlertRule]: + """ + Get a specific alert rule by its ID. + + Args: + rule_id (int): The unique identifier of the alert rule. + + Returns: + Optional[AlertRule]: The AlertRule model object, or None if not found. + + Raises: + ApiException: If the API request fails. + + Example: + >>> client = AlertRulesClient() + >>> rule = client.get_alert_rule_by_id(12345) + >>> if rule: + ... print(f"Found rule: {rule.name}") + """ + api_client = self._get_api_client() + logger.info(f"Retrieving alert rule with ID: {rule_id}") + + try: + alert_rule = api_client.get_alert_rule_by_id(rule_id=rule_id) + logger.info(f"Successfully retrieved alert rule: {alert_rule.name}") + return alert_rule + + except ApiException as e: + if e.status == 404: + logger.warning(f"Alert rule {rule_id} not found") + return None + logger.error(f"API error getting alert rule {rule_id}: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error getting alert rule {rule_id}: {e}") + raise + + # ========================================================================= + # CREATE ALERT RULE + # ========================================================================= + def create_alert_rule( + self, + name: str, + rule_type_id: int, + severity: int, + status: int = 1, + description: Optional[str] = None, + conditions_match_type: str = "all", + conditions_rows: Optional[List[Dict[str, str]]] = None, + webhook_ids: Optional[List[str]] = None, + email_recipients: Optional[List[str]] = None, + ) -> Optional[AlertRule]: + """ + Create a new alert rule (idempotent). + + If an alert rule with the same name already exists, returns the existing + rule instead of failing. This makes the operation safe to retry. + + Args: + name (str): The name of the alert rule (max 255 characters). + rule_type_id (int): The identifier of the rule type. + severity (int): The severity level (1=High, 2=Medium, 3=Low). + status (int): The status (1=Enabled, 2=Disabled). Defaults to 1. + description (Optional[str]): Description of the alert rule (max 100 chars). + conditions_match_type (str): Match type for conditions ("all" or "any"). + conditions_rows (Optional[List[Dict[str, str]]]): List of condition rows, + each with 'field' and 'value' keys. If not provided, defaults based on + rule_type_id (e.g., allAccessRules for rule_type_id 10). + webhook_ids (Optional[List[str]]): List of webhook IDs for notifications. + email_recipients (Optional[List[str]]): List of email recipients. + + Returns: + Optional[AlertRule]: The created or existing AlertRule model, or None on failure. + + Raises: + ApiException: If the API request fails (except for 409 which is handled). + + Example: + >>> client = AlertRulesClient() + >>> rule = client.create_alert_rule( + ... name="Security Alert", + ... rule_type_id=10, + ... severity=1, + ... email_recipients=["admin@example.com"] + ... ) + """ + api_client = self._get_api_client() + logger.info(f"Creating new alert rule: {name}") + + # Build notification info based on provided parameters + notification_info = [] + + if webhook_ids: + notification_info.append(NotificationInfoAlertRule( + webhook_ids=webhook_ids, + type=NotificationTypeAll("webhook"), + )) + + if email_recipients: + notification_info.append(NotificationInfoAlertRule( + recipients=email_recipients, + type=NotificationTypeAll("email"), + )) + + # Build condition rows + rows = self._build_condition_rows(conditions_rows, rule_type_id) + + # Build the request payload + payload = CreateAlertRuleRequest( + name=name, + rule_type_id=rule_type_id, + severity=SeverityAlert(severity), + status=StatusAlertRule(status), + description=description or "", + conditions=ConditionsAlertRule(match_type=conditions_match_type, rows=rows), + notification_info=notification_info, + ) + + try: + response: CreateAlertRule201Response = api_client.create_alert_rule( + create_alert_rule_request=payload + ) + logger.info(f"Create response: {response.message}") + + # Successfully created - fetch the rule by name to get full details + existing_rules = self.list_alert_rules() + for rule in existing_rules: + if rule.name == name: + logger.info(f"Successfully created alert rule '{name}' with ID: {rule.id}") + return rule + + logger.warning(f"Created rule '{name}' but could not find it in list") + return None + + except ApiException as e: + if e.status == 409: + # Rule with same name already exists - fetch and return it + logger.info(f"Alert rule '{name}' already exists, fetching existing rule...") + existing_rules = self.list_alert_rules() + for rule in existing_rules: + if rule.name == name: + logger.info(f"Found existing alert rule '{name}' with ID: {rule.id}") + return rule + logger.warning(f"Could not find existing rule '{name}' after 409 response") + return None + logger.error(f"API error creating alert rule: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error creating alert rule: {e}") + raise + + # ========================================================================= + # UPDATE ALERT RULE + # ========================================================================= + def update_alert_rule( + self, + rule_id: int, + name: str, + rule_type_id: int, + severity: int, + status: int = 1, + description: Optional[str] = None, + conditions_match_type: str = "all", + conditions_rows: Optional[List[Dict[str, str]]] = None, + webhook_ids: Optional[List[str]] = None, + email_recipients: Optional[List[str]] = None, + ) -> Optional[AlertRule]: + """ + Update an existing alert rule. + + Note: All fields must be provided as the update replaces the entire rule. + + Args: + rule_id (int): The unique identifier of the alert rule to update. + name (str): The new name of the alert rule. + rule_type_id (int): The identifier of the rule type. + severity (int): The severity level (1=High, 2=Medium, 3=Low). + status (int): The status (1=Enabled, 2=Disabled). Defaults to 1. + description (Optional[str]): Description of the alert rule. + conditions_match_type (str): Match type for conditions. + webhook_ids (Optional[List[str]]): List of webhook IDs for notifications. + email_recipients (Optional[List[str]]): List of email recipients. + + Returns: + Optional[AlertRule]: The updated AlertRule model, or None on failure. + + Raises: + ApiException: If the API request fails. + + Example: + >>> client = AlertRulesClient() + >>> updated_rule = client.update_alert_rule( + ... rule_id=12345, + ... name="Updated Security Alert", + ... rule_type_id=10, + ... severity=2 + ... ) + """ + api_client = self._get_api_client() + logger.info(f"Updating alert rule ID: {rule_id}") + + # Build notification info based on provided parameters + notification_info = [] + + if webhook_ids: + notification_info.append(NotificationInfoAlertRule( + webhook_ids=webhook_ids, + type=NotificationTypeAll("webhook"), + )) + + if email_recipients: + notification_info.append(NotificationInfoAlertRule( + recipients=email_recipients, + type=NotificationTypeAll("email"), + )) + + # Build condition rows + rows = self._build_condition_rows(conditions_rows, rule_type_id) + + # Build the request payload + payload = UpdateAlertRuleRequest( + name=name, + rule_type_id=rule_type_id, + severity=SeverityAlert(severity), + status=StatusAlertRule(status), + description=description or "", + conditions=ConditionsAlertRule(match_type=conditions_match_type, rows=rows), + notification_info=notification_info, + ) + + try: + response: UpdateAlertRule200Response = api_client.update_alert_rule( + rule_id=rule_id, + update_alert_rule_request=payload + ) + logger.info(f"Update response: {response.message}") + + # Fetch the updated rule to return full details + updated_rule = self.get_alert_rule_by_id(rule_id) + if updated_rule: + logger.info(f"Successfully updated alert rule ID: {rule_id}") + return updated_rule + + except ApiException as e: + logger.error(f"API error updating alert rule {rule_id}: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error updating alert rule {rule_id}: {e}") + raise + + # ========================================================================= + # DELETE ALERT RULES + # ========================================================================= + def delete_alert_rules(self, rule_ids: List[int]) -> Optional[DeleteAlertRules200Response]: + """ + Delete multiple alert rules by their IDs. + + Args: + rule_ids (List[int]): List of alert rule IDs to delete (max 100). + + Returns: + Optional[DeleteAlertRules200Response]: The deletion result model, or None on failure. + + Raises: + ApiException: If the API request fails. + ValueError: If rule_ids is empty or exceeds 100 items. + + Example: + >>> client = AlertRulesClient() + >>> result = client.delete_alert_rules([12345, 12346]) + >>> if result.success: + ... print(f"Deleted IDs: {result.successful_ids}") + """ + if not rule_ids: + raise ValueError("At least one rule ID must be provided") + + if len(rule_ids) > 100: + raise ValueError("Cannot delete more than 100 rules at once") + + api_client = self._get_api_client() + logger.info(f"Deleting {len(rule_ids)} alert rule(s): {rule_ids}") + + payload = DeleteAlertRulesRequest(rule_ids=rule_ids) + + try: + result: DeleteAlertRules200Response = api_client.delete_alert_rules( + delete_alert_rules_request=payload + ) + + if result.success: + logger.info(f"Successfully deleted alert rules: {result.successful_ids}") + else: + logger.warning(f"Partial delete: successful={result.successful_ids}, errors={result.error_ids}") + + return result + + except ApiException as e: + logger.error(f"API error deleting alert rules: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error deleting alert rules: {e}") + raise + + # ========================================================================= + # UPDATE ALERT RULES STATUS + # ========================================================================= + def update_alert_rules_status( + self, + rule_ids: List[int], + status: int + ) -> Optional[UpdateAlertRulesStatus200Response]: + """ + Update the status of multiple alert rules. + + Args: + rule_ids (List[int]): List of alert rule IDs to update (1-100 items). + status (int): The new status (1=Enabled, 2=Disabled). + + Returns: + Optional[UpdateAlertRulesStatus200Response]: The update result model, or None on failure. + + Raises: + ApiException: If the API request fails. + ValueError: If rule_ids is empty, exceeds 100 items, or status is invalid. + + Example: + >>> client = AlertRulesClient() + >>> # Disable alert rules + >>> result = client.update_alert_rules_status([12345, 12346], status=2) + >>> # Enable alert rules + >>> result = client.update_alert_rules_status([12345], status=1) + """ + if not rule_ids: + raise ValueError("At least one rule ID must be provided") + + if len(rule_ids) > 100: + raise ValueError("Cannot update more than 100 rules at once") + + if status not in [1, 2]: + raise ValueError("Status must be 1 (Enabled) or 2 (Disabled)") + + api_client = self._get_api_client() + status_text = "Enabled" if status == 1 else "Disabled" + logger.info(f"Updating status to '{status_text}' for {len(rule_ids)} rule(s): {rule_ids}") + + payload = UpdateAlertRulesStatusRequest( + entity_ids=rule_ids, + status=status + ) + + try: + result: UpdateAlertRulesStatus200Response = api_client.update_alert_rules_status( + update_alert_rules_status_request=payload + ) + + if result.success: + logger.info(f"Successfully updated status for alert rules: {result.successful_ids}") + else: + logger.warning(f"Partial update: successful={result.successful_ids}, errors={result.error_ids}") + + return result + + except ApiException as e: + logger.error(f"API error updating alert rules status: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error updating alert rules status: {e}") + raise + + def enable_alert_rules(self, rule_ids: List[int]) -> Optional[UpdateAlertRulesStatus200Response]: + """ + Enable multiple alert rules. + + Convenience method that calls update_alert_rules_status with status=1. + + Args: + rule_ids (List[int]): List of alert rule IDs to enable. + + Returns: + Optional[UpdateAlertRulesStatus200Response]: The update result, or None on failure. + + Example: + >>> client = AlertRulesClient() + >>> result = client.enable_alert_rules([12345, 12346]) + """ + return self.update_alert_rules_status(rule_ids, status=1) + + def disable_alert_rules(self, rule_ids: List[int]) -> Optional[UpdateAlertRulesStatus200Response]: + """ + Disable multiple alert rules. + + Convenience method that calls update_alert_rules_status with status=2. + + Args: + rule_ids (List[int]): List of alert rule IDs to disable. + + Returns: + Optional[UpdateAlertRulesStatus200Response]: The update result, or None on failure. + + Example: + >>> client = AlertRulesClient() + >>> result = client.disable_alert_rules([12345, 12346]) + """ + return self.update_alert_rules_status(rule_ids, status=2) + + +# ============================================================================= +# COMMAND LINE INTERFACE +# ============================================================================= +def setup_argparse() -> argparse.ArgumentParser: + """ + Set up the argument parser for the CLI. + + Returns: + argparse.ArgumentParser: Configured argument parser. + """ + parser = argparse.ArgumentParser( + description="Cisco Secure Access Alert Rules Management CLI", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # List all alert rules + python alert_rules.py list + + # Get a specific alert rule + python alert_rules.py get --id 12345 + + # Create a new alert rule (uses default conditions for rule type 10) + python alert_rules.py create --name "My Alert Rule" --rule-type-id 10 --severity 1 + + # Create with email notifications + python alert_rules.py create --name "Email Alert" --rule-type-id 10 --severity 1 \\ + --email admin@example.com security@example.com + + # Create with specific access rules (comma-separated rule IDs) + python alert_rules.py create --name "Specific Rules Alert" --rule-type-id 10 --severity 2 \\ + --conditions "specificAccessRules=1261863,1260333,1260344" + + # Create with webhook notifications + python alert_rules.py create --name "Webhook Alert" --rule-type-id 10 --severity 2 \\ + --webhook-ids webhook.v1:abc123 + + # Update an existing alert rule + python alert_rules.py update --id 12345 --name "Updated Name" --rule-type-id 10 --severity 2 + + # Update with specific conditions + python alert_rules.py update --id 12345 --name "Updated" --rule-type-id 10 --severity 2 \\ + --conditions allAccessRules= + + # Delete alert rules + python alert_rules.py delete --ids 12345 12346 + + # Enable alert rules + python alert_rules.py update-status --ids 12345 12346 --status 1 + + # Disable alert rules + python alert_rules.py update-status --ids 12345 --status 2 + +Condition Defaults (when --conditions is not specified): + Rule Type 10 (Access Rule Changes): allAccessRules= + Other rule types: allAccessRules= + +Note: Connectivity alerts (rule_type_id 1-2) require an 'operator' field in + conditions which is not supported by the current SDK. Use the UI for those. + +API Documentation: + https://developer.cisco.com/docs/cloud-security/list-alert-rules/ + https://developer.cisco.com/docs/cloud-security/get-alert-rule/ + https://developer.cisco.com/docs/cloud-security/create-alert-rule/ + https://developer.cisco.com/docs/cloud-security/update-alert-rule/ + https://developer.cisco.com/docs/cloud-security/delete-alert-rules/ + https://developer.cisco.com/docs/cloud-security/update-status-of-alert-rules/ +""" + ) + + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # List command + list_parser = subparsers.add_parser( + "list", + help="List all alert rules", + description="Retrieve and display all alert rules for the organization." + ) + list_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # Get command + get_parser = subparsers.add_parser( + "get", + help="Get a specific alert rule by ID", + description="Retrieve details of a specific alert rule." + ) + get_parser.add_argument( + "--id", + type=int, + required=True, + help="The unique ID of the alert rule" + ) + get_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # Create command + create_parser = subparsers.add_parser( + "create", + help="Create a new alert rule", + description="Create a new alert rule with the specified configuration." + ) + create_parser.add_argument( + "--name", + type=str, + required=True, + help="Name of the alert rule (max 255 characters)" + ) + create_parser.add_argument( + "--rule-type-id", + type=int, + required=True, + help="Rule type identifier" + ) + create_parser.add_argument( + "--severity", + type=int, + choices=[1, 2, 3], + required=True, + help="Severity level: 1=High, 2=Medium, 3=Low" + ) + create_parser.add_argument( + "--status", + type=int, + choices=[1, 2], + default=1, + help="Status: 1=Enabled (default), 2=Disabled" + ) + create_parser.add_argument( + "--description", + type=str, + help="Description of the alert rule (max 100 characters)" + ) + create_parser.add_argument( + "--conditions-match-type", + type=str, + choices=["all", "any"], + default="all", + help="Conditions match type: 'all' (default) or 'any'" + ) + create_parser.add_argument( + "--conditions", + type=str, + nargs="+", + metavar="FIELD=VALUE", + help="Condition rows as field=value pairs (e.g., allAccessRules= or tunnelGroupName=myTunnel). " + "If not specified, a default is applied based on --rule-type-id." + ) + create_parser.add_argument( + "--webhook-ids", + type=str, + nargs="+", + help="Webhook IDs for notifications" + ) + create_parser.add_argument( + "--email", + type=str, + nargs="+", + dest="email_recipients", + help="Email recipients for notifications" + ) + create_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # Update command + update_parser = subparsers.add_parser( + "update", + help="Update an existing alert rule", + description="Update an existing alert rule. All fields must be provided." + ) + update_parser.add_argument( + "--id", + type=int, + required=True, + help="The unique ID of the alert rule to update" + ) + update_parser.add_argument( + "--name", + type=str, + required=True, + help="New name of the alert rule" + ) + update_parser.add_argument( + "--rule-type-id", + type=int, + required=True, + help="Rule type identifier" + ) + update_parser.add_argument( + "--severity", + type=int, + choices=[1, 2, 3], + required=True, + help="Severity level: 1=High, 2=Medium, 3=Low" + ) + update_parser.add_argument( + "--status", + type=int, + choices=[1, 2], + default=1, + help="Status: 1=Enabled (default), 2=Disabled" + ) + update_parser.add_argument( + "--description", + type=str, + help="Description of the alert rule (max 100 characters)" + ) + update_parser.add_argument( + "--conditions-match-type", + type=str, + choices=["all", "any"], + default="all", + help="Conditions match type: 'all' (default) or 'any'" + ) + update_parser.add_argument( + "--conditions", + type=str, + nargs="+", + metavar="FIELD=VALUE", + help="Condition rows as field=value pairs (e.g., allAccessRules= or tunnelGroupName=myTunnel). " + "If not specified, a default is applied based on --rule-type-id." + ) + update_parser.add_argument( + "--webhook-ids", + type=str, + nargs="+", + help="Webhook IDs for notifications" + ) + update_parser.add_argument( + "--email", + type=str, + nargs="+", + dest="email_recipients", + help="Email recipients for notifications" + ) + update_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # Delete command + delete_parser = subparsers.add_parser( + "delete", + help="Delete alert rules", + description="Delete one or more alert rules by their IDs." + ) + delete_parser.add_argument( + "--ids", + type=int, + nargs="+", + required=True, + help="IDs of the alert rules to delete (max 100)" + ) + delete_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # Update status command + status_parser = subparsers.add_parser( + "update-status", + help="Update the status of alert rules", + description="Enable or disable one or more alert rules." + ) + status_parser.add_argument( + "--ids", + type=int, + nargs="+", + required=True, + help="IDs of the alert rules to update (max 100)" + ) + status_parser.add_argument( + "--status", + type=int, + choices=[1, 2], + required=True, + help="New status: 1=Enabled, 2=Disabled" + ) + status_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + return parser + + +def print_result(data: Any, as_json: bool = False) -> None: + """ + Print the result data in the specified format. + + Args: + data: The data to print (can be SDK models, dicts, or lists). + as_json: If True, output as JSON; otherwise, print normally. + """ + # Convert SDK models to dicts for serialization + def to_serializable(obj): + if hasattr(obj, 'to_dict'): + return obj.to_dict() + elif isinstance(obj, list): + return [to_serializable(item) for item in obj] + return obj + + serializable_data = to_serializable(data) + + if as_json: + print(json.dumps(serializable_data, indent=2, default=str)) + else: + if isinstance(serializable_data, list): + for item in serializable_data: + print("-" * 60) + if isinstance(item, dict): + for key, value in item.items(): + print(f" {key}: {value}") + else: + print(f" {item}") + print("-" * 60) + elif isinstance(serializable_data, dict): + for key, value in serializable_data.items(): + print(f" {key}: {value}") + else: + print(serializable_data) + + +def parse_conditions(conditions: Optional[List[str]]) -> Optional[List[Dict[str, str]]]: + """ + Parse condition strings from CLI into list of dicts. + + Each condition is in the format 'field=value' (value can be empty). + + Args: + conditions: List of 'field=value' strings, or None. + + Returns: + List of dicts with 'field' and 'value' keys, or None. + """ + if not conditions: + return None + rows = [] + for cond in conditions: + if "=" in cond: + field, value = cond.split("=", 1) + rows.append({"field": field, "value": value}) + else: + rows.append({"field": cond, "value": ""}) + return rows + + +def main() -> int: + """ + Main entry point for the CLI. + + Returns: + int: Exit code (0 for success, 1 for failure). + """ + parser = setup_argparse() + args = parser.parse_args() + + if not args.command: + parser.print_help() + return 1 + + try: + client = AlertRulesClient() + + if args.command == "list": + rules = client.list_alert_rules() + print(f"\nFound {len(rules)} alert rule(s):\n") + print_result(rules, args.json) + + elif args.command == "get": + rule = client.get_alert_rule_by_id(args.id) + if rule: + print(f"\nAlert Rule Details:\n") + print_result(rule, args.json) + else: + print(f"Alert rule with ID {args.id} not found.") + return 1 + + elif args.command == "create": + conditions_rows = parse_conditions(getattr(args, 'conditions', None)) + rule = client.create_alert_rule( + name=args.name, + rule_type_id=args.rule_type_id, + severity=args.severity, + status=args.status, + description=args.description, + conditions_match_type=args.conditions_match_type, + conditions_rows=conditions_rows, + webhook_ids=args.webhook_ids, + email_recipients=args.email_recipients, + ) + + if rule: + print(f"\nAlert Rule:\n") + print_result(rule, args.json) + else: + print("Failed to create alert rule.") + return 1 + + elif args.command == "update": + conditions_rows = parse_conditions(getattr(args, 'conditions', None)) + rule = client.update_alert_rule( + rule_id=args.id, + name=args.name, + rule_type_id=args.rule_type_id, + severity=args.severity, + status=args.status, + description=args.description, + conditions_match_type=args.conditions_match_type, + conditions_rows=conditions_rows, + webhook_ids=args.webhook_ids, + email_recipients=args.email_recipients, + ) + + if rule: + print(f"\nAlert Rule Updated:\n") + print_result(rule, args.json) + else: + print(f"Failed to update alert rule with ID {args.id}.") + return 1 + + elif args.command == "delete": + result = client.delete_alert_rules(args.ids) + if result: + print(f"\nDeletion Result:\n") + print_result(result, args.json) + else: + print(f"Failed to delete alert rules: {args.ids}") + return 1 + + elif args.command == "update-status": + result = client.update_alert_rules_status(args.ids, args.status) + status_text = "Enabled" if args.status == 1 else "Disabled" + if result: + print(f"\nStatus Update Result ({status_text}):\n") + print_result(result, args.json) + else: + print(f"Failed to update status for alert rules: {args.ids}") + return 1 + + return 0 + + except ValueError as e: + logger.error(f"Validation error: {e}") + return 1 + except ApiException as e: + logger.error(f"API error: {e}") + return 1 + except Exception as e: + logger.error(f"Unexpected error: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/complex_example.py b/examples/complex_example.py new file mode 100644 index 0000000..4dfda86 --- /dev/null +++ b/examples/complex_example.py @@ -0,0 +1,744 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Cisco Secure Access API Client - Class-based implementation with idempotent operations. + +This module provides a class-based wrapper around the Cisco Secure Access API, +handling authentication, resource creation with idempotency checks, and proper logging. + +Usage: + # Run all operations (create destination list, network tunnel groups, private resources, and access policy) + python complex_example.py --operation all + + # Create only destination list + python complex_example.py --operation destination-list + + # Create only network tunnel groups + python complex_example.py --operation network-tunnel-groups + + # Create only private resources + python complex_example.py --operation private-resources + + # Create access policy (requires existing network tunnel group and private resource IDs) + python complex_example.py --operation access-policy --ntg-id --pr-id + + # List existing network tunnel groups (without creating) + python complex_example.py --operation list-network-tunnel-groups + + # List existing private resources (without creating) + python complex_example.py --operation list-private-resources + + # Get identities + python complex_example.py --operation identities + + # Enable verbose logging + python complex_example.py --operation all --verbose +""" + +import argparse +import logging +import sys +from typing import Any, List, Tuple, Optional + +from access_token import get_valid_access_token +from secure_access.api.destination_lists_api import DestinationListsApi +from secure_access.models.destination_list_create import DestinationListCreate +from secure_access.models.destination_list_create_destinations_inner import DestinationListCreateDestinationsInner +from secure_access.models.add_network_tunnel_group_request import AddNetworkTunnelGroupRequest +from secure_access.models.add_network_tunnel_group_request_auth_id_prefix import AddNetworkTunnelGroupRequestAuthIdPrefix +from secure_access.models.device_type import DeviceType +from secure_access.models.routing_request_data import RoutingRequestData +from secure_access.models.static_data_request_obj import StaticDataRequestObj +from secure_access.api.network_tunnel_groups_api import NetworkTunnelGroupsApi +from secure_access.models.routing_request import RoutingRequest +from secure_access.models.private_resource_request import PrivateResourceRequest +from secure_access.models.access_types_request_inner import AccessTypesRequestInner +from secure_access.models.network_based_access import NetworkBasedAccess +from secure_access.models.resource_addresses_inner import ResourceAddressesInner +from secure_access.models.resource_addresses_inner_protocol_ports_inner import ResourceAddressesInnerProtocolPortsInner +from secure_access.api.private_resources_api import PrivateResourcesApi +from secure_access.models.client_based_access import ClientBasedAccess +from secure_access.models.add_rule_request import AddRuleRequest +from secure_access.models.rule_action import RuleAction +from secure_access.models.rule_settings_inner import RuleSettingsInner +from secure_access.models.rule_conditions_inner import RuleConditionsInner +from secure_access.api.access_rules_api import AccessRulesApi +from secure_access.api.identities_api import IdentitiesApi +from secure_access.models.access import Access +from secure_access.models.attribute_name import AttributeName +from secure_access.models.attribute_value import AttributeValue +from secure_access.models.setting_value import SettingValue + + +# Logger instance (configured in setup_logging) +logger = logging.getLogger(__name__) + + +class CiscoSecureAccessClient: + """Client for interacting with Cisco Secure Access APIs with idempotent operations.""" + + def __init__(self): + """Initialize the client with authentication token.""" + try: + self.access_token = get_valid_access_token() + logger.info("Successfully obtained access token") + except Exception as e: + logger.error(f"Failed to obtain access token: {e}") + raise + + def _set_authorization_header(self, api_client: Any) -> None: + """ + Set the Authorization header for an API client. + + Args: + api_client: The API client instance to configure. + """ + api_client.api_client.set_default_header( + "Authorization", f"Bearer {self.access_token}" + ) + + @staticmethod + def _build_destination_list() -> DestinationListCreate: + """Build a destination list request object.""" + destination1 = DestinationListCreateDestinationsInner( + comment="First warning url managed by SDK", + type="ipv4", + destination="127.0.0.1" + ) + destination2 = DestinationListCreateDestinationsInner( + comment="Second warning url managed by SDK", + type="url", + destination="http://foo.bar/blockwarn" + ) + destination3 = DestinationListCreateDestinationsInner( + comment="Next warning url managed by SDK", + type="domain", + destination="warn.foo.bar" + ) + + return DestinationListCreate( + name="Test Destination List", + destinations=[destination1, destination2, destination3], + is_global=False, + access=Access("block"), + bundle_type_id=2 + ) + + @staticmethod + def _build_network_tunnel_group_1() -> AddNetworkTunnelGroupRequest: + """Build the first network tunnel group request object.""" + return AddNetworkTunnelGroupRequest( + region="us-test-1", + name="Test Network Tunnel Group", + device_type=DeviceType("other"), + auth_id_prefix=AddNetworkTunnelGroupRequestAuthIdPrefix("test-prefix-1"), + routing=RoutingRequest( + type="static", + data=RoutingRequestData( + StaticDataRequestObj(network_cidrs=["10.17.176.0/24"]) + ) + ), + passphrase="Testpassphrase123" + ) + + @staticmethod + def _build_network_tunnel_group_2() -> AddNetworkTunnelGroupRequest: + """Build the second network tunnel group request object.""" + return AddNetworkTunnelGroupRequest( + region="us-test-1", + name="Test Network Tunnel Group 2", + device_type=DeviceType("other"), + auth_id_prefix=AddNetworkTunnelGroupRequestAuthIdPrefix("test-prefix-2"), + routing=RoutingRequest( + type="static", + data=RoutingRequestData( + StaticDataRequestObj(network_cidrs=["10.17.178.0/24"]) + ) + ), + passphrase="Testpassphrase123" + ) + + @staticmethod + def _build_private_resource_1() -> PrivateResourceRequest: + """Build the first private resource request object.""" + return PrivateResourceRequest( + name="Test Private Resource 1", + description="This is a test private resource 1", + access_types=[AccessTypesRequestInner(NetworkBasedAccess(type="network"))], + resource_addresses=[ResourceAddressesInner( + destination_addr=["test.example.com", "192.168.1.1", "10.17.178.2/32"], + protocol_ports=[ + ResourceAddressesInnerProtocolPortsInner(protocol="TCP", ports="22,80,443") + ] + )] + ) + + @staticmethod + def _build_private_resource_2() -> PrivateResourceRequest: + """Build the second private resource request object.""" + return PrivateResourceRequest( + name="Test Private Resource 2", + description="This is a test private resource 2", + access_types=[AccessTypesRequestInner( + ClientBasedAccess(type="client", reachableAddresses=["10.10.10.3"]) + )], + resource_addresses=[ResourceAddressesInner( + destination_addr=["test.example.com", "192.168.1.1", "10.17.178.2/32"], + protocol_ports=[ + ResourceAddressesInnerProtocolPortsInner(protocol="TCP", ports="22,80,443") + ] + )] + ) + + @staticmethod + def _build_access_policy_1( + network_tunnel_group_id: str, private_resource_id: str + ) -> AddRuleRequest: + """Build the first access policy request object.""" + return AddRuleRequest( + rule_name="Test Access Policy 1", + rule_description="This is a test access policy 1", + rule_action=RuleAction("allow"), + rule_is_enabled=True, + rule_settings=[ + RuleSettingsInner( + setting_name="umbrella.logLevel", + setting_value=SettingValue("LOG_ALL") + ), + RuleSettingsInner( + setting_name="umbrella.default.traffic", + setting_value=SettingValue("PRIVATE_NETWORK") + ) + ], + rule_conditions=[ + RuleConditionsInner( + attribute_name=AttributeName("umbrella.source.identity_ids"), + attribute_value=AttributeValue([int(network_tunnel_group_id)]), + attribute_operator="INTERSECT" + ), + RuleConditionsInner( + attribute_name=AttributeName("umbrella.destination.private_resource_ids"), + attribute_value=AttributeValue([int(private_resource_id)]), + attribute_operator="IN" + ) + ] + ) + + @staticmethod + def _build_access_policy_2( + identity_id: str, destination_list_id: str + ) -> AddRuleRequest: + """Build the second access policy request object.""" + return AddRuleRequest( + rule_name="Test Access Policy 2", + rule_description="This is a test access policy 2", + rule_action=RuleAction("block"), + rule_is_enabled=True, + rule_settings=[ + RuleSettingsInner( + setting_name="umbrella.logLevel", + setting_value=SettingValue("LOG_ALL") + ), + RuleSettingsInner( + setting_name="umbrella.default.traffic", + setting_value=SettingValue("PRIVATE_NETWORK") + ) + ], + rule_conditions=[ + RuleConditionsInner( + attribute_name=AttributeName("umbrella.source.identity_ids"), + attribute_value=AttributeValue([int(identity_id)]), + attribute_operator="=" + ), + RuleConditionsInner( + attribute_name=AttributeName("umbrella.destination.destination_list_ids"), + attribute_value=AttributeValue([int(destination_list_id)]), + attribute_operator="=" + ) + ] + ) + + def create_destination_list(self) -> Any: + """ + Create or retrieve an existing destination list. + + Returns: + The destination list object (existing or newly created). + """ + api_client = DestinationListsApi() + self._set_authorization_header(api_client) + + destination_list_body = self._build_destination_list() + list_name = destination_list_body.name + + try: + logger.info(f"Checking for existing destination list: '{list_name}'") + existing_lists = api_client.get_destination_lists_without_preload_content(limit=100) + for existing_list in existing_lists.json().get("data", []): + if existing_list["name"] == list_name: + logger.info( + f"Destination list '{list_name}' already exists with ID: {existing_list['id']}" + ) + return existing_list + + logger.info(f"Creating new destination list '{list_name}'...") + response = api_client.create_destination_list_without_preload_content( + destination_list_body + ) + logger.info(f"Successfully created destination list '{list_name}'") + return response.json().get("data",[]) + except Exception as e: + logger.error(f"Failed to create destination list '{list_name}': {e}") + raise + + def create_network_tunnel_groups(self) -> Tuple[Any, Any]: + """ + Create or retrieve existing network tunnel groups. + + Returns: + Tuple of two network tunnel group objects (existing or newly created). + """ + api_client = NetworkTunnelGroupsApi() + self._set_authorization_header(api_client) + + ntg_body_1 = self._build_network_tunnel_group_1() + ntg_body_2 = self._build_network_tunnel_group_2() + + try: + logger.info("Fetching existing network tunnel groups") + existing_groups = api_client.list_network_tunnel_groups_without_preload_content(limit=100) + existing_names = {group["name"] for group in existing_groups.json().get("data", [])} + + responses = [] + for body in [ntg_body_1, ntg_body_2]: + if body.name in existing_names: + logger.info(f"Network tunnel group '{body.name}' already exists") + for group in existing_groups.json().get("data", []): + if group["name"] == body.name: + responses.append(group) + break + else: + logger.info(f"Creating new network tunnel group '{body.name}'...") + response = api_client.add_network_tunnel_group_without_preload_content(body) + responses.append(response.json()) + logger.info(f"Successfully created network tunnel group '{body.name}'") + + return responses[0], responses[1] + except Exception as e: + logger.error(f"Failed to create network tunnel groups: {e}") + raise + + def create_private_resources(self) -> Tuple[Any, Any]: + """ + Create or retrieve existing private resources. + + Returns: + Tuple of two private resource objects (existing or newly created). + """ + api_client = PrivateResourcesApi() + self._set_authorization_header(api_client) + + pr_body_1 = self._build_private_resource_1() + pr_body_2 = self._build_private_resource_2() + + try: + logger.info("Fetching existing private resources") + existing_resources = api_client.list_private_resources_without_preload_content(limit=100) + existing_names = { + resource["name"] for resource in existing_resources.json().get("items", []) + } + logger.debug(f"Existing private resource names: {existing_names}") + + responses = [] + for body in [pr_body_1, pr_body_2]: + if body.name in existing_names: + logger.info(f"Private resource '{body.name}' already exists") + for resource in existing_resources.json().get("items", []): + if resource["name"] == body.name: + responses.append(resource) + break + else: + logger.info(f"Creating new private resource '{body.name}'...") + response = api_client.add_private_resource_without_preload_content(body) + responses.append(response.json()) + logger.info(f"Successfully created private resource '{body.name}'") + + return responses[0], responses[1] + except Exception as e: + logger.error(f"Failed to create private resources: {e}") + raise + + def list_network_tunnel_groups(self) -> List[Any]: + """ + List all existing network tunnel groups. + + Returns: + List of network tunnel group objects. + """ + api_client = NetworkTunnelGroupsApi() + self._set_authorization_header(api_client) + + try: + logger.info("Fetching existing network tunnel groups") + response = api_client.list_network_tunnel_groups_without_preload_content(limit=100) + groups = response.json().get("data", []) + logger.info(f"Found {len(groups)} network tunnel groups") + return groups + except Exception as e: + logger.error(f"Failed to list network tunnel groups: {e}") + raise + + def list_private_resources(self) -> List[Any]: + """ + List all existing private resources. + + Returns: + List of private resource objects. + """ + api_client = PrivateResourcesApi() + self._set_authorization_header(api_client) + + try: + logger.info("Fetching existing private resources") + response = api_client.list_private_resources_without_preload_content(limit=100) + resources = response.json().get("items", []) + logger.info(f"Found {len(resources)} private resources") + return resources + except Exception as e: + logger.error(f"Failed to list private resources: {e}") + raise + + def get_identities(self) -> Any: + """ + Get the identities of the current user. + + Returns: + The identities response. + """ + api_client = IdentitiesApi() + self._set_authorization_header(api_client) + + try: + logger.info("Fetching identities for current user") + response = api_client.get_identities(type="securityGroupTag", label="") + logger.info("Successfully fetched identities") + return response + except Exception as e: + logger.error(f"Failed to fetch identities: {e}") + raise + + def create_access_policy_1( + self, network_tunnel_group_id: str, private_resource_id: str + ) -> Any: + """ + Create or retrieve existing access policy 1. + + Args: + network_tunnel_group_id: ID of the network tunnel group. + private_resource_id: ID of the private resource. + + Returns: + The access policy object (existing or newly created). + """ + api_client = AccessRulesApi() + self._set_authorization_header(api_client) + + access_policy_body = self._build_access_policy_1( + network_tunnel_group_id, private_resource_id + ) + rule_name = access_policy_body.rule_name + + try: + logger.info(f"Checking for existing access policy: '{rule_name}'") + existing_rules = api_client.list_rules_without_preload_content( + limit=100, rule_name=rule_name + ) + results = existing_rules.json().get("results", []) + if results: + rule_id = results[0].get("ruleId") + logger.info( + f"Access policy '{rule_name}' already exists with ID: {rule_id}" + ) + return results[0] + + logger.info(f"Creating new access policy '{rule_name}'...") + response = api_client.add_rule(access_policy_body) + logger.info(f"Successfully created access policy '{rule_name}'") + return response + except Exception as e: + logger.error(f"Failed to create access policy '{rule_name}': {e}") + raise + + def create_access_policy_2( + self, identity_id: str, destination_list_id: str + ) -> Any: + """ + Create or retrieve existing access policy 2. + + Args: + identity_id: ID of the identity. + destination_list_id: ID of the destination list. + + Returns: + The access policy object (existing or newly created). + """ + api_client = AccessRulesApi() + self._set_authorization_header(api_client) + + access_policy_body = self._build_access_policy_2( + identity_id, destination_list_id + ) + rule_name = access_policy_body.rule_name + + try: + logger.info(f"Checking for existing access policy: '{rule_name}'") + existing_rules = api_client.list_rules(limit=100, rule_name=rule_name) + results = existing_rules.json().get("results", []) + if results: + rule_id = results[0].get("ruleId") + logger.info( + f"Access policy '{rule_name}' already exists with ID: {rule_id}" + ) + return results[0] + + logger.info(f"Creating new access policy '{rule_name}'...") + response = api_client.add_rule(access_policy_body) + logger.info(f"Successfully created access policy '{rule_name}'") + return response + except Exception as e: + logger.error(f"Failed to create access policy '{rule_name}': {e}") + raise + + +def setup_logging(verbose: bool = False) -> None: + """ + Configure logging based on verbosity level. + + Args: + verbose: If True, set logging level to DEBUG; otherwise INFO. + """ + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + +def parse_arguments() -> argparse.Namespace: + """ + Parse command-line arguments. + + Returns: + Parsed arguments namespace. + """ + parser = argparse.ArgumentParser( + description="Cisco Secure Access API Client - Create and manage resources with idempotent operations.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Run all operations (destination list, network tunnel groups, private resources, access policy) + python complex_example.py --operation all + + # Create only a destination list + python complex_example.py --operation destination-list + + # Create only network tunnel groups + python complex_example.py --operation network-tunnel-groups + + # Create only private resources + python complex_example.py --operation private-resources + + # Create access policy with specific IDs (use when resources already exist) + python complex_example.py --operation access-policy --ntg-id 12345 --pr-id 67890 + + # List existing network tunnel groups (without creating) + python complex_example.py --operation list-network-tunnel-groups + + # List existing private resources (without creating) + python complex_example.py --operation list-private-resources + + # Get identities for current user + python complex_example.py --operation identities + + # Enable verbose/debug logging + python complex_example.py --operation all --verbose + +Environment Variables: + CLIENT_ID Cisco Secure Access API client ID + CLIENT_SECRET Cisco Secure Access API client secret + """ + ) + + parser.add_argument( + '-o', '--operation', + help="Operation to perform", + required=True, + choices=[ + 'all', + 'destination-list', + 'network-tunnel-groups', + 'private-resources', + 'access-policy', + 'list-network-tunnel-groups', + 'list-private-resources', + 'identities' + ], + type=str + ) + + parser.add_argument( + '--ntg-id', + help="Network Tunnel Group ID (required for 'access-policy' operation when not running 'all')", + required=False, + type=str + ) + + parser.add_argument( + '--pr-id', + help="Private Resource ID (required for 'access-policy' operation when not running 'all')", + required=False, + type=str + ) + + parser.add_argument( + '-v', '--verbose', + help="Enable verbose/debug logging", + action='store_true', + default=False + ) + + return parser.parse_args() + + +def run_all_operations(client: CiscoSecureAccessClient) -> None: + """ + Run all operations: create destination list, network tunnel groups, + private resources, and access policy. + + Args: + client: Initialized CiscoSecureAccessClient instance. + """ + # Create destination list + logger.info("Creating destination list...") + destination_list = client.create_destination_list() + destination_list_id = destination_list.get("id") + logger.info(f"Destination List ID: {destination_list_id}") + + # Create network tunnel groups + logger.info("Creating network tunnel groups...") + response2 = client.create_network_tunnel_groups() + network_tunnel_group_1_id = response2[0].get("id") + logger.info(f"Network Tunnel Group 1 ID: {network_tunnel_group_1_id}") + + # Create private resources + logger.info("Creating private resources...") + response3 = client.create_private_resources() + private_resource_1_id = response3[0].get("resourceId") + logger.info(f"Private Resource 1 ID: {private_resource_1_id}") + + # Create access policy + logger.info("Creating access policies...") + response4 = client.create_access_policy_1( + network_tunnel_group_1_id, private_resource_1_id + ) + logger.info("Access policy 1 created successfully") + + # Get identities for current user (commented out - uncomment if needed) + # logger.info("Fetching identities for current user...") + # identities_response = client.get_identities() + # identity_id = identities_response.data[0].id + # logger.info(f"Identity ID: {identity_id}") + + # Create second access policy (commented out - uncomment if needed) + # response5 = client.create_access_policy_2(identity_id, destination_list.id) + # logger.info("Access policy 2 created successfully") + + logger.info("All resources created successfully") + + +def main(): + """Main execution function with CLI support.""" + args = parse_arguments() + + # Setup logging based on verbosity + setup_logging(args.verbose) + + logger.info("Starting Cisco Secure Access client") + + try: + client = CiscoSecureAccessClient() + logger.info("Client initialized successfully") + + if args.operation == 'all': + run_all_operations(client) + + elif args.operation == 'destination-list': + logger.info("Creating destination list...") + destination_list = client.create_destination_list() + destination_list_id = destination_list.get("id") + logger.info(f"Destination List ID: {destination_list_id}") + print(f"Destination List created/found with ID: {destination_list_id}") + + elif args.operation == 'network-tunnel-groups': + logger.info("Creating network tunnel groups...") + response = client.create_network_tunnel_groups() + ntg1_id = response[0].get("id") + ntg2_id = response[1].get("id") + logger.info(f"Network Tunnel Group 1 ID: {ntg1_id}") + logger.info(f"Network Tunnel Group 2 ID: {ntg2_id}") + print(f"Network Tunnel Groups created/found with IDs: {ntg1_id}, {ntg2_id}") + + elif args.operation == 'private-resources': + logger.info("Creating private resources...") + response = client.create_private_resources() + pr1_id = response[0].get("resourceId") + pr2_id = response[1].get("resourceId") + logger.info(f"Private Resource 1 ID: {pr1_id}") + logger.info(f"Private Resource 2 ID: {pr2_id}") + print(f"Private Resources created/found with IDs: {pr1_id}, {pr2_id}") + + elif args.operation == 'access-policy': + if not args.ntg_id or not args.pr_id: + logger.error( + "Both --ntg-id and --pr-id are required for 'access-policy' operation" + ) + print("Error: Both --ntg-id and --pr-id are required for 'access-policy' operation") + sys.exit(1) + + logger.info("Creating access policy...") + response = client.create_access_policy_1(args.ntg_id, args.pr_id) + rule_id = response.get("ruleId") if hasattr(response, 'get') else getattr(response, 'rule_id', None) + logger.info(f"Access Policy created/found with ID: {rule_id}") + print(f"Access Policy created/found with ID: {rule_id}") + + elif args.operation == 'list-network-tunnel-groups': + logger.info("Listing network tunnel groups...") + groups = client.list_network_tunnel_groups() + print(f"\nFound {len(groups)} Network Tunnel Groups:") + for group in groups: + print(f" ID: {group.get('id')}, Name: {group.get('name')}") + + elif args.operation == 'list-private-resources': + logger.info("Listing private resources...") + resources = client.list_private_resources() + print(f"\nFound {len(resources)} Private Resources:") + for resource in resources: + print(f" ID: {resource.get('resourceId')}, Name: {resource.get('name')}") + + elif args.operation == 'identities': + logger.info("Fetching identities for current user...") + identities_response = client.get_identities() + logger.info("Successfully fetched identities") + print(f"Identities: {identities_response}") + + logger.info("Operation completed successfully") + + except Exception as e: + logger.error(f"Failed to complete operations: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/config.py b/examples/config.py index ea6e230..8927655 100644 --- a/examples/config.py +++ b/examples/config.py @@ -1,4 +1,4 @@ -# Copyright 2025 Cisco Systems, Inc. and its affiliates +# Copyright 2026 Cisco Systems, Inc. and its affiliates # # SPDX-License-Identifier: Apache-2.0 diff --git a/examples/destination_list_manager.py b/examples/destination_list_manager.py index de89836..2c01d49 100644 --- a/examples/destination_list_manager.py +++ b/examples/destination_list_manager.py @@ -1,4 +1,4 @@ -# Copyright 2025 Cisco Systems, Inc. and its affiliates +# Copyright 2026 Cisco Systems, Inc. and its affiliates # # SPDX-License-Identifier: Apache-2.0 diff --git a/examples/dlp_rule_events.py b/examples/dlp_rule_events.py new file mode 100644 index 0000000..4ed5a86 --- /dev/null +++ b/examples/dlp_rule_events.py @@ -0,0 +1,861 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# SPDX-License-Identifier: Apache-2.0 + +""" +DLP Rule Events Management Example for Cisco Secure Access API. + +This module provides a comprehensive example of retrieving DLP rule events using +the Cisco Secure Access Python SDK. It demonstrates read operations for: +- List Real-Time DLP rule events +- List SaaS API DLP rule events +- List AI Guardrails DLP rule events +- Get DLP event details by ID + +Usage: + python dlp_rule_events.py list-realtime --from="-1days" --to="now" + python dlp_rule_events.py list-saas --from="-7days" --to="now" --severity WARNING + python dlp_rule_events.py list-ai-guardrails --from="-1days" --to="now" + python dlp_rule_events.py get --event-type realTime --id + python dlp_rule_events.py --region eu list-realtime --from="-1days" --to="now" + +Note: Use --from="value" syntax (with =) for negative values like "-1days" + +API Base URLs (DLP Events uses regional endpoints): + - US (default): https://api.sse.cisco.com/reports.us/v2 + - EU: https://api.sse.cisco.com/reports.eu/v2 + +Requirements: + - Set CLIENT_ID and CLIENT_SECRET environment variables + - Ensure all dependencies in requirements.txt are installed + +API Documentation: + - https://developer.cisco.com/docs/cloud-security/list-real-time-dlp-rule-events/ + - https://developer.cisco.com/docs/cloud-security/list-saas-api-dlp-rule-events/ + - https://developer.cisco.com/docs/cloud-security/list-ai-guardrails-dlp-rule-events/ + - https://developer.cisco.com/docs/cloud-security/get-dlp-event-details/ +""" + +import argparse +import json +import logging +import sys +from typing import Any, Dict, Optional + +from access_token import get_valid_access_token +from secure_access.api.dlp_rule_events_api import DLPRuleEventsApi +from secure_access.exceptions import ApiException + +# Configure logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +# Valid values for filter parameters +VALID_ACTIONS = ["block", "delete", "monitor", "quarantine", "revoke_sharing"] +VALID_SEVERITIES = ["INFO", "WARNING", "CRITICAL"] +VALID_EXPOSURES = ["PUBLIC", "INTERNAL", "EXTERNAL"] +VALID_EVENT_TYPES = ["realTime", "saasApi", "aiGuardrails"] +VALID_REGIONS = ["us", "eu"] + +# DLP Events API uses a different base URL with region +# DLP_REPORTS_BASE_URL = "https://api.sse.cisco.com/reports.{region}/v2" +DLP_REPORTS_BASE_URL = "https://api.umbrella.com/reports.{region}/v2" + + +class DLPRuleEventsClient: + """ + Client for retrieving Cisco Secure Access DLP Rule Events. + + Provides methods for querying DLP events including: + - Listing Real-Time DLP rule events + - Listing SaaS API DLP rule events + - Listing AI Guardrails DLP rule events + - Getting specific DLP event details by ID + + Note: DLP Events API uses a regional endpoint: + - US: https://api.sse.cisco.com/reports.us/v2 + - EU: https://api.sse.cisco.com/reports.eu/v2 + """ + + def __init__(self, region: str = "us") -> None: + """ + Initialize the DLPRuleEventsClient with authentication. + + Args: + region (str): The region for the DLP events API. + Valid values: 'us' (default), 'eu' + + Raises: + ValueError: If region is invalid or environment variables are not set. + Exception: If access token generation fails. + """ + if region not in VALID_REGIONS: + raise ValueError(f"Invalid region '{region}'. Must be one of: {', '.join(VALID_REGIONS)}") + + self.region = region + self.base_url = DLP_REPORTS_BASE_URL.format(region=region) + + try: + self.access_token = get_valid_access_token() + logger.info(f"Successfully obtained access token (region: {region})") + except Exception as e: + logger.error(f"Failed to obtain access token: {e}") + raise + + def _get_api_client(self) -> DLPRuleEventsApi: + """ + Create and configure a DLPRuleEventsApi client with authorization header. + + Returns: + DLPRuleEventsApi: Configured API client instance. + """ + from secure_access.configuration import Configuration + from secure_access.api_client import ApiClient + + # Configure with the regional reports URL + configuration = Configuration(host=self.base_url, access_token=self.access_token) + client = ApiClient(configuration=configuration) + + api_client = DLPRuleEventsApi(api_client=client) + return api_client + + # ========================================================================= + # LIST REAL-TIME DLP RULE EVENTS + # ========================================================================= + def list_realtime_events( + self, + var_from: str, + to: str, + action: Optional[str] = None, + severity: Optional[str] = None, + identity_type: Optional[str] = None, + application_id: Optional[int] = None, + application_category_id: Optional[int] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> Dict[str, Any]: + """ + List Real-Time DLP rule events. + + Get the Real-Time DLP rule events triggered by rules applied to + private or internet resources. + + Uses the without_preload_content variant to bypass SDK model validation, + which may reject valid API responses with newer enum values. + + Args: + var_from (str): Timestamp or relative time string (e.g., '-1days'). + Filters for data after this time. + to (str): Timestamp or relative time string (e.g., 'now'). + Filters for data before this time. + action (Optional[str]): Filter by action (blocked, deleted, + monitored, quarantined, restored, revoked). + severity (Optional[str]): Filter by severity (INFO, WARNING, CRITICAL). + identity_type (Optional[str]): Filter by identity type + (e.g., directory_user, directory_group). + application_id (Optional[int]): Filter by application ID. + application_category_id (Optional[int]): Filter by application category ID. + limit (Optional[int]): Maximum number of events to return (default: 50). + offset (Optional[int]): Index offset for pagination (default: 0). + + Returns: + Dict[str, Any]: List of Real-Time DLP rule events as raw dict. + + Raises: + ApiException: If the API request fails. + + Example: + >>> client = DLPRuleEventsClient() + >>> events = client.list_realtime_events( + ... var_from="-1days", + ... to="now", + ... severity="CRITICAL" + ... ) + >>> for event in events.get("events", []): + ... print(f"Event: {event['eventId']}") + """ + api_client = self._get_api_client() + logger.info(f"Retrieving Real-Time DLP events from '{var_from}' to '{to}'...") + + try: + response = api_client.get_all_real_time_events_without_preload_content( + var_from=var_from, + to=to, + action=action, + severity=severity, + identity_type=identity_type, + application_id=application_id, + application_category_id=application_category_id, + limit=limit, + offset=offset, + ) + events = json.loads(response.data) + event_count = len(events.get("events", [])) + logger.info(f"Successfully retrieved {event_count} Real-Time DLP events") + return events + + except ApiException as e: + logger.error(f"API error listing Real-Time DLP events: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error listing Real-Time DLP events: {e}") + raise + + # ========================================================================= + # LIST SAAS API DLP RULE EVENTS + # ========================================================================= + def list_saas_api_events( + self, + var_from: str, + to: str, + action: Optional[str] = None, + severity: Optional[str] = None, + identity_type: Optional[str] = None, + application_id: Optional[int] = None, + application_category_id: Optional[int] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + exposure: Optional[str] = None, + ) -> Dict[str, Any]: + """ + List SaaS API DLP rule events. + + Get SaaS API DLP rule events triggered by rules applied to SaaS applications. + + Uses the without_preload_content variant to bypass SDK model validation. + + Args: + var_from (str): Timestamp or relative time string (e.g., '-1days'). + Filters for data after this time. + to (str): Timestamp or relative time string (e.g., 'now'). + Filters for data before this time. + action (Optional[str]): Filter by action (blocked, deleted, + monitored, quarantined, restored, revoked). + severity (Optional[str]): Filter by severity (INFO, WARNING, CRITICAL). + identity_type (Optional[str]): Filter by identity type + (e.g., directory_user, directory_group). + application_id (Optional[int]): Filter by application ID. + application_category_id (Optional[int]): Filter by application category ID. + limit (Optional[int]): Maximum number of events to return (default: 50). + offset (Optional[int]): Index offset for pagination (default: 0). + exposure (Optional[str]): Filter by exposure level (PUBLIC, INTERNAL, EXTERNAL). + + Returns: + Dict[str, Any]: List of SaaS API DLP rule events as raw dict. + + Raises: + ApiException: If the API request fails. + + Example: + >>> client = DLPRuleEventsClient() + >>> events = client.list_saas_api_events( + ... var_from="-7days", + ... to="now", + ... exposure="EXTERNAL" + ... ) + """ + api_client = self._get_api_client() + logger.info(f"Retrieving SaaS API DLP events from '{var_from}' to '{to}'...") + + try: + response = api_client.get_all_saa_sapi_events_without_preload_content( + var_from=var_from, + to=to, + action=action, + severity=severity, + identity_type=identity_type, + application_id=application_id, + application_category_id=application_category_id, + limit=limit, + offset=offset, + exposure=exposure, + ) + events = json.loads(response.data) + event_count = len(events.get("events", [])) + logger.info(f"Successfully retrieved {event_count} SaaS API DLP events") + return events + + except ApiException as e: + logger.error(f"API error listing SaaS API DLP events: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error listing SaaS API DLP events: {e}") + raise + + # ========================================================================= + # LIST AI GUARDRAILS DLP RULE EVENTS + # ========================================================================= + def list_ai_guardrails_events( + self, + var_from: str, + to: str, + action: Optional[str] = None, + severity: Optional[str] = None, + identity_type: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> Dict[str, Any]: + """ + List AI Guardrails DLP rule events. + + Get the AI Guardrails DLP rule events triggered by rules applied + to AI Guardrails use cases. + + Uses the without_preload_content variant to bypass SDK model validation. + + Args: + var_from (str): Timestamp or relative time string (e.g., '-1days'). + Filters for data after this time. + to (str): Timestamp or relative time string (e.g., 'now'). + Filters for data before this time. + action (Optional[str]): Filter by action (blocked, deleted, + monitored, quarantined, restored, revoked). + severity (Optional[str]): Filter by severity (INFO, WARNING, CRITICAL). + identity_type (Optional[str]): Filter by identity type + (e.g., directory_user, directory_group). + limit (Optional[int]): Maximum number of events to return (default: 50). + offset (Optional[int]): Index offset for pagination (default: 0). + + Returns: + Dict[str, Any]: List of AI Guardrails DLP rule events as raw dict. + + Raises: + ApiException: If the API request fails. + + Example: + >>> client = DLPRuleEventsClient() + >>> events = client.list_ai_guardrails_events( + ... var_from="-1days", + ... to="now" + ... ) + """ + api_client = self._get_api_client() + logger.info(f"Retrieving AI Guardrails DLP events from '{var_from}' to '{to}'...") + + try: + response = api_client.get_all_ai_guardrails_events_without_preload_content( + var_from=var_from, + to=to, + action=action, + severity=severity, + identity_type=identity_type, + limit=limit, + offset=offset, + ) + events = json.loads(response.data) + event_count = len(events.get("events", [])) + logger.info(f"Successfully retrieved {event_count} AI Guardrails DLP events") + return events + + except ApiException as e: + logger.error(f"API error listing AI Guardrails DLP events: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error listing AI Guardrails DLP events: {e}") + raise + + # ========================================================================= + # GET DLP EVENT DETAILS BY ID + # ========================================================================= + def get_event_details( + self, + event_type: str, + event_id: str, + ) -> Optional[Dict[str, Any]]: + """ + Get detailed information about a specific DLP event by its ID. + + Uses the without_preload_content variant to bypass SDK model validation. + + Args: + event_type (str): The type of event. Valid values: + - 'realTime' for Real-Time events + - 'saasApi' for SaaS API events + - 'aiGuardrails' for AI Guardrails events + event_id (str): The unique identifier of the event. + + Returns: + Optional[Dict[str, Any]]: The DLP event details as raw dict, or None if not found. + + Raises: + ApiException: If the API request fails. + ValueError: If event_type is not valid. + + Example: + >>> client = DLPRuleEventsClient() + >>> event = client.get_event_details( + ... event_type="realTime", + ... event_id="a1764e27-9e48-4dc4-8e93-e315472d42ed" + ... ) + >>> if event: + ... print(f"Event action: {event['action']}") + """ + if event_type not in VALID_EVENT_TYPES: + raise ValueError( + f"Invalid event_type '{event_type}'. " + f"Must be one of: {', '.join(VALID_EVENT_TYPES)}" + ) + + api_client = self._get_api_client() + logger.info(f"Retrieving {event_type} DLP event with ID: {event_id}") + + try: + response = api_client.get_dlp_event_details_by_id_without_preload_content( + event_type=event_type, + id=event_id, + ) + event = json.loads(response.data) + logger.info(f"Successfully retrieved DLP event details for ID: {event_id}") + return event + + except ApiException as e: + if e.status == 404: + logger.warning(f"DLP event {event_id} not found") + return None + logger.error(f"API error getting DLP event {event_id}: {e}") + raise + except Exception as e: + logger.error(f"Unexpected error getting DLP event {event_id}: {e}") + raise + + +# ============================================================================= +# COMMAND LINE INTERFACE +# ============================================================================= +def setup_argparse() -> argparse.ArgumentParser: + """ + Set up the argument parser for the CLI. + + Returns: + argparse.ArgumentParser: Configured argument parser. + """ + parser = argparse.ArgumentParser( + description="Cisco Secure Access DLP Rule Events Management CLI", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # List Real-Time DLP events from the last day + python dlp_rule_events.py list-realtime --from="-1days" --to="now" + + # List Real-Time DLP events with filters + python dlp_rule_events.py list-realtime --from="-7days" --to="now" \\ + --severity CRITICAL --action block --limit 10 + + # List SaaS API DLP events with exposure filter + python dlp_rule_events.py list-saas --from="-1days" --to="now" \\ + --exposure EXTERNAL + + # List AI Guardrails DLP events + python dlp_rule_events.py list-ai-guardrails --from="-1days" --to="now" + + # Use EU region + python dlp_rule_events.py --region eu list-realtime --from="-1days" --to="now" + + # Get specific event details + python dlp_rule_events.py get --event-type realTime \\ + --id "a1764e27-9e48-4dc4-8e93-e315472d42ed" + + # Output as JSON for scripting + python dlp_rule_events.py list-realtime --from="-1days" --to="now" --json + +Time Range Parameters: + - Use relative times like "-1days", "-7days", "-1hours" + - Use "now" for the current time + - Use Unix timestamps in milliseconds (e.g., "1639146300000") + - IMPORTANT: Use --from="-1days" syntax (with =) for negative values + +Region: + - US (default): https://api.sse.cisco.com/reports.us/v2 + - EU: https://api.sse.cisco.com/reports.eu/v2 + +API Documentation: + https://developer.cisco.com/docs/cloud-security/list-real-time-dlp-rule-events/ + https://developer.cisco.com/docs/cloud-security/list-saas-api-dlp-rule-events/ + https://developer.cisco.com/docs/cloud-security/list-ai-guardrails-dlp-rule-events/ + https://developer.cisco.com/docs/cloud-security/get-dlp-event-details/ +""" + ) + + # Global argument for region + parser.add_argument( + "--region", + type=str, + choices=VALID_REGIONS, + default="us", + help="API region: 'us' (default) or 'eu'" + ) + + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # ========================================================================= + # List Real-Time Events command + # ========================================================================= + realtime_parser = subparsers.add_parser( + "list-realtime", + help="List Real-Time DLP rule events", + description="Get Real-Time DLP events triggered by rules applied to private or internet resources." + ) + realtime_parser.add_argument( + "--from", + dest="var_from", + type=str, + required=True, + help="Start time: timestamp or relative time (e.g., '-1days', '-7days')" + ) + realtime_parser.add_argument( + "--to", + type=str, + required=True, + help="End time: timestamp or relative time (e.g., 'now')" + ) + realtime_parser.add_argument( + "--action", + type=str, + choices=VALID_ACTIONS, + help=f"Filter by action: {', '.join(VALID_ACTIONS)}" + ) + realtime_parser.add_argument( + "--severity", + type=str, + choices=VALID_SEVERITIES, + help=f"Filter by severity: {', '.join(VALID_SEVERITIES)}" + ) + realtime_parser.add_argument( + "--identity-type", + type=str, + help="Filter by identity type (e.g., directory_user, directory_group, network)" + ) + realtime_parser.add_argument( + "--application-id", + type=int, + help="Filter by application ID" + ) + realtime_parser.add_argument( + "--application-category-id", + type=int, + help="Filter by application category ID" + ) + realtime_parser.add_argument( + "--limit", + type=int, + default=50, + help="Maximum number of events to return (default: 50)" + ) + realtime_parser.add_argument( + "--offset", + type=int, + default=0, + help="Index offset for pagination (default: 0)" + ) + realtime_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # ========================================================================= + # List SaaS API Events command + # ========================================================================= + saas_parser = subparsers.add_parser( + "list-saas", + help="List SaaS API DLP rule events", + description="Get SaaS API DLP events triggered by rules applied to SaaS applications." + ) + saas_parser.add_argument( + "--from", + dest="var_from", + type=str, + required=True, + help="Start time: timestamp or relative time (e.g., '-1days', '-7days')" + ) + saas_parser.add_argument( + "--to", + type=str, + required=True, + help="End time: timestamp or relative time (e.g., 'now')" + ) + saas_parser.add_argument( + "--action", + type=str, + choices=VALID_ACTIONS, + help=f"Filter by action: {', '.join(VALID_ACTIONS)}" + ) + saas_parser.add_argument( + "--severity", + type=str, + choices=VALID_SEVERITIES, + help=f"Filter by severity: {', '.join(VALID_SEVERITIES)}" + ) + saas_parser.add_argument( + "--identity-type", + type=str, + help="Filter by identity type (e.g., directory_user, directory_group, network)" + ) + saas_parser.add_argument( + "--application-id", + type=int, + help="Filter by application ID" + ) + saas_parser.add_argument( + "--application-category-id", + type=int, + help="Filter by application category ID" + ) + saas_parser.add_argument( + "--exposure", + type=str, + choices=VALID_EXPOSURES, + help=f"Filter by exposure level: {', '.join(VALID_EXPOSURES)}" + ) + saas_parser.add_argument( + "--limit", + type=int, + default=50, + help="Maximum number of events to return (default: 50)" + ) + saas_parser.add_argument( + "--offset", + type=int, + default=0, + help="Index offset for pagination (default: 0)" + ) + saas_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # ========================================================================= + # List AI Guardrails Events command + # ========================================================================= + ai_parser = subparsers.add_parser( + "list-ai-guardrails", + help="List AI Guardrails DLP rule events", + description="Get AI Guardrails DLP events triggered by rules applied to AI Guardrails use cases." + ) + ai_parser.add_argument( + "--from", + dest="var_from", + type=str, + required=True, + help="Start time: timestamp or relative time (e.g., '-1days', '-7days')" + ) + ai_parser.add_argument( + "--to", + type=str, + required=True, + help="End time: timestamp or relative time (e.g., 'now')" + ) + ai_parser.add_argument( + "--action", + type=str, + choices=VALID_ACTIONS, + help=f"Filter by action: {', '.join(VALID_ACTIONS)}" + ) + ai_parser.add_argument( + "--severity", + type=str, + choices=VALID_SEVERITIES, + help=f"Filter by severity: {', '.join(VALID_SEVERITIES)}" + ) + ai_parser.add_argument( + "--identity-type", + type=str, + help="Filter by identity type (e.g., directory_user, directory_group, network)" + ) + ai_parser.add_argument( + "--limit", + type=int, + default=50, + help="Maximum number of events to return (default: 50)" + ) + ai_parser.add_argument( + "--offset", + type=int, + default=0, + help="Index offset for pagination (default: 0)" + ) + ai_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + # ========================================================================= + # Get Event Details command + # ========================================================================= + get_parser = subparsers.add_parser( + "get", + help="Get DLP event details by ID", + description="Get detailed information about a specific DLP event." + ) + get_parser.add_argument( + "--event-type", + type=str, + required=True, + choices=VALID_EVENT_TYPES, + help=f"Event type: {', '.join(VALID_EVENT_TYPES)}" + ) + get_parser.add_argument( + "--id", + type=str, + required=True, + help="The unique identifier of the event" + ) + get_parser.add_argument( + "--json", + action="store_true", + help="Output results as JSON" + ) + + return parser + + +def print_result(data: Any, as_json: bool = False) -> None: + """ + Print the result data in the specified format. + + Args: + data: The data to print (can be SDK models, dicts, or lists). + as_json: If True, output as JSON; otherwise, print normally. + """ + # Convert SDK models to dicts for serialization + def to_serializable(obj): + if hasattr(obj, 'to_dict'): + return obj.to_dict() + elif isinstance(obj, list): + return [to_serializable(item) for item in obj] + return obj + + serializable_data = to_serializable(data) + + if as_json: + print(json.dumps(serializable_data, indent=2, default=str)) + else: + if isinstance(serializable_data, dict): + # Handle the events wrapper + if 'events' in serializable_data: + events = serializable_data['events'] + if events: + for item in events: + print("-" * 80) + if isinstance(item, dict): + for key, value in item.items(): + print(f" {key}: {value}") + else: + print(f" {item}") + print("-" * 80) + else: + print("No events found.") + else: + # Single event details + for key, value in serializable_data.items(): + print(f" {key}: {value}") + elif isinstance(serializable_data, list): + for item in serializable_data: + print("-" * 80) + if isinstance(item, dict): + for key, value in item.items(): + print(f" {key}: {value}") + else: + print(f" {item}") + print("-" * 80) + else: + print(serializable_data) + + +def main() -> int: + """ + Main entry point for the CLI. + + Returns: + int: Exit code (0 for success, 1 for failure). + """ + parser = setup_argparse() + args = parser.parse_args() + + if not args.command: + parser.print_help() + return 1 + + try: + client = DLPRuleEventsClient(region=args.region) + + if args.command == "list-realtime": + events = client.list_realtime_events( + var_from=args.var_from, + to=args.to, + action=args.action, + severity=args.severity, + identity_type=args.identity_type, + application_id=args.application_id, + application_category_id=args.application_category_id, + limit=args.limit, + offset=args.offset, + ) + event_count = len(events.get("events", [])) + print(f"\nFound {event_count} Real-Time DLP event(s):\n") + print_result(events, args.json) + + elif args.command == "list-saas": + events = client.list_saas_api_events( + var_from=args.var_from, + to=args.to, + action=args.action, + severity=args.severity, + identity_type=args.identity_type, + application_id=args.application_id, + application_category_id=args.application_category_id, + limit=args.limit, + offset=args.offset, + exposure=args.exposure, + ) + event_count = len(events.get("events", [])) + print(f"\nFound {event_count} SaaS API DLP event(s):\n") + print_result(events, args.json) + + elif args.command == "list-ai-guardrails": + events = client.list_ai_guardrails_events( + var_from=args.var_from, + to=args.to, + action=args.action, + severity=args.severity, + identity_type=args.identity_type, + limit=args.limit, + offset=args.offset, + ) + event_count = len(events.get("events", [])) + print(f"\nFound {event_count} AI Guardrails DLP event(s):\n") + print_result(events, args.json) + + elif args.command == "get": + event = client.get_event_details( + event_type=args.event_type, + event_id=args.id, + ) + if event: + print(f"\nDLP Event Details ({args.event_type}):\n") + print_result(event, args.json) + else: + print(f"DLP event with ID '{args.id}' not found.") + return 1 + + return 0 + + except ValueError as e: + logger.error(f"Validation error: {e}") + return 1 + except ApiException as e: + logger.error(f"API error: {e}") + return 1 + except Exception as e: + logger.error(f"Unexpected error: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/key_admin_api.py b/examples/key_admin_api.py index 04201a4..d40b593 100644 --- a/examples/key_admin_api.py +++ b/examples/key_admin_api.py @@ -1,4 +1,4 @@ -# Copyright 2025 Cisco Systems, Inc. and its affiliates +# Copyright 2026 Cisco Systems, Inc. and its affiliates # # SPDX-License-Identifier: Apache-2.0 diff --git a/examples/roaming_computers_backup.py b/examples/roaming_computers_backup.py index 286fa2a..b19ca79 100644 --- a/examples/roaming_computers_backup.py +++ b/examples/roaming_computers_backup.py @@ -1,4 +1,4 @@ -# Copyright 2025 Cisco Systems, Inc. and its affiliates +# Copyright 2026 Cisco Systems, Inc. and its affiliates # # SPDX-License-Identifier: Apache-2.0 diff --git a/examples/top_identities_list.py b/examples/top_identities_list.py new file mode 100644 index 0000000..550023b --- /dev/null +++ b/examples/top_identities_list.py @@ -0,0 +1,425 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Fetch all top identities from Cisco Secure Access and write results to JSON (or CSV). + +Fetches every page concurrently (max 5 workers) using TopIdentitiesApi.get_top_identities, +then optionally renders a horizontal bar chart of identities ranked by request count. + +Usage: + python top_identities_list.py [--from TIMERANGE] [--to TIMERANGE] + [--identitytypes TYPES] + [--top-n N] + [--format json|csv] [--output FILE] + [--chart TYPE] [--chart-output FILE] + +Arguments: + --from Start of time range (default: -7days) + --to End of time range (default: now) + --identitytypes Identity type or comma-delimited list (e.g. "roaming computers") + --top-n Keep only the top N records after fetching (default: all) + --format Output format: json or csv (default: json) + --output Output file path (- for stdout, default: top_identities.json) + --chart Chart type: none | bar | horizontal_bar | line | pie (default: none) + --chart-output File path to save the chart PNG (empty = display interactively) + +Examples: + python top_identities_list.py + python top_identities_list.py --from -7days --to now --output top_identities.json + python top_identities_list.py --from 2025-01-01 --to 2025-01-31 --format csv --output results.csv + python top_identities_list.py --from -30days --to now --top-n 50 --chart horizontal_bar --chart-output chart.png + python top_identities_list.py --from -7days --to now --identitytypes "roaming computers" --chart horizontal_bar +""" + +import csv +import json +import logging +import argparse +import re +import sys +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from typing import List, Optional + +from access_token import generate_access_token +from config import config +from secure_access.configuration import Configuration +from secure_access.api_client import ApiClient +from secure_access.api.top_identities_api import TopIdentitiesApi +from secure_access.exceptions import ApiException + +try: + import matplotlib + matplotlib.use("Agg") # non-interactive backend; switched to "TkAgg" for live display + import matplotlib.pyplot as plt + _MATPLOTLIB_AVAILABLE = True +except ImportError: + _MATPLOTLIB_AVAILABLE = False + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +# ── time helpers ────────────────────────────────────────────────────────────── + +_RELATIVE_SUFFIXES = ("days", "hours", "minutes") +_ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +_DATE_ONLY_FORMAT = "%Y-%m-%d" + +# Regex that matches relative time strings such as -7days, -30hours, -1minutes +_RELATIVE_RE = re.compile(r"^-\d+(?:days|hours|minutes)$") + + +def parse_time_arg(value: str) -> str: + """ + Normalise a time argument to an API-accepted string. + + Accepted formats: + Relative : -1days | -7days | -30days | -90days | -365days | now + ISO 8601 : 2025-01-01T00:00:00Z (validated, passed through unchanged) + Date-only : 2025-01-01 (converted to 2025-01-01T00:00:00Z) + """ + v = value.strip() + if v == "now": + return v + if v.startswith("-") and any(v.endswith(s) for s in _RELATIVE_SUFFIXES): + return v + try: + datetime.strptime(v, _ISO_FORMAT) + return v + except ValueError: + pass + try: + dt = datetime.strptime(v, _DATE_ONLY_FORMAT) + return dt.strftime(_ISO_FORMAT) + except ValueError: + pass + raise argparse.ArgumentTypeError( + f"Unrecognised time format: '{value}'. " + "Use a relative value (-7days, now), ISO 8601 (2025-01-01T00:00:00Z), " + "or date-only (2025-01-01)." + ) + + +# ── data helpers ────────────────────────────────────────────────────────────── + +def apply_top_n(records: list, top_n: Optional[int]) -> list: + """Slice to at most top_n records (preserving API rank order). None = keep all.""" + if top_n is None: + return records + return records[:top_n] + + +def _flatten_dict(d: dict, parent_key: str = "") -> dict: + """Flatten one level of nested dicts using dot notation for CSV output.""" + items = {} + for k, v in d.items(): + key = f"{parent_key}.{k}" if parent_key else k + if isinstance(v, dict): + items.update(_flatten_dict(v, key)) + else: + items[key] = v + return items + + +def write_json(records: list, path: str) -> None: + data = [r.to_dict() for r in records] + if path == "-": + print(json.dumps(data, indent=2)) + else: + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + logger.info("JSON results written to %s", path) + + +def write_csv(records: list, path: str) -> None: + if not records: + logger.warning("No records to write.") + return + flat_rows = [_flatten_dict(r.to_dict()) for r in records] + fieldnames = list(flat_rows[0].keys()) + dest = sys.stdout if path == "-" else open(path, "w", newline="", encoding="utf-8") + try: + writer = csv.DictWriter(dest, fieldnames=fieldnames, extrasaction="ignore") + writer.writeheader() + writer.writerows(flat_rows) + finally: + if path != "-": + dest.close() + logger.info("CSV results written to %s", path) + + +# ── visualization ───────────────────────────────────────────────────────────── + +def visualize(records: list, chart_type: str, chart_output: str) -> None: + """ + Render a chart from the TopIdentity record list using matplotlib. + + chart_type : bar | horizontal_bar | line | pie + chart_output : file path to save PNG (e.g. chart.png), or "" to display interactively + """ + if not _MATPLOTLIB_AVAILABLE: + logger.error("matplotlib is not installed. Run: pip install matplotlib") + return + if not records: + logger.warning("No data to visualize.") + return + + labels: List[str] = [] + values: List[float] = [] + for i, r in enumerate(records): + identity = getattr(r, "identity", None) + label = (getattr(identity, "label", None) if identity else None) or str(i + 1) + value = getattr(r, "requests", None) or 0 + labels.append(str(label)) + values.append(float(value) if value else 0.0) + + fig, ax = plt.subplots(figsize=(12, max(6, len(labels) * 0.35))) + + if chart_type == "bar": + ax.bar(labels, values, color="steelblue") + ax.set_xlabel("Identity") + ax.set_ylabel("Requests") + plt.xticks(rotation=45, ha="right") + + elif chart_type == "horizontal_bar": + # Reverse so rank 1 appears at the top + ax.barh(labels[::-1], values[::-1], color="steelblue") + ax.set_xlabel("Requests") + ax.set_ylabel("Identity") + + elif chart_type == "line": + ax.plot(labels, values, marker="o", color="steelblue", linewidth=2) + ax.set_xlabel("Identity") + ax.set_ylabel("Requests") + plt.xticks(rotation=45, ha="right") + + elif chart_type == "pie": + ax.pie(values, labels=labels, autopct="%1.1f%%", startangle=140) + + ax.set_title("Top Identities by Requests") + plt.tight_layout() + + if chart_output: + plt.savefig(chart_output, dpi=150, bbox_inches="tight") + logger.info("Chart saved to %s", chart_output) + else: + matplotlib.use("TkAgg") # switch to interactive backend for live display + plt.show() + + plt.close(fig) + + +# ── API client ──────────────────────────────────────────────────────────────── + +def build_client() -> TopIdentitiesApi: + access_token = generate_access_token() + configuration = Configuration( + access_token=access_token, + retries=config.get_retry(), + ignore_operation_servers=True, + server_index=1, + server_variables={"region": "us"}, + ) + api_client = ApiClient(configuration=configuration) + return TopIdentitiesApi(api_client=api_client) + + +# ── fetch (concurrent pagination) ──────────────────────────────────────────── + +def fetch_all(api: TopIdentitiesApi, args: argparse.Namespace) -> list: + """ + Fetch all records using concurrent batch pagination. + + Fetches up to 5 pages in parallel per batch (max_workers=5). + Stops as soon as any page in a batch returns fewer than 100 records, + which indicates the last page — no reliance on meta.total. + + Use --page-delay N to sleep N seconds between batches when hitting + 429 rate limits. + """ + limit = 100 + max_workers = 5 + page_delay: float = getattr(args, "page_delay", 0.0) or 0.0 + all_records: list = [] + offset = 0 + + def fetch_page(o: int): + return api.get_top_identities( + var_from=args.var_from, + to=args.to, + limit=limit, + offset=o, + identitytypes=args.identitytypes, + ) + + while True: + batch_offsets = [o for o in range(offset, offset + max_workers * limit, limit) if o <= 10000] + if not batch_offsets: + break # all offsets exceed API max (10000) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + pages = list(executor.map(fetch_page, batch_offsets)) + + done = False + for page in pages: + all_records.extend(page.data) + logger.info("Fetched %d records (offset=%d)", len(page.data), batch_offsets[pages.index(page)]) + if len(page.data) < limit: + done = True + break # last page reached — stop processing this batch + + # truncated batch means we've hit the API offset ceiling + if done or len(batch_offsets) < max_workers: + break + + offset += max_workers * limit + if page_delay > 0: + logger.info("Sleeping %.1fs between batches (--page-delay)", page_delay) + time.sleep(page_delay) + + return all_records + + +# ── main ────────────────────────────────────────────────────────────────────── + +def main() -> None: + # Pre-process argv so argparse doesn't misinterpret relative time strings + # (e.g. -7days) as flags. Convert --from -7days → --from=-7days. + argv = sys.argv[1:] + _time_flags = {"--from", "--to"} + processed: list = [] + i = 0 + while i < len(argv): + token = argv[i] + if token in _time_flags and i + 1 < len(argv) and _RELATIVE_RE.match(argv[i + 1]): + processed.append(f"{token}={argv[i + 1]}") + i += 2 + else: + processed.append(token) + i += 1 + + parser = argparse.ArgumentParser( + description="Fetch all top identities from Cisco Secure Access (last 7 days by default).", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Time range formats: + Relative : -1days | -7days | -30days | -90days | now + ISO 8601 : 2025-01-01T00:00:00Z + Date only: 2025-01-01 (treated as midnight UTC) + +Examples: + python top_identities_list.py + python top_identities_list.py --from -7days --to now --output top_identities.json + python top_identities_list.py --from 2025-01-01 --to 2025-01-31 --format csv --output results.csv + python top_identities_list.py --from -30days --to now --top-n 50 --chart horizontal_bar --chart-output chart.png + python top_identities_list.py --from -7days --identitytypes "roaming computers" --chart horizontal_bar + """, + ) + parser.add_argument( + "--from", + dest="var_from", + default="-7days", + type=parse_time_arg, + help="Start of time range (default: -7days)", + ) + parser.add_argument( + "--to", + default="now", + type=parse_time_arg, + help="End of time range (default: now)", + ) + parser.add_argument( + "--identitytypes", + default=None, + help="Identity type or comma-delimited list (e.g. 'roaming computers,users'). Default: all types.", + ) + parser.add_argument( + "--top-n", + dest="top_n", + type=int, + default=None, + help="Keep only the top N records after fetching (default: all)", + ) + parser.add_argument( + "--format", + dest="fmt", + default="json", + choices=["json", "csv"], + help="Output format: json or csv (default: json)", + ) + parser.add_argument( + "--output", + default="top_identities.json", + help="Output file path (- for stdout, default: top_identities.json)", + ) + parser.add_argument( + "--chart", + default="none", + choices=["none", "bar", "horizontal_bar", "line", "pie"], + help="Chart type for visualization (default: none)", + ) + parser.add_argument( + "--chart-output", + dest="chart_output", + default="top_identities_chart.png", + help="File path to save the chart PNG (empty = display interactively, default: top_identities_chart.png)", + ) + parser.add_argument( + "--page-delay", + dest="page_delay", + type=float, + default=0.0, + help="Seconds to sleep between page batches (default: 0). Use e.g. 1.0 to avoid 429 rate limits.", + ) + + args = parser.parse_args(processed) + + try: + api = build_client() + + logger.info("Fetching top identities from %s to %s", args.var_from, args.to) + if args.identitytypes: + logger.info("Filter — identitytypes: %s", args.identitytypes) + + records = fetch_all(api, args) + logger.info("Retrieved %d records total", len(records)) + + records = apply_top_n(records, args.top_n) + if args.top_n is not None: + logger.info("Limited to top %d records", len(records)) + + if args.fmt == "csv": + write_csv(records, args.output) + else: + write_json(records, args.output) + + if args.chart != "none": + visualize(records, args.chart, args.chart_output) + + except ApiException as e: + logger.error("API call failed — HTTP %s %s: %s", e.status, e.reason, e.body) + if e.status == 401: + logger.error("Authentication failed. Verify CLIENT_ID and CLIENT_SECRET.") + elif e.status == 403: + logger.error("Access forbidden. Verify the API scope permissions.") + elif e.status == 429: + logger.error("Rate limit exceeded. Retry configuration is active via config.get_retry().") + elif e.status and e.status >= 500: + logger.error("Server error. Try again later.") + raise SystemExit(1) + except ValueError as e: + logger.error("Invalid parameter: %s", e) + raise SystemExit(1) + except Exception as e: + logger.error("Unexpected error: %s", e) + raise SystemExit(1) + + +if __name__ == "__main__": + main()