-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathHelpers.cs
More file actions
202 lines (167 loc) · 7.35 KB
/
Helpers.cs
File metadata and controls
202 lines (167 loc) · 7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/*
The contents of this file are subject to the Mozilla Public License
Version 1.1 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS"
basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
License for the specific language governing rights and limitations
under the License.
The Original Code is DataMangler Key-Value Store.
The Initial Developer of the Original Code is Mozilla Corporation.
Original Author: Kevin Gadd (kevin.gadd@gmail.com)
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Squared.Task;
namespace Squared.Data.Mangler {
internal struct BatchItem<T> {
public readonly bool AllowReplacement;
public readonly UpdateCallback<T> Callback;
public readonly DecisionUpdateCallback<T> DecisionCallback;
public TangleKey Key;
public T Value;
public BatchItem (TangleKey key, ref T value, bool allowReplacement) {
Key = key;
Value = value;
AllowReplacement = allowReplacement;
Callback = null;
DecisionCallback = null;
}
public BatchItem (TangleKey key, ref T value, UpdateCallback<T> callback) {
Key = key;
Value = value;
AllowReplacement = false;
Callback = callback;
DecisionCallback = null;
}
public BatchItem (TangleKey key, ref T value, DecisionUpdateCallback<T> callback) {
Key = key;
Value = value;
AllowReplacement = false;
Callback = null;
DecisionCallback = callback;
}
}
/// <summary>
/// A Batch allows you to apply multiple modifications to a Tangle as a single work item.
/// To use it, create an instance of the appropriate batch type, add modifications to it using its methods, and then Execute it on the Tangle.
/// Using a batch to modify a Tangle is faster than calling the Tangle's methods individually.
/// </summary>
/// <typeparam name="T">The type of the tangle's items.</typeparam>
public class Batch<T> : IBatch {
public readonly int Capacity;
public readonly Tangle<T> Tangle;
internal readonly BatchItem<T>[] Buffer;
private int _Count;
/// <summary>
/// Creates a batch.
/// </summary>
/// <param name="tangle">The tangle against which to issue the batch's commands.</param>
/// <param name="capacity">The maximum number of modifications that the batch can contain.</param>
public Batch (Tangle<T> tangle, int capacity) {
Tangle = tangle;
Capacity = capacity;
Buffer = new BatchItem<T>[capacity];
}
/// <summary>
/// The number of modifications currently contained by the batch.
/// </summary>
public int Count {
get {
return _Count;
}
}
public void Add (TangleKey key, T value) {
Add(key, ref value);
}
public void Add (TangleKey key, ref T value) {
if (_Count >= Capacity)
throw new IndexOutOfRangeException();
Buffer[_Count++] = new BatchItem<T>(key, ref value, false);
}
public void Set (TangleKey key, T value) {
Set(key, ref value);
}
public void Set (TangleKey key, ref T value) {
if (_Count >= Capacity)
throw new IndexOutOfRangeException();
Buffer[_Count++] = new BatchItem<T>(key, ref value, true);
}
public void AddOrUpdate (TangleKey key, T value, UpdateCallback<T> updateCallback) {
AddOrUpdate(key, ref value, updateCallback);
}
public void AddOrUpdate (TangleKey key, ref T value, UpdateCallback<T> updateCallback) {
if (_Count >= Capacity)
throw new IndexOutOfRangeException();
Buffer[_Count++] = new BatchItem<T>(key, ref value, updateCallback);
}
public void AddOrUpdate (TangleKey key, T value, DecisionUpdateCallback<T> updateCallback) {
AddOrUpdate(key, ref value, updateCallback);
}
public void AddOrUpdate (TangleKey key, ref T value, DecisionUpdateCallback<T> updateCallback) {
if (_Count >= Capacity)
throw new IndexOutOfRangeException();
Buffer[_Count++] = new BatchItem<T>(key, ref value, updateCallback);
}
/// <summary>
/// Adds the batch to the Tangle's work queue.
/// </summary>
/// <returns>A future that becomes completed once all the work items in the batch have completed.</returns>
public Future<int> Execute () {
return Tangle.QueueWorkItem(new Tangle<T>.BatchThunk(this));
}
}
/// <summary>
/// Represents a collection of barriers in one or more tangles.
/// </summary>
public class BarrierCollection : IBarrier {
private readonly List<IBarrier> Barriers = new List<IBarrier>();
private readonly IFuture Composite;
/// <summary>
/// Inserts barriers into one or more tangles and creates a barrier collection out of them.
/// </summary>
/// <param name="waitForAll">If true, the barrier collection will not become signaled until all the barriers within it are signaled. Otherwise, only one barrier within the collection must become signaled before the collection is signaled.</param>
/// <param name="tangles">The list of tangles to create barriers in.</param>
public BarrierCollection (bool waitForAll, IEnumerable<ITangle> tangles) {
foreach (var tangle in tangles)
Barriers.Add(tangle.CreateBarrier(false));
var futures = from barrier in Barriers select barrier.Future;
if (waitForAll)
Composite = Squared.Task.Future.WaitForAll(futures);
else
Composite = Squared.Task.Future.WaitForFirst(futures);
}
/// <summary>
/// Inserts barriers into one or more tangles and creates a barrier collection out of them.
/// </summary>
/// <param name="waitForAll">If true, the barrier collection will not become signaled until all the barriers within it are signaled. Otherwise, only one barrier within the collection must become signaled before the collection is signaled.</param>
/// <param name="tangles">The list of tangles to create barriers in.</param>
public BarrierCollection (bool waitForAll, params ITangle[] tangles)
: this(waitForAll, (IEnumerable<ITangle>)tangles) {
}
public void Dispose () {
foreach (var barrier in Barriers)
barrier.Dispose();
Barriers.Clear();
Composite.Dispose();
}
/// <summary>
/// Opens all the barriers within the collection.
/// </summary>
public void Open () {
foreach (var barrier in Barriers)
barrier.Open();
}
public IFuture Future {
get {
return Composite;
}
}
void ISchedulable.Schedule (TaskScheduler scheduler, IFuture future) {
future.Bind(Composite);
}
}
}