Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.

Commit f1f7c45

Browse files
feat: Add helper methods for async mTLS support for google-auth
Signed-off-by: Radhika Agrawal <agrawalradhika@google.com>
1 parent 9d5c0d8 commit f1f7c45

1 file changed

Lines changed: 131 additions & 0 deletions

File tree

google/auth/aio/transport/mtls.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Helper functions for mTLS in asyncio.
17+
"""
18+
19+
import asyncio
20+
import contextlib
21+
import logging
22+
import os
23+
from os import environ, getenv, path
24+
import ssl
25+
import tempfile
26+
from typing import Optional
27+
28+
from google.auth import exceptions
29+
30+
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
31+
_LOGGER = logging.getLogger(__name__)
32+
33+
def _check_config_path(config_path):
34+
"""Checks for config file path. If it exists, returns the absolute path with user expansion;
35+
otherwise returns None.
36+
37+
Args:
38+
config_path (str): The config file path for certificate_config.json for example
39+
40+
Returns:
41+
str: absolute path if exists and None otherwise.
42+
"""
43+
config_path = path.expanduser(config_path)
44+
if not path.exists(config_path):
45+
_LOGGER.debug("%s is not found.", config_path)
46+
return None
47+
return config_path
48+
49+
50+
def has_default_client_cert_source():
51+
"""Check if default client SSL credentials exists on the device.
52+
53+
Returns:
54+
bool: indicating if the default client cert source exists.
55+
"""
56+
if (
57+
_check_config_path(CERTIFICATE_CONFIGURATION_DEFAULT_PATH)
58+
is not None
59+
):
60+
return True
61+
cert_config_path = getenv("GOOGLE_API_CERTIFICATE_CONFIG")
62+
if (
63+
cert_config_path
64+
and _check_config_path(cert_config_path) is not None
65+
):
66+
return True
67+
return False
68+
69+
70+
def get_client_ssl_credentials(
71+
generate_encrypted_key=False,
72+
certificate_config_path=None,
73+
):
74+
"""Returns the client side certificate, private key and passphrase.
75+
76+
We look for certificates and keys with the following order of priority:
77+
1. Certificate and key specified by certificate_config.json.
78+
Currently, only X.509 workload certificates are supported.
79+
80+
Args:
81+
generate_encrypted_key (bool): If set to True, encrypted private key
82+
and passphrase will be generated; otherwise, unencrypted private key
83+
will be generated and passphrase will be None. This option only
84+
affects keys obtained via context_aware_metadata.json.
85+
certificate_config_path (str): The certificate_config.json file path.
86+
87+
Returns:
88+
Tuple[bool, bytes, bytes, bytes]:
89+
A boolean indicating if cert, key and passphrase are obtained, the
90+
cert bytes and key bytes both in PEM format, and passphrase bytes.
91+
92+
Raises:
93+
google.auth.exceptions.ClientCertError: if problems occurs when getting
94+
the cert, key and passphrase.
95+
"""
96+
97+
# Attempt to retrieve X.509 Workload cert and key.
98+
cert, key = google.auth.transport._mtls_helper._get_workload_cert_and_key(certificate_config_path)
99+
if cert and key:
100+
return True, cert, key, None
101+
102+
return False, None, None, None
103+
104+
105+
def get_client_cert_and_key(client_cert_callback=None):
106+
"""Returns the client side certificate and private key. The function first
107+
tries to get certificate and key from client_cert_callback; if the callback
108+
is None or doesn't provide certificate and key, the function tries application
109+
default SSL credentials.
110+
111+
Args:
112+
client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An
113+
optional callback which returns client certificate bytes and private
114+
key bytes both in PEM format.
115+
116+
Returns:
117+
Tuple[bool, bytes, bytes]:
118+
A boolean indicating if cert and key are obtained, the cert bytes
119+
and key bytes both in PEM format.
120+
121+
Raises:
122+
google.auth.exceptions.ClientCertError: if problems occurs when getting
123+
the cert and key.
124+
"""
125+
if client_cert_callback:
126+
cert, key = client_cert_callback()
127+
return True, cert, key
128+
129+
has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False)
130+
return has_cert, cert, key
131+

0 commit comments

Comments
 (0)