Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,98 @@ options:
-h, --help show this help message and exit
```

### Alert Rules Management
Manage alert rules (list, get, create, update, delete, update-status)
```sh
python examples/alert_rules.py -h
usage: alert_rules.py [-h] {list,get,create,update,delete,update-status} ...

Cisco Secure Access Alert Rules Management CLI

positional arguments:
{list,get,create,update,delete,update-status}
Available commands
list List all alert rules
get Get a specific alert rule by ID
create Create a new alert rule
update Update an existing alert rule
delete Delete one or more alert rules
update-status Update the status of alert rules

options:
-h, --help show this help message and exit
```

### Alert Integration
Create webhook integrations and associated alert rules end-to-end
```sh
python examples/alert_integration.py
```

### Complex Example
Class-based client with idempotent operations for destination lists, network tunnel groups, private resources, and access policies
```sh
python examples/complex_example.py -h
usage: complex_example.py [-h] -o {all,destination-list,network-tunnel-groups,private-resources,access-policy,list-network-tunnel-groups,list-private-resources,identities}
[--ntg-id NTG_ID] [--pr-id PR_ID] [-v]

Cisco Secure Access API Client - Create and manage resources with idempotent operations.

options:
-h, --help show this help message and exit
-o, --operation {all,destination-list,network-tunnel-groups,private-resources,access-policy,list-network-tunnel-groups,list-private-resources,identities}
Operation to perform
--ntg-id NTG_ID Network Tunnel Group ID (required for 'access-policy' operation when not running 'all')
--pr-id PR_ID Private Resource ID (required for 'access-policy' operation when not running 'all')
-v, --verbose Enable verbose/debug logging
```

### DLP Rule Events
Retrieve DLP rule events (Real-Time, SaaS API, AI Guardrails) with regional endpoint support
```sh
python examples/dlp_rule_events.py -h
usage: dlp_rule_events.py [-h] [--region {us,eu}] {list-realtime,list-saas,list-ai-guardrails,get} ...

Cisco Secure Access DLP Rule Events Management CLI

positional arguments:
{list-realtime,list-saas,list-ai-guardrails,get}
Available commands
list-realtime List Real-Time DLP rule events
list-saas List SaaS API DLP rule events
list-ai-guardrails List AI Guardrails DLP rule events
get Get DLP event details by ID

options:
-h, --help show this help message and exit
--region {us,eu} API region: 'us' (default) or 'eu'
```

### Top Identities List
Fetch top identities with pagination, export to JSON/CSV, and optional chart visualization
```sh
python examples/top_identities_list.py -h
usage: top_identities_list.py [-h] [--from FROM] [--to TO] [--identitytypes TYPES]
[--top-n N] [--format {json,csv}] [--output FILE]
[--chart {none,bar,horizontal_bar,line,pie}]
[--chart-output FILE] [--page-delay SECONDS]

Fetch all top identities from Cisco Secure Access (last 7 days by default).

options:
-h, --help show this help message and exit
--from FROM Start of time range (default: -7days)
--to TO End of time range (default: now)
--identitytypes TYPES Identity type or comma-delimited list (e.g. 'roaming computers,users')
--top-n N Keep only the top N records after fetching (default: all)
--format {json,csv} Output format: json or csv (default: json)
--output FILE Output file path (- for stdout, default: top_identities.json)
--chart {none,bar,horizontal_bar,line,pie}
Chart type for visualization (default: none)
--chart-output FILE File path to save the chart PNG (default: top_identities_chart.png)
--page-delay SECONDS Seconds to sleep between page batches (default: 0)
```

### Key Admin API Management
Manage API keys and administrative functions
```sh
Expand Down
2 changes: 1 addition & 1 deletion examples/access_rule_backup_restore.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2025 Cisco Systems, Inc. and its affiliates
# Copyright 2026 Cisco Systems, Inc. and its affiliates
#
# SPDX-License-Identifier: Apache-2.0

Expand Down
67 changes: 65 additions & 2 deletions examples/access_token.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2025 Cisco Systems, Inc. and its affiliates
# Copyright 2026 Cisco Systems, Inc. and its affiliates
#
# SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -27,19 +27,24 @@

import os
import base64
from time import time
from secure_access.api.token_api import TokenApi
from typing import Optional
import dotenv


def generate_access_token(
client_id: Optional[str] = None, client_secret: Optional[str] = None
client_id: Optional[str] = None, client_secret: Optional[str] = None,
save_to_file: bool = False, file_path: str = ".env"
) -> str:
"""
Generates an OAuth2 access token for Cisco SSE API authentication.

Args:
client_id (Optional[str]): The client ID to use. If not provided, uses CLIENT_ID env var.
client_secret (Optional[str]): The client secret to use. If not provided, uses CLIENT_SECRET env var.
save_to_file (bool): Whether to save the generated token to a file. Defaults to False.
file_path (str): The file path to save the token if save_to_file is True. Defaults to ".env".

Returns:
str: The access token string.
Expand All @@ -61,7 +66,65 @@ def generate_access_token(
grant_type="client_credentials",
_headers={"Authorization": f"Basic {base64_credentials}"},
)
if save_to_file:
save_access_token(response, file_path)
return response.access_token
except Exception as e:
print(f"An error occurred while creating the access token: {e}")
raise Exception("Failed to generate access token") from e

def save_access_token(response: dict, file_path: str = ".env") -> None:
"""
Saves the access token to a file.

