Skip to content

Commit 4f700f5

Browse files
committed
MB-52782 Use the correct credentials for sink REST requests
Prior to this change, 'cbrecovery' used the source credentials for operations on both the source and sink. This resulted in the inability to recover to/from clusters where the credentials were different. Change-Id: I05b0315ba3bd5b3a071754192498082516b5d994 Reviewed-on: https://review.couchbase.org/c/couchbase-cli/+/177091 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Maksimiljans Januska <maks.januska@couchbase.com> Well-Formed: Restriction Checker
1 parent 7682cc2 commit 4f700f5

4 files changed

Lines changed: 136 additions & 11 deletions

File tree

cbrecovery

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ class Recovery(pump_transfer.Transfer):
126126
sys.stderr.write(f'{total_vbucket} vbuckets recovered with elapsed time {delta:0.2f} seconds\n')
127127

128128
def pre_transfer(self, opts, source, sink):
129+
host, port, _, _, path = pump.parse_spec(opts, sink, 8091)
129130

130-
host, port, user, pwd, path = pump.parse_spec(opts, sink, 8091)
131+
# NOTE: Request is going to the destination cluster e.g. the one being recovered. Use the correct credentials.
132+
user, pwd = opts.username_dest, opts.password_dest
131133

132134
# retrieve a list of missing vbucket
133135
cmd = "startRecovery"
@@ -148,9 +150,8 @@ class Recovery(pump_transfer.Transfer):
148150

149151
def post_transfer(self, opts, source, sink, vbucket):
150152
if not self.sink_bucket:
151-
return "Should specify destionation bucket for restore"
153+
return "Should specify destination bucket for restore", None
152154

