Skip to content

Commit d5199a5

Browse files
committed
Add support for cloud queues (fixes #204)
1 parent df13724 commit d5199a5

30 files changed

Lines changed: 3732 additions & 0 deletions
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
namespace net.openstack.Core.Domain
2+
{
3+
using System.Collections.ObjectModel;
4+
using Newtonsoft.Json;
5+
6+
/// <summary>
7+
/// This class models the authentication requirements resource hints
8+
/// of the home document described by <strong>Home Documents for HTTP APIs</strong>.
9+
/// </summary>
10+
/// <seealso cref="ResourceHints.AuthenticationRequirements"/>
11+
/// <seealso href="http://tools.ietf.org/html/draft-nottingham-json-home-03#section-4.9">Resource Hints: auth-req (Home Documents for HTTP APIs - draft-nottingham-json-home-03)</seealso>
12+
/// <threadsafety static="true" instance="false"/>
13+
/// <preliminary/>
14+
[JsonObject(MemberSerialization.OptIn)]
15+
public class AuthenticationRequirement
16+
{
17+
#pragma warning disable 649 // Field 'fieldName' is never assigned to, and will always have its default value {value}
18+
/// <summary>
19+
/// This is the backing field for the <see cref="Scheme"/> property.
20+
/// </summary>
21+
[JsonProperty("scheme")]
22+
private string _scheme;
23+
24+
/// <summary>
25+
/// This is the backing field for the <see cref="Realms"/> property.
26+
/// </summary>
27+
[JsonProperty("realms")]
28+
private string[] _realms;
29+
#pragma warning restore 649
30+
31+
/// <summary>
32+
/// Gets the HTTP authentication scheme.
33+
/// </summary>
34+
public string Scheme
35+
{
36+
get
37+
{
38+
return _scheme;
39+
}
40+
}
41+
42+
/// <summary>
43+
/// Gets an optional collection of identity protection spaces the resource is a member of.
44+
/// </summary>
45+
public ReadOnlyCollection<string> Realms
46+
{
47+
get
48+
{
49+
if (_realms == null)
50+
return null;
51+
52+
return new ReadOnlyCollection<string>(_realms);
53+
}
54+
}
55+
}
56+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
namespace net.openstack.Core.Domain
2+
{
3+
using System.Collections.Generic;
4+
using Newtonsoft.Json;
5+
6+
/// <summary>
7+
/// This class models the root object of the home document described by
8+
/// <strong>Home Documents for HTTP APIs</strong>.
9+
/// </summary>
10+
/// <seealso href="http://tools.ietf.org/html/draft-nottingham-json-home-03#section-2">JSON Home Documents (Home Documents for HTTP APIs - draft-nottingham-json-home-03)</seealso>
11+
/// <threadsafety static="true" instance="false"/>
12+
/// <preliminary/>
13+
[JsonObject(MemberSerialization.OptIn)]
14+
public class HomeDocument
15+
{
16+
#pragma warning disable 649 // Field 'fieldName' is never assigned to, and will always have its default value {value}
17+
/// <summary>
18+
/// The backing field for the <see cref="Resources"/> property.
19+
/// </summary>
20+
[JsonProperty("resources")]
21+
private Dictionary<string, ResourceObject> _resources;
22+
#pragma warning restore 649
23+
24+
/// <summary>
25+
/// Gets the resources. The keys of this dictionary are link relation types
26+
/// (as defined by <see href="http://tools.ietf.org/html/rfc5988">RFC5988</see>).
27+
/// </summary>
28+
public Dictionary<string, ResourceObject> Resources
29+
{
30+
get
31+
{
32+
return _resources;
33+
}
34+
}
35+
}
36+
}
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
namespace net.openstack.Core.Domain.Queues
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Collections.ObjectModel;
6+
using System.Linq;
7+
using System.Threading.Tasks;
8+
using net.openstack.Core;
9+
using net.openstack.Core.Providers;
10+
using CancellationToken = System.Threading.CancellationToken;
11+
12+
/// <summary>
13+
/// Represents a claim of messages in a queue.
14+
/// </summary>
15+
/// <remarks>
16+
/// The claim is released when <see cref="Dispose()"/> or <see cref="DisposeAsync"/>
17+
/// is called. At that time, any messages belonging to this claim which have not
18+
/// been deleted will be eligible for claiming by another node in the system.
19+
/// Messages belonging to this claim may be deleted by calling
20+
/// <see cref="IQueueingService.DeleteMessageAsync"/> or
21+
/// <see cref="IQueueingService.DeleteMessagesAsync"/>.
22+
/// </remarks>
23+
/// <seealso cref="IQueueingService"/>
24+
/// <threadsafety static="true" instance="false"/>
25+
/// <preliminary/>
26+
public class Claim : IDisposable
27+
{
28+
/// <summary>
29+
/// A private object used to ensure <see cref="_releaseTask"/> is only
30+
/// initialized once in <see cref="DisposeAsync"/>.
31+
/// </summary>
32+
private readonly object _lock = new object();
33+
34+
/// <summary>
35+
/// The queueing service instance used for commands related to this claim.
36+
/// </summary>
37+
private readonly IQueueingService _service;
38+
39+
/// <summary>
40+
/// The name of the queue this claim belongs to.
41+
/// </summary>
42+
private readonly QueueName _queueName;
43+
44+
/// <summary>
45+
/// The backing field for the <see cref="Location"/> property.
46+
/// </summary>
47+
private readonly Uri _location;
48+
49+
/// <summary>
50+
/// The backing field for the <see cref="Age"/> property.
51+
/// </summary>
52+
private TimeSpan _age;
53+
54+
/// <summary>
55+
/// The backing field for the <see cref="TimeToLive"/> property.
56+
/// </summary>
57+
private TimeSpan _timeToLive;
58+
59+
/// <summary>
60+
/// The backing field for the <see cref="Messages"/> property.
61+
/// </summary>
62+
private QueuedMessage[] _messages;
63+
64+
/// <summary>
65+
/// The <see cref="Task"/> object representing the asynchronous release of this claim.
66+
/// Prior to calling <see cref="Dispose()"/> or <see cref="DisposeAsync"/>, the value of
67+
/// this field is <c>null</c>.
68+
/// </summary>
69+
private Task _releaseTask;
70+
71+
/// <summary>
72+
/// Initializes a new instance of the <see cref="Claim"/> class using the provided values.
73+
/// </summary>
74+
/// <param name="service">The queueing service.</param>
75+
/// <param name="queueName">The name of the queue.</param>
76+
/// <param name="location">The absolute URI of the claim resource. If no claim was allocated by the server, this value is <c>null</c>.</param>
77+
/// <param name="timeToLive">The time to live of the claim.</param>
78+
/// <param name="age">The age of the claim.</param>
79+
/// <param name="owner"><c>true</c> if the current instance owns the claim (and is responsible for releasing it); otherwise, <c>false</c>.</param>
80+
/// <param name="messages">A collection of messages belonging to the claim.</param>
81+
/// <exception cref="ArgumentNullException">
82+
/// If <paramref name="service"/> is <c>null</c>.
83+
/// <para>-or-</para>
84+
/// <para>If <paramref name="queueName"/> is <c>null</c>.</para>
85+
/// <para>-or-</para>
86+
/// <para>If <paramref name="messages"/> is <c>null</c>.</para>
87+
/// </exception>
88+
public Claim(IQueueingService service, QueueName queueName, Uri location, TimeSpan timeToLive, TimeSpan age, bool owner, IEnumerable<QueuedMessage> messages)
89+
{
90+
if (service == null)
91+
throw new ArgumentNullException("service");
92+
if (queueName == null)
93+
throw new ArgumentNullException("queueName");
94+
if (messages == null)
95+
throw new ArgumentNullException("messages");
96+
97+
_service = service;
98+
_queueName = queueName;
99+
_location = location;
100+
_timeToLive = timeToLive;
101+
_age = age;
102+
_messages = messages.ToArray();
103+
if (!owner)
104+
{
105+
// prevent this object from releasing the resource
106+
_releaseTask = InternalTaskExtensions.CompletedTask();
107+
}
108+
}
109+
110+
/// <summary>
111+
/// Gets the claim ID.
112+
/// </summary>
113+
/// <remarks>
114+
/// The claim ID is derived from the <see cref="Location"/> property according to the
115+
/// URI template documented in the <see href="https://wiki.openstack.org/w/index.php?title=Marconi/specs/api/v1">OpenStack Marconi API v1 Blueprint</see>.
116+
/// </remarks>
117+
/// <value>
118+
/// The ID of this claim. If the claim is empty (i.e. the queue did not have any unclaimed messages), this value is <c>null</c>.
119+
/// </value>
120+
public ClaimId Id
121+
{
122+
get
123+
{
124+
if (_location == null)
125+
return null;
126+
127+
string locationPath = _location.AbsolutePath;
128+
return new ClaimId(locationPath.Substring(locationPath.LastIndexOf('/') + 1));
129+
}
130+
}
131+
132+
/// <summary>
133+
/// Gets the absolute URI for this claim.
134+
/// </summary>
135+
/// <value>
136+
/// The absolute URI of this claim. If the claim is empty (i.e. the queue did not have any unclaimed messages), this value is <c>null</c>.
137+
/// </value>
138+
public Uri Location
139+
{
140+
get
141+
{
142+
return _location;
143+
}
144+
}
145+
146+
/// <summary>
147+
/// Gets the age of the claim as returned by the server.
148+
/// </summary>
149+
/// <remarks>
150+
/// This value does not automatically update. To obtain the age of a claim after a period of time elapses,
151+
/// use <see cref="IQueueingService.QueryClaimAsync"/>.
152+
/// </remarks>
153+
public TimeSpan Age
154+
{
155+
get
156+
{
157+
return _age;
158+
}
159+
}
160+
161+
/// <summary>
162+
/// Gets the Time To Live (TTL) of the claim.
163+
/// </summary>
164+
public TimeSpan TimeToLive
165+
{
166+
get
167+
{
168+
return _timeToLive;
169+
}
170+
171+
private set
172+
{
173+
_timeToLive = value;
174+
}
175+
}
176+
177+
/// <summary>
178+
/// Gets the messages which are included in this claim.
179+
/// </summary>
180+
public ReadOnlyCollection<QueuedMessage> Messages
181+
{
182+
get
183+
{
184+
return new ReadOnlyCollection<QueuedMessage>(_messages);
185+
}
186+
}
187+
188+
/// <summary>
189+
/// Refreshes the current claim.
190+
/// </summary>
191+
/// <remarks>
192+
/// This method calls <see cref="IQueueingService.QueryClaimAsync"/> to obtain updated
193+
/// information about the current claim, and then synchronously invokes <see cref="RefreshImpl"/>
194+
/// to update the current instance to match the results.
195+
/// </remarks>
196+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that the task will observe.</param>
197+
/// <returns>A <see cref="Task"/> object representing the asynchronous operation.</returns>
198+
public Task RefreshAsync(CancellationToken cancellationToken)
199+
{
200+
Action<Task<Claim>> applyChanges = task => RefreshImpl(task.Result);
201+
return _service.QueryClaimAsync(_queueName, this, cancellationToken).ContinueWith(applyChanges, TaskContinuationOptions.ExecuteSynchronously);
202+
}
203+
204+
/// <summary>
205+
/// Renews the claim by resetting the age and updating the TTL for the claim.
206+
/// </summary>
207+
/// <remarks>
208+
/// This method calls <see cref="IQueueingService.UpdateClaimAsync"/> to renew the
209+
/// current claim, and then synchronously updates the current instance to reflect
210+
/// the new age and time-to-live values.
211+
/// </remarks>
212+
/// <param name="timeToLive">
213+
/// The new Time-To-Live value for the claim. This value may differ from the original TTL of the claim.
214+
/// </param>
215+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that the task will observe.</param>
216+
/// <returns>A <see cref="Task"/> object representing the asynchronous operation.</returns>
217+
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="timeToLive"/> is negative or <see cref="TimeSpan.Zero"/>.</exception>
218+
/// <exception cref="InvalidOperationException">If the claim is empty (i.e. <see cref="Messages"/> is empty).</exception>
219+
public Task RenewAsync(TimeSpan timeToLive, CancellationToken cancellationToken)
220+
{
221+
if (timeToLive <= TimeSpan.Zero)
222+
throw new ArgumentOutOfRangeException("timeToLive");
223+
if (_location == null)
224+
throw new InvalidOperationException("Empty claims cannot be renewed.");
225+
226+
Action<Task> applyChanges =
227+
task =>
228+
{
229+
task.PropagateExceptions();
230+
_age = TimeSpan.Zero;
231+
TimeToLive = timeToLive;
232+
};
233+
return _service.UpdateClaimAsync(_queueName, this, timeToLive, cancellationToken).ContinueWith(applyChanges, TaskContinuationOptions.ExecuteSynchronously);
234+
}
235+
236+
/// <inheritdoc/>
237+
/// <remarks>
238+
/// This method calls <see cref="IQueueingService.ReleaseClaimAsync"/> to release messages
239+
/// claimed by this claim. To prevent other subscribers from re-claiming the messages, make
240+
/// sure to delete the messages before calling <see cref="Dispose()"/>.
241+
/// </remarks>
242+
/// <seealso cref="IQueueingService.ReleaseClaimAsync"/>
243+
public void Dispose()
244+
{
245+
Dispose(true);
246+
GC.SuppressFinalize(this);
247+
}
248+
249+
/// <summary>
250+
/// Asynchronously releases resources owned by this <see cref="Claim"/>.
251+
/// </summary>
252+
/// <remarks>
253+
/// This method calls <see cref="IQueueingService.ReleaseClaimAsync"/> to release messages
254+
/// claimed by this claim. To prevent other subscribers from re-claiming the messages, make
255+
/// sure to delete the messages before calling <see cref="DisposeAsync"/>.
256+
/// </remarks>
257+
/// <returns>A <see cref="Task"/> object representing the asynchronous operation.</returns>
258+
public Task DisposeAsync(CancellationToken cancellationToken)
259+
{
260+
lock (_lock)
261+
{
262+
if (_releaseTask == null)
263+
{
264+
if (_messages.Length == 0)
265+
_releaseTask = InternalTaskExtensions.CompletedTask();
266+
else
267+
_releaseTask = _service.ReleaseClaimAsync(_queueName, this, cancellationToken);
268+
}
269+
}
270+
271+
return _releaseTask;
272+
}
273+
274+
/// <summary>
275+
/// Releases resources owned by this <see cref="Claim"/>.
276+
/// </summary>
277+
/// <param name="disposing"><c>true</c> if this method was called from <see cref="Dispose()"/>; otherwise, <c>false</c> if this method was called from a finalizer.</param>
278+
protected virtual void Dispose(bool disposing)
279+
{
280+
if (disposing)
281+
{
282+
DisposeAsync(CancellationToken.None).Wait();
283+
}
284+
}
285+
286+
/// <summary>
287+
/// Refresh the current claim to match the updated information in <paramref name="claim"/>.
288+
/// </summary>
289+
/// <param name="claim">A <see cref="Claim"/> object containing updated claim information.</param>
290+
/// <exception cref="ArgumentNullException">If <paramref name="claim"/> is <c>null</c>.</exception>
291+
/// <exception cref="ArgumentException">If the specified <paramref name="claim"/> does not represent the same claim as the current instance.</exception>
292+
protected virtual void RefreshImpl(Claim claim)
293+
{
294+
if (claim == null)
295+
throw new ArgumentNullException("claim");
296+
if (Location != claim.Location)
297+
throw new ArgumentException("The specified claim does not represent the same claim as the current instance.", "claim");
298+
299+
this._age = claim._age;
300+
this._messages = claim._messages;
301+
this._timeToLive = claim._timeToLive;
302+
}
303+
}
304+
}

0 commit comments

Comments
 (0)