Args:
response (dict): The response containing the access token and expiration.
file_path (str): The path to the file where the token will be saved. Defaults to '.env'.
"""
try:
dotenv.set_key(file_path, "ACCESS_TOKEN", str(response.access_token))
dotenv.set_key(file_path, "EXPIRES_IN", str(response.expires_in))
dotenv.set_key(file_path, "TIMESTAMP", str(time()))
print(f"Access token saved to {file_path}")
except Exception as e:
print(f"An error occurred while saving the access token: {e}")
raise Exception("Failed to save access token") from e

def is_token_expired(file_path: str = ".env") -> bool:
"""
Checks if the access token is expired based on the saved timestamp and expiration time.

Args:
file_path (str): The path to the file where the token is saved. Defaults to '.env'.

Returns:
bool: True if the token is expired, False otherwise.
"""
try:
expires_in = int(dotenv.get_key(file_path, "EXPIRES_IN") or 0)
timestamp = float(dotenv.get_key(file_path, "TIMESTAMP") or 0)
current_time = time()
return current_time >= timestamp + expires_in
except Exception as e:
print(f"An error occurred while checking token expiration: {e}")
raise Exception("Failed to check token expiration") from e

def get_valid_access_token(file_path: str = ".env") -> str:
"""
Retrieves a valid access token, generating a new one if the current token is expired.

Args:
file_path (str): The path to the file where the token is saved. Defaults to '.env'.

Returns:
str: A valid access token string.
"""
try:
if is_token_expired(file_path):
print("Access token is expired. Generating a new one.")
return generate_access_token(save_to_file=True, file_path=file_path)
else:
return dotenv.get_key(file_path, "ACCESS_TOKEN") or ""
except Exception as e:
print(f"An error occurred while retrieving the access token: {e}")
raise Exception("Failed to retrieve access token") from e
104 changes: 104 additions & 0 deletions examples/alert_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2026 Cisco Systems, Inc. and its affiliates
#
# SPDX-License-Identifier: Apache-2.0

"""
Cisco Secure Access API Client - Class-based implementation with idempotent operations.

This module provides a class-based wrapper around the Cisco Secure Access API,
handling authentication, resource creation with idempotency checks, and proper logging.
"""

import logging
from typing import Any, List, Tuple, Optional

from secure_access import ConditionsAlertRule
from secure_access.models import NotificationInfoAlertRule, NotificationTypeAll, SeverityAlert, StatusAlertRule
from access_token import get_valid_access_token
from secure_access.models.create_alert_rule_request import CreateAlertRuleRequest
from secure_access.api.alert_rules_api import AlertRulesApi

from integrations import CiscoSecureAccessIntegrationClient

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class CiscoSecureAccessAlertIntegrationClient:
"""Client for interacting with Cisco Secure Access APIs with idempotent operations."""

def __init__(self) -> None:
"""Initialize the client with authentication token."""
try:
self.access_token = get_valid_access_token()
logger.info("Successfully obtained access token")
except Exception as e:
logger.error(f"Failed to obtain access token: {e}")
raise

def _set_authorization_header(self, api_client: Any) -> None:
"""
Set the Authorization header for an API client.

Args:
api_client: The API client instance to configure.
"""
api_client.api_client.set_default_header(
"Authorization", f"Bearer {self.access_token}"
)

def _build_alert_integration_payload(self, webhook_ids: List[str]) -> CreateAlertRuleRequest:
"""Build the payload for creating a push security events integration."""
return CreateAlertRuleRequest(
description="Alert rule created by CiscoSecureAccessAlertIntegrationClient",
name="Example Alert Rule",
notification_info=[NotificationInfoAlertRule(
webhook_ids=webhook_ids,
type=NotificationTypeAll("webhook"),
)],
rule_type_id=10,
severity=SeverityAlert(1),
status=StatusAlertRule(1),
conditions=ConditionsAlertRule(
match_type="all"
)
)

def create_alert_rule(self, webhook_ids: List[str]) -> None:
"""Create a push security events integration with idempotency check."""
api_client = AlertRulesApi()
self._set_authorization_header(api_client)

payload = self._build_alert_integration_payload(webhook_ids)

try:
response = api_client.list_alert_rules_without_preload_content()
if not response.status == 200:
logger.error(f"Failed to retrieve existing alert rules. Status code: {response.status}")
return None

existing_alert_rules = response.json()
logger.info(f"Retrieved existing alert rules: {existing_alert_rules}")

for alert_rule in existing_alert_rules:
if alert_rule.get("name") == payload.name:
logger.info(f"Alert rule '{payload.name}' already exists with ID: {alert_rule.get('id')}")
return alert_rule.get("id")

logger.info(f"Creating new alert rule '{payload.name}'...")
response = api_client.create_alert_rule_without_preload_content(payload)
alert_rule = response.json()
logger.info(f"Successfully created alert rule '{payload.name}': {alert_rule}")
except Exception as e:
logger.error(f"Failed to create alert rule '{payload.name}': {e}")
return None

if __name__ == "__main__":
client = CiscoSecureAccessAlertIntegrationClient()
client2 = CiscoSecureAccessIntegrationClient()
webhook_id = client2.create_webhook_integration()
client.create_alert_rule([webhook_id])
Loading