Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/SampleBatch.Api/Controllers/BatchJobsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public async Task<ActionResult<Guid>> Post(int jobCount = 100, int activeThresho
{
BatchId = id,
InVar.Timestamp,
Action = BatchAction.CancelOrders,
Action = BatchActionEnum.CancelOrders,
OrderIds = orderIds.ToArray(),
ActiveThreshold = activeThreshold,
DelayInSeconds = delayInSeconds
Expand Down
73 changes: 14 additions & 59 deletions src/SampleBatch.Components/Consumers/ProcessBatchJobConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
namespace SampleBatch.Components.Consumers
{
using System;
using System.Threading.Tasks;
using Contracts;
using Contracts.Enums;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;
Expand All @@ -24,64 +22,21 @@ public async Task Consume(ConsumeContext<ProcessBatchJob> context)
{
using (_log.BeginScope("ProcessBatchJob {BatchJobId}, {OrderId}", context.Message.BatchJobId, context.Message.OrderId))
{
var builder = new RoutingSlipBuilder(NewId.NextGuid());

switch (context.Message.Action)
var routingSlip = await context.Message.Action.SetupRoutingSlip(context, async builder =>
{
case BatchAction.CancelOrders:
builder.AddActivity(
"CancelOrder",
new Uri("queue:cancel-order_execute"),
new
{
context.Message.OrderId,
Reason = "Product discontinued"
});

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.ActivityFaulted,
RoutingSlipEventContents.None,
"CancelOrder",
x => x.Send<BatchJobFailed>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
context.Message.OrderId
}));
break;

case BatchAction.SuspendOrders:
builder.AddActivity(
"SuspendOrder",
new Uri("queue:suspend-order_execute"),
new {context.Message.OrderId});

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.ActivityFaulted,
RoutingSlipEventContents.None,
"SuspendOrder",
x => x.Send<BatchJobFailed>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
context.Message.OrderId
}));
break;
}

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.Completed,
x => x.Send<BatchJobCompleted>(new
{
context.Message.BatchJobId,
context.Message.BatchId
}));

await context.Execute(builder.Build());
await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.Completed,
x => x.Send<BatchJobCompleted>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
InVar.Timestamp
}));
});

await context.Execute(routingSlip);
}
}
}
}
}
4 changes: 1 addition & 3 deletions src/SampleBatch.Components/SampleBatchDbContext.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using Microsoft.EntityFrameworkCore;
using SampleBatch.Components.StateMachines;
using System;
using System.Collections.Generic;
using System.Text;


namespace SampleBatch.Components
{
Expand Down
10 changes: 5 additions & 5 deletions src/SampleBatch.Components/StateMachines/BatchState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public class BatchState :

public DateTime? UpdateTimestamp { get; set; }

public BatchAction? Action { get; set; }
public BatchActionEnum Action { get; set; }

/// <summary>
/// The maximum amount of active Jobs allowed to be processing. Typically an amount larger than your Job Consumer can handle concurrently, to allow for some additional prefetch while the Batch Saga dispatches more
/// </summary>
public int? ActiveThreshold { get; set; } = 20;
/// <summary>
/// The maximum amount of active Jobs allowed to be processing. Typically an amount larger than your Job Consumer can handle concurrently, to allow for some additional prefetch while the Batch Saga dispatches more
/// </summary>
public int? ActiveThreshold { get; set; } = 20;

public int? Total { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
{
using System;
using System.Collections.Generic;
using System.Linq;
using Common;
using Contracts.Enums;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;


class BatchStateEntityConfiguration :
Expand All @@ -22,8 +22,9 @@ public void Configure(EntityTypeBuilder<BatchState> builder)

builder.Property(c => c.CurrentState).IsRequired();

builder.Property(c => c.Action)
.HasConversion(new EnumToStringConverter<BatchAction>());
builder.Property(p => p.Action)
.HasConversion(v => v.Value, i => BatchActionEnum.List().FirstOrDefault(e => e.Value == i));


builder.Property(c => c.UnprocessedOrderIds)
.HasConversion(new JsonValueConverter<Stack<Guid>>())
Expand All @@ -34,4 +35,4 @@ public void Configure(EntityTypeBuilder<BatchState> builder)
.Metadata.SetValueComparer(new JsonValueComparer<Dictionary<Guid, Guid>>());
}
}
}
}
2 changes: 1 addition & 1 deletion src/SampleBatch.Components/StateMachines/JobState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class JobState :

public DateTime? UpdateTimestamp { get; set; }

public BatchAction Action { get; set; }
public BatchActionEnum Action { get; set; }

