@@ -168,7 +168,7 @@ static string GetDeviceMetadata()
168168 /// describing the <see cref="StepperDriver"/> device registers.
169169 /// </summary>
170170 [Description("Returns the contents of the metadata file describing the StepperDriver device registers.")]
171- public partial class GetMetadata : Source<string>
171+ public partial class GetDeviceMetadata : Source<string>
172172 {
173173 /// <summary>
174174 /// Returns an observable sequence with the contents of the metadata file
@@ -205,6 +205,156 @@ public override IObservable<IGroupedObservable<Type, HarpMessage>> Process(IObse
205205 }
206206 }
207207
208+ /// <summary>
209+ /// Represents an operator that writes the sequence of <see cref="StepperDriver"/>" messages
210+ /// to the standard Harp storage format.
211+ /// </summary>
212+ [Description("Writes the sequence of StepperDriver messages to the standard Harp storage format.")]
213+ public partial class DeviceDataWriter : Sink<HarpMessage>, INamedElement
214+ {
215+ const string BinaryExtension = ".bin";
216+ const string MetadataFileName = "device.yml";
217+ readonly Bonsai.Harp.MessageWriter writer = new();
218+
219+ string INamedElement.Name => nameof(StepperDriver) + "DataWriter";
220+
221+ /// <summary>
222+ /// Gets or sets the relative or absolute path on which to save the message data.
223+ /// </summary>
224+ [Description("The relative or absolute path of the directory on which to save the message data.")]
225+ [Editor("Bonsai.Design.SaveFileNameEditor, Bonsai.Design", DesignTypes.UITypeEditor)]
226+ public string Path
227+ {
228+ get => System.IO.Path.GetDirectoryName(writer.FileName);
229+ set => writer.FileName = System.IO.Path.Combine(value, nameof(StepperDriver) + BinaryExtension);
230+ }
231+
232+ /// <summary>
233+ /// Gets or sets a value indicating whether element writing should be buffered. If <see langword="true"/>,
234+ /// the write commands will be queued in memory as fast as possible and will be processed
235+ /// by the writer in a different thread. Otherwise, writing will be done in the same
236+ /// thread in which notifications arrive.
237+ /// </summary>
238+ [Description("Indicates whether writing should be buffered.")]
239+ public bool Buffered
240+ {
241+ get => writer.Buffered;
242+ set => writer.Buffered = value;
243+ }
244+
245+ /// <summary>
246+ /// Gets or sets a value indicating whether to overwrite the output file if it already exists.
247+ /// </summary>
248+ [Description("Indicates whether to overwrite the output file if it already exists.")]
249+ public bool Overwrite
250+ {
251+ get => writer.Overwrite;
252+ set => writer.Overwrite = value;
253+ }
254+
255+ /// <summary>
256+ /// Gets or sets a value specifying how the message filter will use the matching criteria.
257+ /// </summary>
258+ [Description("Specifies how the message filter will use the matching criteria.")]
259+ public FilterType FilterType
260+ {
261+ get => writer.FilterType;
262+ set => writer.FilterType = value;
263+ }
264+
265+ /// <summary>
266+ /// Gets or sets a value specifying the expected message type. If no value is
267+ /// specified, all messages will be accepted.
268+ /// </summary>
269+ [Description("Specifies the expected message type. If no value is specified, all messages will be accepted.")]
270+ public MessageType? MessageType
271+ {
272+ get => writer.MessageType;
273+ set => writer.MessageType = value;
274+ }
275+
276+ private IObservable<TSource> WriteDeviceMetadata<TSource>(IObservable<TSource> source)
277+ {
278+ var basePath = Path;
279+ if (string.IsNullOrEmpty(basePath))
280+ return source;
281+
282+ var metadataPath = System.IO.Path.Combine(basePath, MetadataFileName);
283+ return Observable.Create<TSource>(observer =>
284+ {
285+ Bonsai.IO.PathHelper.EnsureDirectory(metadataPath);
286+ if (System.IO.File.Exists(metadataPath) && !Overwrite)
287+ {
288+ throw new System.IO.IOException(string.Format("The file '{0}' already exists.", metadataPath));
289+ }
290+
291+ System.IO.File.WriteAllText(metadataPath, Device.Metadata);
292+ return source.SubscribeSafe(observer);
293+ });
294+ }
295+
296+ /// <summary>
297+ /// Writes each Harp message in the sequence to the specified binary file, and the
298+ /// contents of the device metadata file to a separate text file.
299+ /// </summary>
300+ /// <param name="source">The sequence of messages to write to the file.</param>
301+ /// <returns>
302+ /// An observable sequence that is identical to the <paramref name="source"/>
303+ /// sequence but where there is an additional side effect of writing the
304+ /// messages to a raw binary file, and the contents of the device metadata file
305+ /// to a separate text file.
306+ /// </returns>
307+ public override IObservable<HarpMessage> Process(IObservable<HarpMessage> source)
308+ {
309+ return source.Publish(ps => ps.Merge(
310+ WriteDeviceMetadata(writer.Process(ps.GroupBy(message => message.Address)))
311+ .IgnoreElements()
312+ .Cast<HarpMessage>()));
313+ }
314+
315+ /// <summary>
316+ /// Writes each Harp message in the sequence of observable groups to the
317+ /// corresponding binary file, where the name of each file is generated from
318+ /// the common group register address. The contents of the device metadata file are
319+ /// written to a separate text file.
320+ /// </summary>
321+ /// <param name="source">
322+ /// A sequence of observable groups, each of which corresponds to a unique register
323+ /// address.
324+ /// </param>
325+ /// <returns>
326+ /// An observable sequence that is identical to the <paramref name="source"/>
327+ /// sequence but where there is an additional side effect of writing the Harp
328+ /// messages in each group to the corresponding file, and the contents of the device
329+ /// metadata file to a separate text file.
330+ /// </returns>
331+ public IObservable<IGroupedObservable<int, HarpMessage>> Process(IObservable<IGroupedObservable<int, HarpMessage>> source)
332+ {
333+ return WriteDeviceMetadata(writer.Process(source));
334+ }
335+
336+ /// <summary>
337+ /// Writes each Harp message in the sequence of observable groups to the
338+ /// corresponding binary file, where the name of each file is generated from
339+ /// the common group register name. The contents of the device metadata file are
340+ /// written to a separate text file.
341+ /// </summary>
342+ /// <param name="source">
343+ /// A sequence of observable groups, each of which corresponds to a unique register
344+ /// type.
345+ /// </param>
346+ /// <returns>
347+ /// An observable sequence that is identical to the <paramref name="source"/>
348+ /// sequence but where there is an additional side effect of writing the Harp
349+ /// messages in each group to the corresponding file, and the contents of the device
350+ /// metadata file to a separate text file.
351+ /// </returns>
352+ public IObservable<IGroupedObservable<Type, HarpMessage>> Process(IObservable<IGroupedObservable<Type, HarpMessage>> source)
353+ {
354+ return WriteDeviceMetadata(writer.Process(source));
355+ }
356+ }
357+
208358 /// <summary>
209359 /// Represents an operator that filters register-specific messages
210360 /// reported by the <see cref="StepperDriver"/> device.
0 commit comments