Skip to content

Commit ab44a21

Browse files
authored
PYTHON-5780 Increase code coverage for pyopenssl_context.py (#2773)
1 parent a13842f commit ab44a21

1 file changed

Lines changed: 271 additions & 0 deletions

File tree

test/test_pyopenssl_context.py

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
# Copyright 2026-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unit tests for pyopenssl_context.py.
16+
17+
These tests require PyOpenSSL (install via: pip install pymongo[ocsp]).
18+
Tests are automatically skipped when PyOpenSSL is not available.
19+
"""
20+
21+
from __future__ import annotations
22+
23+
import ssl
24+
import sys
25+
from unittest.mock import patch
26+
27+
sys.path[0:0] = [""]
28+
29+
from test import unittest
30+
31+
try:
32+
from pymongo import pyopenssl_context as _ctx_module
33+
from pymongo.pyopenssl_context import (
34+
PROTOCOL_SSLv23,
35+
SSLContext,
36+
_is_ip_address,
37+
_ragged_eof,
38+
)
39+
40+
_HAVE_PYOPENSSL = True
41+
except ImportError:
42+
_HAVE_PYOPENSSL = False
43+
44+
45+
# ---------------------------------------------------------------------------
46+
# Pure functions (no SSL context required)
47+
# ---------------------------------------------------------------------------
48+
49+
50+
class TestIsIpAddress(unittest.TestCase):
51+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
52+
def test_ipv4(self):
53+
self.assertTrue(_is_ip_address("192.168.1.1"))
54+
55+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
56+
def test_ipv6(self):
57+
self.assertTrue(_is_ip_address("::1"))
58+
self.assertTrue(_is_ip_address("2001:db8::1"))
59+
60+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
61+
def test_hostname_is_not_ip(self):
62+
self.assertFalse(_is_ip_address("example.com"))
63+
self.assertFalse(_is_ip_address("localhost"))
64+
65+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
66+
def test_invalid_string_returns_false(self):
67+
self.assertFalse(_is_ip_address("not-an-ip"))
68+
69+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
70+
def test_unicode_error_returns_false(self):
71+
# UnicodeError path: some inputs that can't be decoded.
72+
# ip_address raises UnicodeError for byte strings with non-ASCII.
73+
self.assertFalse(_is_ip_address(b"\xff\xfe"))
74+
75+
76+
class TestRaggedEof(unittest.TestCase):
77+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
78+
def test_matching_args_returns_true(self):
79+
from OpenSSL.SSL import SysCallError
80+
81+
exc = SysCallError(-1, "Unexpected EOF")
82+
self.assertTrue(_ragged_eof(exc))
83+
84+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
85+
def test_non_matching_args_returns_false(self):
86+
from OpenSSL.SSL import SysCallError
87+
88+
exc = SysCallError(0, "something else")
89+
self.assertFalse(_ragged_eof(exc))
90+
91+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
92+
def test_wrong_code_returns_false(self):
93+
from OpenSSL.SSL import SysCallError
94+
95+
exc = SysCallError(5, "Unexpected EOF")
96+
self.assertFalse(_ragged_eof(exc))
97+
98+
99+
# ---------------------------------------------------------------------------
100+
# SSLContext — construction and properties
101+
# ---------------------------------------------------------------------------
102+
103+
104+
class TestSSLContextConstruction(unittest.TestCase):
105+
def _make(self):
106+
return SSLContext(PROTOCOL_SSLv23)
107+
108+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
109+
def test_protocol_property(self):
110+
ctx = self._make()
111+
self.assertEqual(ctx.protocol, PROTOCOL_SSLv23)
112+
113+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
114+
def test_default_check_hostname(self):
115+
ctx = self._make()
116+
self.assertTrue(ctx.check_hostname)
117+
118+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
119+
def test_set_check_hostname_false(self):
120+
ctx = self._make()
121+
ctx.check_hostname = False
122+
self.assertFalse(ctx.check_hostname)
123+
124+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
125+
def test_set_check_hostname_invalid_raises(self):
126+
ctx = self._make()
127+
with self.assertRaises(TypeError):
128+
ctx.check_hostname = "yes"
129+
130+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
131+
def test_default_check_ocsp_endpoint(self):
132+
ctx = self._make()
133+
self.assertTrue(ctx.check_ocsp_endpoint)
134+
135+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
136+
def test_set_check_ocsp_endpoint_false(self):
137+
ctx = self._make()
138+
ctx.check_ocsp_endpoint = False
139+
self.assertFalse(ctx.check_ocsp_endpoint)
140+
141+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
142+
def test_verify_mode_roundtrip(self):
143+
ctx = self._make()
144+
ctx.verify_mode = ssl.CERT_REQUIRED
145+
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
146+
147+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
148+
def test_verify_mode_cert_none(self):
149+
ctx = self._make()
150+
ctx.verify_mode = ssl.CERT_NONE
151+
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
152+
153+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
154+
def test_options_setter_and_getter(self):
155+
ctx = self._make()
156+
from pymongo.pyopenssl_context import OP_NO_SSLv3
157+
158+
ctx.options = OP_NO_SSLv3
159+
self.assertTrue(ctx.options & OP_NO_SSLv3)
160+
161+
162+
# ---------------------------------------------------------------------------
163+
# SSLContext._load_certifi
164+
# ---------------------------------------------------------------------------
165+
166+
167+
class TestLoadCertifi(unittest.TestCase):
168+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
169+
def test_raises_when_certifi_unavailable(self):
170+
from pymongo.errors import ConfigurationError
171+
172+
ctx = SSLContext(PROTOCOL_SSLv23)
173+
with patch.object(_ctx_module, "_HAVE_CERTIFI", False):
174+
with self.assertRaises(ConfigurationError) as exc_ctx:
175+
ctx._load_certifi()
176+
self.assertIn("certifi", str(exc_ctx.exception))
177+
178+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
179+
def test_loads_when_certifi_available(self):
180+
if not _ctx_module._HAVE_CERTIFI:
181+
self.skipTest("certifi not installed")
182+
ctx = SSLContext(PROTOCOL_SSLv23)
183+
ctx.verify_mode = ssl.CERT_NONE
184+
# Should not raise.
185+
ctx._load_certifi()
186+
187+
188+
# ---------------------------------------------------------------------------
189+
# SSLContext.load_default_certs — platform branching
190+
# ---------------------------------------------------------------------------
191+
192+
193+
class TestLoadDefaultCerts(unittest.TestCase):
194+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
195+
def test_darwin_calls_load_certifi(self):
196+
with patch.object(_ctx_module._sys, "platform", "darwin"):
197+
with patch.object(SSLContext, "_load_certifi") as mock_certifi:
198+
with patch("OpenSSL.SSL.Context.set_default_verify_paths"):
199+
ctx = SSLContext(PROTOCOL_SSLv23)
200+
ctx.load_default_certs()
201+
mock_certifi.assert_called()
202+
203+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
204+
def test_win32_calls_load_wincerts(self):
205+
with patch.object(_ctx_module._sys, "platform", "win32"):
206+
with patch.object(SSLContext, "_load_wincerts") as mock_wincerts:
207+
with patch("OpenSSL.SSL.Context.set_default_verify_paths"):
208+
ctx = SSLContext(PROTOCOL_SSLv23)
209+
ctx.load_default_certs()
210+
calls = [call.args[0] for call in mock_wincerts.call_args_list]
211+
self.assertIn("CA", calls)
212+
self.assertIn("ROOT", calls)
213+
214+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
215+
def test_win32_falls_back_to_certifi_on_exception(self):
216+
with patch.object(_ctx_module._sys, "platform", "win32"):
217+
with patch.object(SSLContext, "_load_wincerts", side_effect=Exception("no certs")):
218+
with patch.object(SSLContext, "_load_certifi") as mock_certifi:
219+
with patch("OpenSSL.SSL.Context.set_default_verify_paths"):
220+
ctx = SSLContext(PROTOCOL_SSLv23)
221+
ctx.load_default_certs()
222+
mock_certifi.assert_called()
223+
224+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
225+
def test_linux_no_certifi_call(self):
226+
with patch.object(_ctx_module._sys, "platform", "linux"):
227+
with patch.object(SSLContext, "_load_certifi") as mock_certifi:
228+
with patch("OpenSSL.SSL.Context.set_default_verify_paths"):
229+
ctx = SSLContext(PROTOCOL_SSLv23)
230+
ctx.load_default_certs()
231+
mock_certifi.assert_not_called()
232+
233+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
234+
def test_calls_set_default_verify_paths(self):
235+
with patch.object(_ctx_module._sys, "platform", "linux"):
236+
ctx = SSLContext(PROTOCOL_SSLv23)
237+
with patch.object(ctx._ctx, "set_default_verify_paths") as mock_sdvp:
238+
ctx.load_default_certs()
239+
mock_sdvp.assert_called_once()
240+
241+
242+
# ---------------------------------------------------------------------------
243+
# SSLContext.set_default_verify_paths
244+
# ---------------------------------------------------------------------------
245+
246+
247+
class TestSetDefaultVerifyPaths(unittest.TestCase):
248+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
249+
def test_delegates_to_ctx(self):
250+
ctx = SSLContext(PROTOCOL_SSLv23)
251+
with patch.object(ctx._ctx, "set_default_verify_paths") as mock_sdvp:
252+
ctx.set_default_verify_paths()
253+
mock_sdvp.assert_called_once()
254+
255+
256+
# ---------------------------------------------------------------------------
257+
# SSLContext.load_verify_locations
258+
# ---------------------------------------------------------------------------
259+
260+
261+
class TestLoadVerifyLocations(unittest.TestCase):
262+
@unittest.skipUnless(_HAVE_PYOPENSSL, "PyOpenSSL is not available.")
263+
def test_delegates_to_ctx(self):
264+
ctx = SSLContext(PROTOCOL_SSLv23)
265+
with patch.object(ctx._ctx, "load_verify_locations") as mock_lvl:
266+
ctx.load_verify_locations(cafile="/tmp/ca.pem")
267+
mock_lvl.assert_called_once_with("/tmp/ca.pem", None)
268+
269+
270+
if __name__ == "__main__":
271+
unittest.main()

0 commit comments

Comments
 (0)