Skip to content

Commit f047ba8

Browse files
authored
Interpret pipeline libraries relative to spec (#368)
The library paths in a pipeline spec should be interpreted as relative to the spec's location, and not relative to the current working directory.
1 parent 3ed4868 commit f047ba8

4 files changed

Lines changed: 36 additions & 29 deletions

File tree

databricks_cli/pipelines/api.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ def __init__(self, api_client):
4040
self.client = DeltaPipelinesService(api_client)
4141
self.dbfs_client = DbfsApi(api_client)
4242

43-
def create(self, spec, allow_duplicate_names, headers=None):
44-
data = self._upload_libraries_and_update_spec(spec)
43+
def create(self, spec, spec_dir, allow_duplicate_names, headers=None):
44+
data = self._upload_libraries_and_update_spec(spec, spec_dir)
4545
data['allow_duplicate_names'] = allow_duplicate_names
4646
return self.client.client.perform_query('POST', '/pipelines', data=data,
4747
headers=headers)
4848

49-
def deploy(self, spec, allow_duplicate_names, headers=None):
50-
data = self._upload_libraries_and_update_spec(spec)
49+
def deploy(self, spec, spec_dir, allow_duplicate_names, headers=None):
50+
data = self._upload_libraries_and_update_spec(spec, spec_dir)
5151
data['allow_duplicate_names'] = allow_duplicate_names
5252
pipeline_id = data['id']
5353
self.client.client.perform_query('PUT', '/pipelines/{}'.format(pipeline_id), data=data,
@@ -89,13 +89,13 @@ def run(self, pipeline_id, headers=None):
8989
def stop(self, pipeline_id, headers=None):
9090
self.client.stop(pipeline_id, headers)
9191

92-
def _upload_libraries_and_update_spec(self, spec):
92+
def _upload_libraries_and_update_spec(self, spec, spec_dir):
9393
spec = copy.deepcopy(spec)
9494
lib_objects = LibraryObject.from_json(spec.get('libraries', []))
9595
local_lib_objects, external_lib_objects = self._identify_local_libraries(lib_objects)
9696

97-
spec['libraries'] = LibraryObject.to_json(external_lib_objects +
98-
self._upload_local_libraries(local_lib_objects))
97+
spec['libraries'] = LibraryObject.to_json(
98+
external_lib_objects + self._upload_local_libraries(spec_dir, local_lib_objects))
9999
return spec
100100

101101
@staticmethod
@@ -127,14 +127,15 @@ def _identify_local_libraries(lib_objects):
127127
external_lib_objects.append(lib_object)
128128
return local_lib_objects, external_lib_objects
129129

130-
def _upload_local_libraries(self, local_lib_objects):
131-
remote_lib_objects = [LibraryObject(llo.lib_type, self._get_hashed_path(llo.path))
132-
for llo in local_lib_objects]
133-
130+
def _upload_local_libraries(self, spec_dir, local_lib_objects):
131+
relative_local_lib_objects = [LibraryObject(llo.lib_type, os.path.join(spec_dir, llo.path))
132+
for llo in local_lib_objects]
133+
remote_lib_objects = [LibraryObject(rllo.lib_type, self._get_hashed_path(rllo.path))
134+
for rllo in relative_local_lib_objects]
134135
transformed_remote_lib_objects = [LibraryObject(rlo.lib_type, DbfsPath(rlo.path))
135136
for rlo in remote_lib_objects]
136137
upload_files = [llo_tuple for llo_tuple in
137-
zip(local_lib_objects, transformed_remote_lib_objects)
138+
zip(relative_local_lib_objects, transformed_remote_lib_objects)
138139
if not self.dbfs_client.file_exists(llo_tuple[1].path)]
139140

140141
for llo, rlo in upload_files:

databricks_cli/pipelines/cli.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,10 @@ def deploy_cli(api_client, spec_arg, spec, allow_duplicate_names, pipeline_id):
9191
raise ValueError('The spec should be provided either by an option or argument')
9292
src = spec_arg if bool(spec_arg) else spec
9393
spec_obj = _read_spec(src)
94+
spec_dir = os.path.dirname(src)
9495
if not pipeline_id and 'id' not in spec_obj:
9596
try:
96-
response = PipelinesApi(api_client).create(spec_obj, allow_duplicate_names)
97+
response = PipelinesApi(api_client).create(spec_obj, spec_dir, allow_duplicate_names)
9798
except requests.exceptions.HTTPError as e:
9899
_handle_duplicate_name_exception(spec_obj, e)
99100

@@ -116,7 +117,7 @@ def deploy_cli(api_client, spec_arg, spec, allow_duplicate_names, pipeline_id):
116117
_validate_pipeline_id(spec_obj['id'])
117118

118119
try:
119-
PipelinesApi(api_client).deploy(spec_obj, allow_duplicate_names)
120+
PipelinesApi(api_client).deploy(spec_obj, spec_dir, allow_duplicate_names)
120121
except requests.exceptions.HTTPError as e:
121122
_handle_duplicate_name_exception(spec_obj, e)
122123
click.echo("Successfully deployed pipeline: {}".format(
@@ -292,7 +293,9 @@ def _handle_duplicate_name_exception(spec, exception):
292293

293294
if error_code == 'RESOURCE_CONFLICT':
294295
raise ValueError("Pipeline with name '{}' already exists. ".format(spec['name']) +
295-
"You can use the --allow-duplicate-names option to skip this check.")
296+
"If you are updating an existing pipeline, provide the pipeline " +
297+
"id using --pipeline-id. Otherwise, " +
298+
"you can use the --allow-duplicate-names option to skip this check. ")
296299
raise exception
297300

298301

tests/pipelines/test_api.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,21 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p
103103
# set-up the test
104104
jar1 = tmpdir.join('jar1.jar').strpath
105105
jar2 = tmpdir.join('jar2.jar').strpath
106-
jar3 = tmpdir.join('jar3.jar').strpath
106+
jar3dir = 'some/relative/path'
107+
jar3absdir = tmpdir.join(jar3dir).strpath
108+
jar3relpath = os.path.join(jar3dir, 'jar3.jar')
109+
jar3abspath = tmpdir.join(jar3relpath).strpath
107110
jar4 = tmpdir.join('jar4.jar').strpath
108111
wheel1 = tmpdir.join('wheel-name-conv.whl').strpath
109-
jar3_relpath = os.path.relpath(jar3, os.getcwd())
110112
jar4_file_prefix = 'file:{}'.format(jar4)
111113
remote_path_456 = 'dbfs:/pipelines/code/51eac6b471a284d3341d8c0c63d0f1a286262a18.jar'
112114

113115
with open(jar1, 'w') as f:
114116
f.write('123')
115117
with open(jar2, 'w') as f:
116118
f.write('456')
117-
with open(jar3, 'w') as f:
119+
os.makedirs(jar3absdir)
120+
with open(jar3abspath, 'w') as f:
118121
f.write('456')
119122
with open(jar4, 'w') as f:
120123
f.write('456')
@@ -127,7 +130,7 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p
127130
{'unknown': '/foo/bar'},
128131
{'jar': jar1},
129132
{'jar': jar2},
130-
{'jar': jar3_relpath},
133+
{'jar': jar3relpath},
131134
{'jar': jar4_file_prefix},
132135
{'whl': wheel1},
133136
]
@@ -152,12 +155,12 @@ def _test_library_uploads(pipelines_api, api_method, spec, put_file_mock, dbfs_p
152155
]
153156
expected_data['allow_duplicate_names'] = allow_duplicate_names
154157

155-
api_method(spec, allow_duplicate_names)
158+
api_method(spec, tmpdir.strpath, allow_duplicate_names)
156159
assert dbfs_path_validate.call_count == 5
157160
assert put_file_mock.call_count == 4
158161
assert put_file_mock.call_args_list[0][0][0] == jar2
159162
assert put_file_mock.call_args_list[0][0][1].absolute_path == remote_path_456
160-
assert put_file_mock.call_args_list[1][0][0] == jar3_relpath
163+
assert put_file_mock.call_args_list[1][0][0] == jar3abspath
161164
assert put_file_mock.call_args_list[2][0][0] == jar4
162165
assert put_file_mock.call_args_list[3][0][0] == wheel1
163166
client_mock = pipelines_api.client.client.perform_query
@@ -171,13 +174,13 @@ def test_create(pipelines_api):
171174
spec = copy.deepcopy(SPEC_WITHOUT_ID)
172175
spec['libraries'] = []
173176

174-
pipelines_api.create(spec, allow_duplicate_names=False)
177+
pipelines_api.create(spec, spec_dir='.', allow_duplicate_names=False)
175178
data = copy.deepcopy(spec)
176179
data['allow_duplicate_names'] = False
177180
client_mock.assert_called_with("POST", "/pipelines", data=data, headers=None)
178181
assert client_mock.call_count == 1
179182

180-
pipelines_api.create(spec, allow_duplicate_names=True, headers=HEADERS)
183+
pipelines_api.create(spec, spec_dir='.', allow_duplicate_names=True, headers=HEADERS)
181184
data = copy.deepcopy(spec)
182185
data['allow_duplicate_names'] = True
183186
client_mock.assert_called_with("POST", "/pipelines", data=data, headers=HEADERS)
@@ -190,13 +193,13 @@ def test_deploy(pipelines_api):
190193
spec = copy.deepcopy(SPEC)
191194
spec['libraries'] = []
192195

193-
pipelines_api.deploy(spec, allow_duplicate_names=False)
196+
pipelines_api.deploy(spec, spec_dir='.', allow_duplicate_names=False)
194197
data = copy.deepcopy(spec)
195198
data['allow_duplicate_names'] = False
196199
client_mock.assert_called_with("PUT", "/pipelines/" + PIPELINE_ID, data=data, headers=None)
197200
assert client_mock.call_count == 1
198201

199-
pipelines_api.deploy(spec, allow_duplicate_names=True, headers=HEADERS)
202+
pipelines_api.deploy(spec, spec_dir='.', allow_duplicate_names=True, headers=HEADERS)
200203
data = copy.deepcopy(spec)
201204
data['allow_duplicate_names'] = True
202205
client_mock.assert_called_with("PUT", "/pipelines/" + PIPELINE_ID, data=data, headers=HEADERS)

tests/pipelines/test_cli.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,19 +273,19 @@ def test_allow_duplicate_names_flag(pipelines_api_mock, tmpdir):
273273
f.write(DEPLOY_SPEC_NO_ID)
274274
runner = CliRunner()
275275
runner.invoke(cli.deploy_cli, [path])
276-
assert pipelines_api_mock.create.call_args_list[0][0][1] is False
276+
assert pipelines_api_mock.create.call_args_list[0][0][2] is False
277277

278278
runner.invoke(cli.deploy_cli, [path, "--allow-duplicate-names"])
279-
assert pipelines_api_mock.create.call_args_list[1][0][1] is True
279+
assert pipelines_api_mock.create.call_args_list[1][0][2] is True
280280

281281
with open(path, 'w') as f:
282282
f.write(DEPLOY_SPEC)
283283

284284
runner.invoke(cli.deploy_cli, [path])
285-
assert pipelines_api_mock.deploy.call_args_list[0][0][1] is False
285+
assert pipelines_api_mock.deploy.call_args_list[0][0][2] is False
286286

287287
runner.invoke(cli.deploy_cli, [path, "--allow-duplicate-names"])
288-
assert pipelines_api_mock.deploy.call_args_list[1][0][1] is True
288+
assert pipelines_api_mock.deploy.call_args_list[1][0][2] is True
289289

290290

291291
@provide_conf

0 commit comments

Comments
 (0)