Skip to content

Commit d7f77a6

Browse files
committed
Related to MassTransit#6127 - non-disposed shared context leaking cancellation token source
1 parent 4552c13 commit d7f77a6

7 files changed

Lines changed: 231 additions & 296 deletions

File tree

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/ClientContextFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public ClientContextFactory(IConnectionContextSupervisor connectionContextSuperv
1616
_connectionContextSupervisor = connectionContextSupervisor;
1717
}
1818

19-
IPipeContextAgent<ClientContext> IPipeContextFactory<ClientContext>.CreateContext(ISupervisor supervisor)
19+
public IPipeContextAgent<ClientContext> CreateContext(ISupervisor supervisor)
2020
{
2121
IAsyncPipeContextAgent<ClientContext> asyncContext = supervisor.AddAsyncContext<ClientContext>();
2222

@@ -25,7 +25,7 @@ IPipeContextAgent<ClientContext> IPipeContextFactory<ClientContext>.CreateContex
2525
return asyncContext;
2626
}
2727

28-
IActivePipeContextAgent<ClientContext> IPipeContextFactory<ClientContext>.CreateActiveContext(ISupervisor supervisor,
28+
public IActivePipeContextAgent<ClientContext> CreateActiveContext(ISupervisor supervisor,
2929
PipeContextHandle<ClientContext> context, CancellationToken cancellationToken)
3030
{
3131
return supervisor.AddActiveContext(context, CreateSharedClientContext(context.Context, cancellationToken));

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/SharedClientContext.cs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
namespace MassTransit.AmazonSqsTransport;
22

3-
using System;
43
using System.Collections.Generic;
54
using System.Threading;
65
using System.Threading.Tasks;
@@ -12,23 +11,19 @@
1211

1312
public class SharedClientContext :
1413
ProxyPipeContext,
15-
ClientContext,
16-
IDisposable
14+
ClientContext
1715
{
18-
readonly CancellationToken _cancellationToken;
1916
readonly ClientContext _context;
20-
CancellationTokenSource? _tokenSource;
2117

2218
public SharedClientContext(ClientContext context, CancellationToken cancellationToken)
2319
: base(context)
2420
{
2521
_context = context;
2622

27-
_cancellationToken = cancellationToken;
28-
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, cancellationToken);
23+
CancellationToken = cancellationToken;
2924
}
3025

31-
public override CancellationToken CancellationToken => _tokenSource?.Token ?? _cancellationToken;
26+
public override CancellationToken CancellationToken { get; }
3227

3328
public ConnectionContext ConnectionContext => _context.ConnectionContext;
3429

@@ -115,10 +110,4 @@ public async Task ChangeMessageVisibility(string queueUrl, string receiptHandle,
115110

116111
await _context.ChangeMessageVisibility(queueUrl, receiptHandle, seconds, tokenSource.Token).ConfigureAwait(false);
117112
}
118-
119-
public void Dispose()
120-
{
121-
_tokenSource?.Dispose();
122-
_tokenSource = null;
123-
}
124113
}

src/Transports/MassTransit.AmazonSqsTransport/AmazonSqsTransport/SharedConnectionContext.cs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,19 @@
99

1010
public class SharedConnectionContext :
1111
ProxyPipeContext,
12-
ConnectionContext,
13-
IDisposable
12+
ConnectionContext
1413
{
15-
readonly CancellationToken _cancellationToken;
1614
readonly ConnectionContext _context;
17-
CancellationTokenSource? _tokenSource;
1815

1916
public SharedConnectionContext(ConnectionContext context, CancellationToken cancellationToken)
2017
: base(context)
2118
{
2219
_context = context;
2320

24-
_cancellationToken = cancellationToken;
25-
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, cancellationToken);
21+
CancellationToken = cancellationToken;
2622
}
2723

28-
public override CancellationToken CancellationToken => _tokenSource?.Token ?? _cancellationToken;
24+
public override CancellationToken CancellationToken { get; }
2925

3026
public IConnection Connection => _context.Connection;
3127
public Uri HostAddress => _context.HostAddress;
@@ -73,10 +69,4 @@ public ClientContext CreateClientContext(CancellationToken cancellationToken)
7369
{
7470
return _context.CreateClientContext(cancellationToken);
7571
}
76-
77-
public void Dispose()
78-
{
79-
_tokenSource?.Dispose();
80-
_tokenSource = null;
81-
}
8272
}
Lines changed: 78 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,82 @@
1-
namespace MassTransit.AzureServiceBusTransport
1+
namespace MassTransit.AzureServiceBusTransport;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Azure.Messaging.ServiceBus;
7+
using Azure.Messaging.ServiceBus.Administration;
8+
using MassTransit.Middleware;
9+
10+
11+
public class SharedConnectionContext :
12+
ProxyPipeContext,
13+
ConnectionContext
214
{
3-
using System;
4-
using System.Threading;
5-
using System.Threading.Tasks;
6-
using Azure.Messaging.ServiceBus;
7-
using Azure.Messaging.ServiceBus.Administration;
8-
using MassTransit.Middleware;
9-
10-
11-
public class SharedConnectionContext :
12-
ProxyPipeContext,
13-
ConnectionContext,
14-
IDisposable
15+
readonly ConnectionContext _context;
16+
17+
public SharedConnectionContext(ConnectionContext context, CancellationToken cancellationToken)
18+
: base(context)
19+
{
20+
_context = context;
21+
22+
CancellationToken = cancellationToken;
23+
}
24+
25+
public override CancellationToken CancellationToken { get; }
26+
27+
public Uri Endpoint => _context.Endpoint;
28+
29+
public ServiceBusProcessor CreateQueueProcessor(ReceiveSettings settings)
30+
{
31+
return _context.CreateQueueProcessor(settings);
32+
}
33+
34+
public ServiceBusSessionProcessor CreateQueueSessionProcessor(ReceiveSettings settings)
35+
{
36+
return _context.CreateQueueSessionProcessor(settings);
37+
}
38+
39+
public ServiceBusProcessor CreateSubscriptionProcessor(SubscriptionSettings settings)
40+
{
41+
return _context.CreateSubscriptionProcessor(settings);
42+
}
43+
44+
public ServiceBusSessionProcessor CreateSubscriptionSessionProcessor(SubscriptionSettings settings)
1545
{
16-
readonly CancellationToken _cancellationToken;
17-
readonly ConnectionContext _context;
18-
CancellationTokenSource _tokenSource;
19-
20-
public SharedConnectionContext(ConnectionContext context, CancellationToken cancellationToken)
21-
: base(context)
22-
{
23-
_context = context;
24-
25-
_cancellationToken = cancellationToken;
26-
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, cancellationToken);
27-
}
28-
29-
public override CancellationToken CancellationToken => _tokenSource?.Token ?? _cancellationToken;
30-
31-
public Uri Endpoint => _context.Endpoint;
32-
33-
public ServiceBusProcessor CreateQueueProcessor(ReceiveSettings settings)
34-
{
35-
return _context.CreateQueueProcessor(settings);
36-
}
37-
38-
public ServiceBusSessionProcessor CreateQueueSessionProcessor(ReceiveSettings settings)
39-
{
40-
return _context.CreateQueueSessionProcessor(settings);
41-
}
42-
43-
public ServiceBusProcessor CreateSubscriptionProcessor(SubscriptionSettings settings)
44-
{
45-
return _context.CreateSubscriptionProcessor(settings);
46-
}
47-
48-
public ServiceBusSessionProcessor CreateSubscriptionSessionProcessor(SubscriptionSettings settings)
49-
{
50-
return _context.CreateSubscriptionSessionProcessor(settings);
51-
}
52-
53-
public ServiceBusSender CreateMessageSender(string entityPath)
54-
{
55-
return _context.CreateMessageSender(entityPath);
56-
}
57-
58-
public Task<QueueProperties> CreateQueue(CreateQueueOptions createQueueOptions, CancellationToken cancellationToken)
59-
{
60-
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
61-
62-
return _context.CreateQueue(createQueueOptions, tokenSource.Token);
63-
}
64-
65-
public Task<TopicProperties> CreateTopic(CreateTopicOptions createTopicOptions, CancellationToken cancellationToken)
66-
{
67-
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
68-
69-
return _context.CreateTopic(createTopicOptions, tokenSource.Token);
70-
}
71-
72-
public Task<SubscriptionProperties> CreateTopicSubscription(CreateSubscriptionOptions createSubscriptionOptions, CreateRuleOptions rule,
73-
RuleFilter filter, CancellationToken cancellationToken)
74-
{
75-
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
76-
77-
return _context.CreateTopicSubscription(createSubscriptionOptions, rule, filter, tokenSource.Token);
78-
}
79-
80-
public Task DeleteTopicSubscription(CreateSubscriptionOptions subscriptionOptions, CancellationToken cancellationToken)
81-
{
82-
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
83-
84-
return _context.DeleteTopicSubscription(subscriptionOptions, tokenSource.Token);
85-
}
86-
87-
public void Dispose()
88-
{
89-
_tokenSource?.Dispose();
90-
_tokenSource = null;
91-
}
46+
return _context.CreateSubscriptionSessionProcessor(settings);
47+
}
48+
49+
public ServiceBusSender CreateMessageSender(string entityPath)
50+
{
51+
return _context.CreateMessageSender(entityPath);
52+
}
53+
54+
public Task<QueueProperties> CreateQueue(CreateQueueOptions createQueueOptions, CancellationToken cancellationToken)
55+
{
56+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
57+
58+
return _context.CreateQueue(createQueueOptions, tokenSource.Token);
59+
}
60+
61+
public Task<TopicProperties> CreateTopic(CreateTopicOptions createTopicOptions, CancellationToken cancellationToken)
62+
{
63+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
64+
65+
return _context.CreateTopic(createTopicOptions, tokenSource.Token);
66+
}
67+
68+
public Task<SubscriptionProperties> CreateTopicSubscription(CreateSubscriptionOptions createSubscriptionOptions, CreateRuleOptions rule,
69+
RuleFilter filter, CancellationToken cancellationToken)
70+
{
71+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
72+
73+
return _context.CreateTopicSubscription(createSubscriptionOptions, rule, filter, tokenSource.Token);
74+
}
75+
76+
public Task DeleteTopicSubscription(CreateSubscriptionOptions subscriptionOptions, CancellationToken cancellationToken)
77+
{
78+
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken);
79+
80+
return _context.DeleteTopicSubscription(subscriptionOptions, tokenSource.Token);
9281
}
9382
}

