-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathOperatorController.cs
More file actions
95 lines (85 loc) · 4.37 KB
/
OperatorController.cs
File metadata and controls
95 lines (85 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace Synapse.Operator.Services;
/// <summary>
/// Represents the default implementation of the <see cref="IOperatorController"/> interface
/// </summary>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="options">The current <see cref="OperatorOptions"/></param>
public class OperatorController(IResourceRepository repository, IOptionsMonitor<OperatorOptions> options)
: IOperatorController
{
/// <summary>
/// Gets the service used to manage <see cref="IResource"/>s
/// </summary>
protected IResourceRepository Repository { get; } = repository;
/// <summary>
/// Gets the current <see cref="OperatorOptions"/>
/// </summary>
protected OperatorOptions Options => options.CurrentValue;
/// <inheritdoc/>
public IResourceMonitor<Resources.Operator> Operator { get; protected set; } = null!;
/// <inheritdoc/>
public virtual async Task StartAsync(CancellationToken cancellationToken)
{
Resources.Operator? @operator = null;
try
{
@operator = await this.Repository.GetAsync<Resources.Operator>(this.Options.Name, this.Options.Namespace, cancellationToken).ConfigureAwait(false);
}
catch (ProblemDetailsException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound) { }
finally
{
if (@operator == null)
{
@operator = new Resources.Operator(new ResourceMetadata(this.Options.Name, this.Options.Namespace), new OperatorSpec()
{
Runner = this.Options.Runner
});
await this.Repository.AddAsync(@operator, false, cancellationToken).ConfigureAwait(false);
}
this.Operator = await this.Repository.MonitorAsync<Resources.Operator>(this.Options.Name, this.Options.Namespace, false, cancellationToken).ConfigureAwait(false);
await this.SetOperatorStatusPhaseAsync(OperatorStatusPhase.Running, cancellationToken).ConfigureAwait(false);
this.Operator.Where(e => e.Type == ResourceWatchEventType.Updated).Select(o => o.Resource.Spec).DistinctUntilChanged().Subscribe(_ => this.OnOperatorSpecChanged(), token: cancellationToken);
this.OnOperatorSpecChanged();
}
}
/// <summary>
/// Sets the <see cref="Resources.Operator"/>'s status phase
/// </summary>
/// <param name="phase">The <see cref="Resources.Operator"/>'s status phase</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task SetOperatorStatusPhaseAsync(string phase, CancellationToken cancellationToken = default)
{
if (this.Operator.Resource.Status?.Phase == phase) return;
var updatedResource = this.Operator.Resource.Clone()!;
var originalResource = this.Operator.Resource.Clone()!;
updatedResource.Status ??= new();
updatedResource.Status.Phase = phase;
var patch = JsonPatchUtility.CreateJsonPatchFromDiff(originalResource, updatedResource);
await this.Repository.PatchStatusAsync<Resources.Operator>(new(PatchType.JsonPatch, patch), updatedResource.GetName(), updatedResource.GetNamespace(), null, false, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Handles changes to the operator spec, and update the operator options accordingly
/// </summary>
protected virtual void OnOperatorSpecChanged()
{
this.Options.Runner = this.Operator.Resource.Spec.Runner;
}
/// <inheritdoc/>
public async Task StopAsync(CancellationToken cancellationToken)
{
await this.SetOperatorStatusPhaseAsync(OperatorStatusPhase.Stopped, cancellationToken).ConfigureAwait(false);
}
}