Skip to content

Commit 8213863

Browse files
null-sleep, Mukul Murthy
authored and committed
Pipelines CLI Fixes (#298)
This PR makes the following changes to the Pipelines CLI: * A successful spec deployment prints the URL of the pipeline. * If the provided spec file does not have an ID, one is generated and the spec file is updated with the new ID. * Adds a new exception eater to print cleaner error messages from the pipelines API. * Adds exception handling for specs with invalid JSON. * Adds a validation check for the pipeline ID: only -, _ and alphanumeric characters are allowed. Testing: * Unit tests. * Manually verified deployment URLs, new exception formatting, and validation of pipeline IDs.
1 parent 5efc0f6 commit 8213863

4 files changed

Lines changed: 207 additions & 11 deletions

File tree

databricks_cli/pipelines/cli.py

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,31 @@
2121
# See the License for the specific language governing permissions and
2222
# limitations under the License.
2323

24-
from json import loads as json_loads
2524
import os
25+
import uuid
26+
import json
27+
import string
28+
29+
try:
30+
from urlparse import urlparse, urljoin
31+
except ImportError:
32+
from urllib.parse import urlparse, urljoin
2633

2734
import click
2835

2936
from databricks_cli.click_types import PipelineSpecClickType, PipelineIdClickType
30-
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format
3137
from databricks_cli.version import print_version_callback, version
3238
from databricks_cli.pipelines.api import PipelinesApi
3339
from databricks_cli.configure.config import provide_api_client, profile_option, debug_option
40+
from databricks_cli.utils import pipelines_exception_eater, CONTEXT_SETTINGS, pretty_format, \
41+
error_and_quit
42+
43+
# json.decoder.JSONDecodeError is not available on Python 2, where malformed
# JSON surfaces as a plain ValueError instead; fall back to that.
json_parse_exception = getattr(json.decoder, 'JSONDecodeError', ValueError)

# Characters permitted in a pipeline id: ASCII letters, digits, '-' and '_'.
# Frozen so this shared module-level constant cannot be mutated by accident.
PIPELINE_ID_PERMITTED_CHARACTERS = frozenset(string.ascii_letters + string.digits + '-_')
3449

3550

3651
@click.command(context_settings=CONTEXT_SETTINGS,
@@ -39,7 +54,7 @@
3954
@click.option('--spec', default=None, type=PipelineSpecClickType(), help=PipelineSpecClickType.help)
4055
@debug_option
4156
@profile_option
42-
@eat_exceptions
57+
@pipelines_exception_eater
4358
@provide_api_client
4459
def deploy_cli(api_client, spec_arg, spec):
4560
"""
@@ -59,8 +74,19 @@ def deploy_cli(api_client, spec_arg, spec):
5974
raise RuntimeError('The spec should be provided either by an option or argument')
6075
src = spec_arg if bool(spec_arg) else spec
6176
spec_obj = _read_spec(src)
77+
if 'id' not in spec_obj:
78+
pipeline_id = str(uuid.uuid4())
79+
click.echo("Updating spec at {} with id: {}".format(src, pipeline_id))
80+
spec_obj['id'] = pipeline_id
81+
_write_spec(src, spec_obj)
82+
_validate_pipeline_id(spec_obj['id'])
6283
PipelinesApi(api_client).deploy(spec_obj)
6384

85+
pipeline_id = spec_obj['id']
86+
base_url = "{0.scheme}://{0.netloc}/".format(urlparse(api_client.url))
87+
pipeline_url = urljoin(base_url, "#joblist/pipelines/{}".format(pipeline_id))
88+
click.echo("Pipeline successfully deployed: {}".format(pipeline_url))
89+
6490

6591
@click.command(context_settings=CONTEXT_SETTINGS,
6692
short_help='Stops a delta pipeline and deletes its associated Databricks resources')
@@ -70,7 +96,7 @@ def deploy_cli(api_client, spec_arg, spec):
7096
help=PipelineIdClickType.help)
7197
@debug_option
7298
@profile_option
73-
@eat_exceptions
99+
@pipelines_exception_eater
74100
@provide_api_client
75101
def delete_cli(api_client, spec_arg, spec, pipeline_id):
76102
"""
@@ -91,6 +117,7 @@ def delete_cli(api_client, spec_arg, spec, pipeline_id):
91117
"""
92118
pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
93119
PipelinesApi(api_client).delete(pipeline_id)
120+
click.echo("Pipeline {} deleted".format(pipeline_id))
94121

95122

96123
@click.command(context_settings=CONTEXT_SETTINGS,
@@ -101,7 +128,7 @@ def delete_cli(api_client, spec_arg, spec, pipeline_id):
101128
help=PipelineIdClickType.help)
102129
@debug_option
103130
@profile_option
104-
@eat_exceptions
131+
@pipelines_exception_eater
105132
@provide_api_client
106133
def get_cli(api_client, spec_arg, spec, pipeline_id):
107134
"""
@@ -131,7 +158,7 @@ def get_cli(api_client, spec_arg, spec, pipeline_id):
131158
help=PipelineIdClickType.help)
132159
@debug_option
133160
@profile_option
134-
@eat_exceptions
161+
@pipelines_exception_eater
135162
@provide_api_client
136163
def reset_cli(api_client, spec_arg, spec, pipeline_id):
137164
"""
@@ -152,6 +179,7 @@ def reset_cli(api_client, spec_arg, spec, pipeline_id):
152179
"""
153180
pipeline_id = _get_pipeline_id(spec_arg=spec_arg, spec=spec, pipeline_id=pipeline_id)
154181
PipelinesApi(api_client).reset(pipeline_id)
182+
click.echo("Reset triggered for pipeline {}".format(pipeline_id))
155183

156184

157185
def _read_spec(src):
@@ -161,13 +189,25 @@ def _read_spec(src):
161189
"""
162190
extension = os.path.splitext(src)[1]
163191
if extension.lower() == '.json':
164-
with open(src, 'r') as f:
165-
json = f.read()
166-
return json_loads(json)
192+
try:
193+
with open(src, 'r') as f:
194+
data = f.read()
195+
return json.loads(data)
196+
except json_parse_exception as e:
197+
error_and_quit("Invalid JSON provided in spec\n{}".format(e))
167198
else:
168199
raise RuntimeError('The provided file extension for the spec is not supported')
169200

170201

202+
def _write_spec(src, spec):
    """
    Overwrites the file at src with spec serialized as indented JSON,
    followed by a trailing newline.
    """
    # Serialize before opening the file so that a spec which cannot be dumped
    # does not leave the existing file truncated.
    serialized = json.dumps(spec, indent=2)
    with open(src, 'w') as spec_file:
        spec_file.write(serialized)
        spec_file.write('\n')
209+
210+
171211
def _get_pipeline_id(spec_arg, spec, pipeline_id):
172212
"""
173213
Ensures that the user has either specified a spec (either through argument or option) or a
@@ -180,9 +220,22 @@ def _get_pipeline_id(spec_arg, spec, pipeline_id):
180220
if bool(spec_arg) or bool(spec):
181221
src = spec_arg if bool(spec_arg) else spec
182222
pipeline_id = _read_spec(src)["id"]
223+
_validate_pipeline_id(pipeline_id)
183224
return pipeline_id
184225

185226

227+
def _validate_pipeline_id(pipeline_id):
    """
    Checks that pipeline_id is non-empty and contains only -, _ and
    alphanumeric (ASCII letter or digit) characters.

    Problems are reported through error_and_quit; nothing is returned when
    the id is valid.
    """
    if not pipeline_id:
        # Truthiness (rather than len() == 0) also covers a missing id such as
        # "id": null in the spec, which would otherwise fail inside len() with
        # an unrelated TypeError.
        error_and_quit(u'Empty pipeline id provided')
    elif not set(pipeline_id) <= PIPELINE_ID_PERMITTED_CHARACTERS:
        message = u'Pipeline id {} has invalid character(s)\n'.format(pipeline_id)
        message += u'Valid characters are: _ - a-z A-Z 0-9'
        error_and_quit(message)
237+
238+
186239
@click.group(context_settings=CONTEXT_SETTINGS,
187240
short_help='Utility to interact with the Databricks Delta Pipelines.')
188241
@click.option('--version', '-v', is_flag=True, callback=print_version_callback,

databricks_cli/utils.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,41 @@ def decorator(*args, **kwargs):
4646
'reconfigure with ``dbfs configure``')
4747
else:
4848
error_and_quit(exception.response.content)
49-
except Exception as exception: # noqa
49+
except Exception as exception: # noqa
5050
if not DEBUG_MODE:
5151
error_and_quit('{}: {}'.format(type(exception).__name__, str(exception)))
52+
53+
decorator.__doc__ = function.__doc__
54+
return decorator
55+
56+
57+
def pipelines_exception_eater(function):
    """
    Formats error messages from the pipelines API while keeping the existing
    behavior of eat_exceptions.

    Returns a wrapper around function that:
      * reports a fixed authentication hint for HTTP 401 responses;
      * for any other HTTPError, reports "<error_code>\\n<message>" when the
        response body is JSON containing both keys, and the raw response
        content otherwise;
      * reports any other exception as "<ExceptionType>: <text>" unless
        DEBUG_MODE is set.
    """

    @six.wraps(function)
    def decorator(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except HTTPError as exception:  # noqa
            if exception.response.status_code == 401:
                error_and_quit('Your authentication information may be incorrect. Please '
                               + 'reconfigure with ``dbfs configure``')
            else:
                raw_content = exception.response.content
                message = raw_content
                # Only the message construction is guarded. Reporting happens
                # once, outside the try, so a failure while reporting cannot
                # trigger a second report.
                try:
                    exp_context = json_loads(raw_content.decode('utf-8'))
                    if 'error_code' in exp_context and 'message' in exp_context:
                        message = exp_context['error_code'] + '\n' + exp_context['message']
                except Exception:  # noqa
                    # Body is not JSON or not in the expected shape: fall back
                    # to the raw response content.
                    message = raw_content
                error_and_quit(message)
        except Exception as exception:  # noqa
            # NOTE(review): with DEBUG_MODE set the exception is swallowed
            # without being reported, mirroring eat_exceptions - confirm intended.
            if not DEBUG_MODE:
                error_and_quit('{}: {}'.format(type(exception).__name__, str(exception)))

    decorator.__doc__ = function.__doc__
    return decorator
5486

@@ -58,7 +90,7 @@ def error_and_quit(message):
5890
context_object = ctx.ensure_object(ContextObject)
5991
if context_object.debug_mode:
6092
traceback.print_exc()
61-
click.echo('Error: {}'.format(message))
93+
click.echo(u'Error: {}'.format(message))
6294
sys.exit(1)
6395

6496

tests/pipelines/test_cli.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
# pylint:disable=redefined-outer-name
2525

2626
import json
27+
2728
import mock
2829
import pytest
30+
from click import Context, Command
2931
from click.testing import CliRunner
3032

3133
import databricks_cli.pipelines.cli as cli
@@ -43,6 +45,15 @@ def pipelines_api_mock():
4345
yield _pipelines_api_mock
4446

4547

48+
@pytest.fixture()
def click_ctx():
    """
    Builds a dummy Click context so that methods which raise exceptions can be tested directly.
    Without one, Click captures the actual exception and raises its own
    `RuntimeError: There is no active click context`.
    """
    dummy_command = Command('cmd')
    return Context(dummy_command)
55+
56+
4657
@provide_conf
4758
def test_deploy_cli_spec_arg(pipelines_api_mock, tmpdir):
4859
path = tmpdir.join('/spec.json').strpath
@@ -124,6 +135,31 @@ def test_delete_cli_incorrect_parameters(pipelines_api_mock, tmpdir):
124135
assert pipelines_api_mock.delete.call_count == 0
125136

126137

138+
@provide_conf
def test_deploy_spec_updated_with_id_if_pipeline_id_not_in_spec(pipelines_api_mock, tmpdir):
    """
    Deploying a spec that has no 'id' writes a generated id back into the spec file.
    """
    # pipelines_api_mock is requested, as in the sibling deploy/delete tests, so that the
    # deploy call does not go through the real PipelinesApi.
    path = tmpdir.join('/spec.json').strpath
    spec_with_no_id = '{"name": "no id"}'
    with open(path, 'w') as f:
        f.write(spec_with_no_id)
    runner = CliRunner()
    runner.invoke(cli.deploy_cli, ['--spec', path])
    with open(path, 'r') as f:
        updated_spec = json.loads(f.read())
    assert 'id' in updated_spec
    # Rewriting the spec must not drop the fields that were already there.
    assert updated_spec['name'] == 'no id'
    assert pipelines_api_mock.deploy.call_count == 1
149+
150+
151+
@provide_conf
def test_deploy_spec_pipeline_id_is_not_changed_if_provided_in_spec(pipelines_api_mock, tmpdir):
    """
    Deploying a spec that already carries an 'id' leaves that id untouched in the spec file.
    """
    # pipelines_api_mock is requested, as in the sibling deploy/delete tests, so that the
    # deploy call does not go through the real PipelinesApi.
    path = tmpdir.join('/spec.json').strpath
    with open(path, 'w') as f:
        f.write(DEPLOY_SPEC)
    runner = CliRunner()
    runner.invoke(cli.deploy_cli, ['--spec', path])
    with open(path, 'r') as f:
        spec = json.loads(f.read())
    assert spec['id'] == '123'
    assert pipelines_api_mock.deploy.call_count == 1
161+
162+
127163
@provide_conf
128164
def test_deploy_delete_cli_incorrect_spec_extension(pipelines_api_mock, tmpdir):
129165
path = tmpdir.join('/spec.wrong_ext').strpath
@@ -244,3 +280,15 @@ def test_get_cli_no_id(pipelines_api_mock):
244280
result = runner.invoke(cli.get_cli, [])
245281
assert result.exit_code == 1
246282
assert pipelines_api_mock.get.call_count == 0
283+
284+
285+
def test_validate_pipeline_id(click_ctx):
    """
    Ids that are empty or contain characters other than -, _ and alphanumerics make
    _validate_pipeline_id exit; a well-formed id passes and returns None.
    """
    unicode_id = b'pipeline_id-\xe2\x9d\x8c-123'.decode('utf-8')
    rejected_ids = [
        'pipeline_id-?-123',
        'pipeline_id-\\-\'-123',
        'pipeline_id-/-123',
        unicode_id,
        '',
    ]
    with click_ctx:
        for rejected_id in rejected_ids:
            with pytest.raises(SystemExit):
                cli._validate_pipeline_id(rejected_id)
    assert cli._validate_pipeline_id('pipeline_id-ac345cd1') is None

tests/test_utils.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def test_eat_exceptions_normal_case():
3333
"""
3434
If no exceptions, this wrapper should do nothing.
3535
"""
36+
3637
@utils.eat_exceptions
3738
def test_function(x):
3839
return x
@@ -50,11 +51,73 @@ def test_function():
5051
resp = Response()
5152
resp.status_code = 401
5253
raise HTTPError(response=resp)
54+
55+
test_function()
56+
assert error_and_quit_mock.call_count == 1
57+
assert 'Your authentication information' in error_and_quit_mock.call_args[0][0]
58+
59+
60+
def test_pipelines_exception_eater_normal_case():
    """
    When the wrapped function does not raise, the wrapper passes its result through.
    """

    @utils.pipelines_exception_eater
    def identity(value):
        return value

    result = identity(1)
    assert result == 1
70+
71+
72+
def test_pipelines_exception_eater_http_error_401():
    """
    A 401 HTTPError raised by the wrapped function is reported with the special
    authentication message.
    """
    with mock.patch('databricks_cli.utils.error_and_quit') as error_and_quit_mock:
        @utils.pipelines_exception_eater
        def raise_unauthorized():
            unauthorized = Response()
            unauthorized.status_code = 401
            raise HTTPError(response=unauthorized)

        raise_unauthorized()
        assert error_and_quit_mock.call_count == 1
        reported_message = error_and_quit_mock.call_args[0][0]
        assert 'Your authentication information' in reported_message
5686

5787

88+
def test_pipelines_exception_eater_non_401_http_error():
    """
    For a non-401 HTTPError the wrapper parses the JSON response body: a body with both
    error_code and message is reported in formatted form, any other body is reported raw.
    """
    structured_body = b'{"error_code":"TEST_ERROR_CODE","message":"test message"}'
    unstructured_body = b'{"message":"test message"}'

    with mock.patch('databricks_cli.utils.error_and_quit') as error_and_quit_mock:
        @utils.pipelines_exception_eater
        def raise_bad_request(content):
            bad_request = Response()
            bad_request.status_code = 400
            # Pre-populate the body so reading .content does not attempt any I/O.
            bad_request._content_consumed = True
            bad_request._content = content
            raise HTTPError(response=bad_request)

        raise_bad_request(content=structured_body)
        assert error_and_quit_mock.call_count == 1
        assert error_and_quit_mock.call_args[0][0] == 'TEST_ERROR_CODE\ntest message'

        raise_bad_request(content=unstructured_body)
        assert error_and_quit_mock.call_count == 2
        assert error_and_quit_mock.call_args[0][0] == b'{"message":"test message"}'
108+
109+
110+
def test_pipelines_exception_eater_non_http_error_exceptions():
    """
    An exception that is not an HTTPError is reported as "<ExceptionType>: <message>".
    """
    with mock.patch('databricks_cli.utils.error_and_quit') as error_and_quit_mock:
        @utils.pipelines_exception_eater
        def raise_value_error():
            raise ValueError('value error test message')

        raise_value_error()
        assert error_and_quit_mock.call_count == 1
        reported_message = error_and_quit_mock.call_args[0][0]
        assert reported_message == 'ValueError: value error test message'
119+
120+
58121
def test_json_cli_base_both_args():
59122
with pytest.raises(RuntimeError):
60123
utils.json_cli_base('a', 'b', mock.Mock())

0 commit comments

Comments
 (0)