src/Transports/MassTransit.Azure.ServiceBus.Core/AzureServiceBusTransport/Contexts/SharedSendEndpointContext.cs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,19 @@ namespace MassTransit.AzureServiceBusTransport
99

1010
public class SharedSendEndpointContext :
1111
ProxyPipeContext,
12-
SendEndpointContext,
13-
IDisposable
12+
SendEndpointContext
1413
{
15-
readonly CancellationToken _cancellationToken;
1614
readonly SendEndpointContext _context;
17-
CancellationTokenSource _tokenSource;
1815

1916
public SharedSendEndpointContext(SendEndpointContext context, CancellationToken cancellationToken)
2017
: base(context)
2118
{
2219
_context = context;
2320

24-
_cancellationToken = cancellationToken;
25-
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, cancellationToken);
21+
CancellationToken = cancellationToken;
2622
}
2723

28-
public void Dispose()
29-
{
30-
_tokenSource?.Dispose();
31-
_tokenSource = null;
32-
}
33-
34-
public override CancellationToken CancellationToken => _tokenSource?.Token ?? _cancellationToken;
24+
public override CancellationToken CancellationToken { get; }
3525

3626
public ConnectionContext ConnectionContext => _context.ConnectionContext;
3727

0 commit comments

Comments
 (0)