public string ExceptionMessage { get; set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
namespace SampleBatch.Components.StateMachines
{
using System.Linq;
using Contracts.Enums;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;


class JobStateEntityConfiguration :
Expand All @@ -19,8 +19,8 @@ public void Configure(EntityTypeBuilder<JobState> builder)

builder.Property(c => c.CurrentState).IsRequired();

builder.Property(c => c.Action)
.HasConversion(new EnumToStringConverter<BatchAction>());
builder.Property(p => p.Action)
.HasConversion(v => v.Value, i => BatchActionEnum.List().FirstOrDefault(e => e.Value == i));
}
}
}
}
2 changes: 1 addition & 1 deletion src/SampleBatch.Contracts/BatchJobReceived.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public interface BatchJobReceived
Guid BatchId { get; }
Guid OrderId { get; }
DateTime Timestamp { get; }
BatchAction Action { get; }
BatchActionEnum Action { get; }
}
}
2 changes: 1 addition & 1 deletion src/SampleBatch.Contracts/BatchReceived.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface BatchReceived
{
Guid BatchId { get; }
DateTime Timestamp { get; }
BatchAction Action { get; }
BatchActionEnum Action { get; }
Guid[] OrderIds { get; }
int ActiveThreshold { get; }

Expand Down
8 changes: 0 additions & 8 deletions src/SampleBatch.Contracts/Enums/BatchAction.cs

This file was deleted.

50 changes: 50 additions & 0 deletions src/SampleBatch.Contracts/Enums/BatchActionEnum.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace SampleBatch.Contracts.Enums
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Converter;
using Internal;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;
using Newtonsoft.Json;


[JsonConverter(typeof(BatchActionEnumConverter))]
public abstract class BatchActionEnum
{
public static readonly BatchActionEnum CancelOrders = new CancelOrdersEnum();
public static readonly BatchActionEnum SuspendOrders = new SuspendOrdersEnum();

public int Value { get; private set; }
public string Name { get; private set; }

public static IEnumerable<BatchActionEnum> List()
{
yield return CancelOrders;
yield return SuspendOrders;
}

protected BatchActionEnum(int value, string name)
{
Value = value;
Name = name;
}

public async Task<RoutingSlip> SetupRoutingSlip(ConsumeContext<ProcessBatchJob> context, Func<RoutingSlipBuilder, Task> commonAction)
{
var builder = new RoutingSlipBuilder(NewId.NextGuid());

await SetupRoutingSlip(builder, context);

await commonAction?.Invoke(builder);

return builder.Build();

}

protected abstract Task SetupRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<ProcessBatchJob> context);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace SampleBatch.Contracts.Enums.Converter
{
using System;
using Internal;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;


public class BaseSpecifiedConcreteClassConverter : DefaultContractResolver
{
protected override JsonConverter ResolveContractConverter(Type objectType)
{
if (typeof(BatchActionEnum).IsAssignableFrom(objectType) && !objectType.IsAbstract)
return null; // pretend TableSortRuleConvert is not specified (thus avoiding a stack overflow)
return base.ResolveContractConverter(objectType);
}
}

public class BatchActionEnumConverter : JsonConverter
{
static readonly JsonSerializerSettings SpecifiedSubclassConversion = new JsonSerializerSettings() { ContractResolver = new BaseSpecifiedConcreteClassConverter() };

public override bool CanConvert(Type objectType)
{
return (objectType == typeof(BatchActionEnum));
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
JObject jo = JObject.Load(reader);
switch (jo["value"].Value<int>())
{
case 1:
return JsonConvert.DeserializeObject<CancelOrdersEnum>(jo.ToString(), SpecifiedSubclassConversion);
case 2:
return JsonConvert.DeserializeObject<SuspendOrdersEnum>(jo.ToString(), SpecifiedSubclassConversion);
default:
throw new Exception();
}
throw new NotImplementedException();
}

public override bool CanWrite {
get { return false; }
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
throw new NotImplementedException(); // won't be called because CanWrite returns false
}
}
}
41 changes: 41 additions & 0 deletions src/SampleBatch.Contracts/Enums/Internal/CancelOrdersEnum.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace SampleBatch.Contracts.Enums.Internal
{
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Courier;
using MassTransit.Courier.Contracts;


class CancelOrdersEnum : BatchActionEnum
{
public CancelOrdersEnum()
: base(1, "Cancel Orders")
{
}

protected override async Task SetupRoutingSlip( RoutingSlipBuilder builder, ConsumeContext<ProcessBatchJob> context)
{
builder.AddActivity(
"CancelOrder",
new Uri("queue:cancel-order_execute"),
new
{
context.Message.OrderId,
Reason = "Product discontinued"
});

await builder.AddSubscription(
context.SourceAddress,
RoutingSlipEvents.ActivityFaulted,
RoutingSlipEventContents.None,
"CancelOrder",
x => x.Send<BatchJobFailed>(new
{
context.Message.BatchJobId,
context.Message.BatchId,
context.Message.OrderId
}));
}
}
}
Loading