153-
host, port, user, pwd, path = pump.parse_spec(opts, sink, 8091)
154155
if opts.dry_run:
155156
cmd = "stopRecovery"
156157
reason_msg = "Stop recovery"
@@ -161,6 +162,12 @@ class Recovery(pump_transfer.Transfer):
161162
params = urllib.parse.urlencode({"vbucket": vbucket})
162163
else:
163164
params = None
165+
166+
host, port, _, _, path = pump.parse_spec(opts, sink, 8091)
167+
168+
# NOTE: Request is going to the destination cluster e.g. the one being recovered. Use the correct credentials.
169+
user, pwd = opts.username_dest, opts.password_dest
170+
164171
url = f'/pools/default/buckets/{self.sink_bucket}/controller/{cmd}?recovery_uuid={self.recovery_uuid}'
165172
post_headers = {"Content-type": "application/x-www-form-urlencoded"}
166173
err, _, _ = pump.rest_request(host, int(port), user, pwd, False, url, method='POST', body=params,

test/mock_server.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from cryptography.hazmat.primitives import hashes, serialization
1717
from cryptography.hazmat.primitives.asymmetric import rsa
1818
from cryptography.x509.oid import NameOID
19+
from requests.auth import HTTPBasicAuth
1920

2021

2122
# generate_self_signed_cert generates a key/self signed certificate pair which will be written to key.pem/cert.pem in
@@ -65,6 +66,10 @@ class RequestHandler(BaseHTTPRequestHandler):
6566
def do_GET(self):
6667
parsed = urlparse(self.path)
6768
self.server.rest_server.trace.append(f'GET:{parsed.path}')
69+
70+
if not self.authenticated():
71+
return
72+
6873
for (endpoint, fns) in endpoints:
6974
if re.search(endpoint, parsed.path) is not None and 'GET' in fns:
7075
return self.handle_fn(fns['GET'], parsed.path, parsed.query)
@@ -74,6 +79,10 @@ def do_GET(self):
7479
def do_POST(self):
7580
parsed = urlparse(self.path)
7681
self.server.rest_server.trace.append(f'POST:{parsed.path}')
82+
83+
if not self.authenticated():
84+
return
85+
7786
for (endpoint, fns) in endpoints:
7887
if re.search(endpoint, parsed.path) is not None and 'POST' in fns:
7988
return self.handle_fn(fns['POST'], parsed.path, parsed.query)
@@ -83,6 +92,10 @@ def do_POST(self):
8392
def do_PATCH(self):
8493
parsed = urlparse(self.path)
8594
self.server.rest_server.trace.append(f'PATCH:{parsed.path}')
95+
96+
if not self.authenticated():
97+
return
98+
8699
for (endpoint, fns) in endpoints:
87100
if re.search(endpoint, parsed.path) is not None and 'PATCH' in fns:
88101
return self.handle_fn(fns['PATCH'], parsed.path)
@@ -92,6 +105,10 @@ def do_PATCH(self):
92105
def do_PUT(self):
93106
parsed = urlparse(self.path)
94107
self.server.rest_server.trace.append(f'PUT:{parsed.path}')
108+
109+
if not self.authenticated():
110+
return
111+
95112
for (endpoint, fns) in endpoints:
96113
if re.search(endpoint, parsed.path) is not None and 'PUT' in fns:
97114
return self.handle_fn(fns['PUT'], parsed.path)
@@ -104,11 +121,23 @@ def do_DELETE(self):
104121
if parsed.query:
105122
self.server.rest_server.queries.append(parsed.query)
106123

124+
if not self.authenticated():
125+
return
126+
107127
for (endpoint, fns) in endpoints:
108128
if re.search(endpoint, parsed.path) is not None and 'DELETE' in fns:
109129
return self.handle_fn(fns['DELETE'], parsed.path, parsed.query)
110130
self.not_found()
111131

132+
def authenticated(self):
133+
# Username/password == Administrator/asdasd
134+
if self.headers.get('Authorization') == f"Basic QWRtaW5pc3RyYXRvcjphc2Rhc2Q=":
135+
return True
136+
137+
self.send_response(401)
138+
139+
return False
140+
112141
def not_found(self):
113142
self.send_response(404)
114143
self.finish()
@@ -117,7 +146,7 @@ def handle_fn(self, fn, path, params=None):
117146
content_len = int(self.headers.get('content-length', 0))
118147
post_body = self.rfile.read(content_len).decode('utf-8')
119148

120-
if self.headers.get('Content-Type', 'application/x-www-form-urlencoded') == 'application/json':
149+
if post_body and self.headers.get('Content-Type', 'application/x-www-form-urlencoded') == 'application/json':
121150
# to help with verifying later on we are going to load this json and then dump it again but with sorted keys
122151
# to ensure stable serializing so then we can do string comparison on the results
123152
self.server.rest_server.rest_params.append(json.dumps(json.loads(post_body), sort_keys=True))
@@ -215,7 +244,7 @@ def shutdown(self):
215244

216245
def _close(self, url, server, t):
217246
try:
218-
requests.get(f'{url}/close', timeout=0.2, verify=False)
247+
requests.get(f'{url}/close', auth=HTTPBasicAuth('Administrator', 'asdasd'), timeout=0.2, verify=False)
219248
except Exception:
220249
pass
221250

@@ -541,6 +570,9 @@ def export_eventing_functions(rest_params=None, server_args=None, path="", endpo
541570
(r'/controller/uploadClusterCA', {'POST': do_nothing}),
542571
(r'/controller/regenerateCertificate', {'POST': get_by_path}),
543572

573+
# cbrecovery
574+
(r'/pools/default/buckets/\w+/controller/startRecovery', {'POST': get_by_path}),
575+
(r'/pools/default/buckets/\w+/controller/commitVBucket', {'POST': get_by_path}),
544576

545577
# index api
546578
(r'/getIndexMetadata$', {'GET': get_by_path}),

test/test_cbrecovery.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#!/usr/bin/env python3
2+
3+
# Copyright 2022 Couchbase Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import imp
18+
import json
19+
import os
20+
import unittest
21+
from optparse import Values
22+
23+
from mock_server import MockRESTServer, generate_self_signed_cert
24+
25+
cbrecovery = imp.load_source('cbrecovery', './cbrecovery')
26+
27+
host = '127.0.0.1'
28+
port = 6789
29+
30+
# setUpModule will generate new certificates for the mock HTTPS server used during unit testing.
31+
#
32+
# NOTE: We don't remove the key/cert once generated since they're included in the '.gitignore' file.
33+
34+
35+
def setUpModule():
36+
generate_self_signed_cert(os.path.dirname(os.path.abspath(__file__)))
37+
38+
39+
class RecoveryTest(unittest.TestCase):
40+
def test_pre_transfer(self):
41+
server = MockRESTServer(host, port)
42+
server.set_args({"/pools/default/buckets/bucket/controller/startRecovery": {}})
43+
server.run()
44+
45+
recovery = cbrecovery.Recovery()
46+
recovery.sink_bucket = "bucket"
47+
48+
opts = Values({
49+
"username": "",
50+
"password": "",
51+
# Assert that we use these credentials, and not the empty source credentials
52+
"username_dest": "Administrator",
53+
"password_dest": "asdasd",
54+
"ssl": False,
55+
})
56+
57+
err, _ = recovery.pre_transfer(opts, "", f"http://{host}:{port}")
58+
self.assertEqual("Missing recovery map from response", err)
59+
self.assertIn('POST:/pools/default/buckets/bucket/controller/startRecovery', server.trace)
60+
61+
server.shutdown()
62+
63+
def test_post_transfer(self):
64+
server = MockRESTServer(host, port)
65+
server.run()
66+
67+
recovery = cbrecovery.Recovery()
68+
recovery.sink_bucket = "bucket"
69+
70+
opts = Values({
71+
"username": "",
72+
"password": "",
73+
# Assert that we use these credentials, and not the empty source credentials
74+
"username_dest": "Administrator",
75+
"password_dest": "asdasd",
76+
"dry_run": False,
77+
"ssl": False,
78+
})
79+
80+
err, _ = recovery.post_transfer(opts, "", f"http://{host}:{port}", 0)
81+
self.assertIn('POST:/pools/default/buckets/bucket/controller/commitVBucket', server.trace)
82+
self.assertIsNone(err)
83+
84+
server.shutdown()

test/test_pumps.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,8 @@ def test_provide_index(self):
546546
'/getIndexMetadata': {'result': 'INDEX'}}
547547
server.set_args(server_args)
548548
server.run()
549-
opts = Ditto({'username': 'admin', 'password': 'asdasd', 'ssl': False, 'no_ssl_verify': True, 'cacert': None})
549+
opts = Ditto({'username': 'Administrator', 'password': 'asdasd',
550+
'ssl': False, 'no_ssl_verify': True, 'cacert': None})
550551
error, index = DCPStreamSource.provide_index(opts, 'http://127.0.0.1:9211', {'name': 'default'}, None)
551552
server.shutdown()
552553
self.assertEqual(0, error, 'Got unexpected error: {}'.format(error))
@@ -562,7 +563,8 @@ def test_provide_fts_index(self):
562563
}}}}
563564
server.set_args(server_args)
564565
server.run()
565-
opts = Ditto({'username': 'admin', 'password': 'asdasd', 'ssl': False, 'no_ssl_verify': True, 'cacert': None})
566+
opts = Ditto({'username': 'Administrator', 'password': 'asdasd',
567+
'ssl': False, 'no_ssl_verify': True, 'cacert': None})
566568
error, index = DCPStreamSource.provide_fts_index(opts, 'http://127.0.0.1:9211', {'name': 'default'}, None)
567569
server.shutdown()
568570
self.assertEqual(0, error, 'Got unexpected error: {}'.format(error))
@@ -1248,9 +1250,9 @@ def handle_sasl_auth_plain(self, data, req, log=False):
12481250
cas=cas)
12491251
return
12501252
user_pass = body[keylen:]
1251-
if user_pass != b'\x00admin\x00asdasd':
1253+
if user_pass != b'\x00Administrator\x00asdasd':
12521254
self.failed = True
1253-
self.reason = "Received {} expected {}".format(user_pass, b'\x00admin\x00asdasd')
1255+
self.reason = "Received {} expected {}".format(user_pass, b'\x00Administrator\x00asdasd')
12541256
self.res(req, cbcs.CMD_SASL_AUTH, errcode=cbcs.ERR_EINTERNAL, body=self.reason.encode(), opaque=opaque,
12551257
cas=cas)
12561258
return
@@ -1339,7 +1341,7 @@ class TestMemcachedClient(unittest.TestCase):
13391341
def setUp(self):
13401342
self.server = MockMemcachedServer(debug=False)
13411343
self.host, self.port = self.server.get_host_address()
1342-
self.helper_class = MCHelper({'response': b'res_data', 'user': 'admin', 'pass': 'asdasd'})
1344+
self.helper_class = MCHelper({'response': b'res_data', 'user': 'Administrator', 'pass': 'asdasd'})
13431345

13441346
def tearDown(self):
13451347
self.server.stop()
@@ -1397,7 +1399,7 @@ def test_sasl_auth_plain(self):
13971399

13981400
try:
13991401
client = MemcachedClient(self.host, self.port)
1400-
opaque_o, cas_o, data = client.sasl_auth_plain('admin', 'asdasd')
1402+
opaque_o, cas_o, data = client.sasl_auth_plain('Administrator', 'asdasd')
14011403
except Exception as e:
14021404
client.close()
14031405
self.server.stop()

0 commit comments

Comments
 (0)