|
35 | 35 | Runtime, |
36 | 36 | TelemetryConfig, |
37 | 37 | ) |
38 | | -from temporalio.service import RPCError |
| 38 | +from temporalio.service import RPCError, RPCStatusCode |
39 | 39 | from temporalio.testing import WorkflowEnvironment |
40 | 40 | from temporalio.worker import ( |
41 | 41 | ActivitySlotInfo, |
@@ -1192,14 +1192,40 @@ async def mk_call() -> DescribeWorkerDeploymentResponse: |
1192 | 1192 | async def set_current_deployment_version( |
1193 | 1193 | client: Client, conflict_token: bytes, version: WorkerDeploymentVersion |
1194 | 1194 | ) -> SetWorkerDeploymentCurrentVersionResponse: |
1195 | | - return await client.workflow_service.set_worker_deployment_current_version( |
1196 | | - SetWorkerDeploymentCurrentVersionRequest( |
1197 | | - namespace=client.namespace, |
1198 | | - deployment_name=version.deployment_name, |
1199 | | - version=version.to_canonical_string(), |
1200 | | - conflict_token=conflict_token, |
1201 | | - ) |
1202 | | - ) |
| 1195 | + async def mk_call() -> SetWorkerDeploymentCurrentVersionResponse: |
| 1196 | + nonlocal conflict_token |
| 1197 | + try: |
| 1198 | + return await client.workflow_service.set_worker_deployment_current_version( |
| 1199 | + SetWorkerDeploymentCurrentVersionRequest( |
| 1200 | + namespace=client.namespace, |
| 1201 | + deployment_name=version.deployment_name, |
| 1202 | + version=version.to_canonical_string(), |
| 1203 | + conflict_token=conflict_token, |
| 1204 | + ) |
| 1205 | + ) |
| 1206 | + except RPCError as err: |
| 1207 | + if err.status != RPCStatusCode.CANCELLED: |
| 1208 | + raise |
| 1209 | + describe_resp = await client.workflow_service.describe_worker_deployment( |
| 1210 | + DescribeWorkerDeploymentRequest( |
| 1211 | + namespace=client.namespace, |
| 1212 | + deployment_name=version.deployment_name, |
| 1213 | + ) |
| 1214 | + ) |
| 1215 | + current_version = ( |
| 1216 | + describe_resp.worker_deployment_info.routing_config.current_deployment_version |
| 1217 | + ) |
| 1218 | + if ( |
| 1219 | + current_version.deployment_name == version.deployment_name |
| 1220 | + and current_version.build_id == version.build_id |
| 1221 | + ): |
| 1222 | + return SetWorkerDeploymentCurrentVersionResponse( |
| 1223 | + conflict_token=describe_resp.conflict_token |
| 1224 | + ) |
| 1225 | + conflict_token = describe_resp.conflict_token |
| 1226 | + assert False |
| 1227 | + |
| 1228 | + return await assert_eventually(mk_call, retry_on_rpc_cancelled=False) |
1203 | 1229 |
|
1204 | 1230 |
|
1205 | 1231 | async def set_ramping_version( |
|
0 commit comments