Skip to content

Commit 4076e2b

Browse files
tsunghanjacktsaitsaitsung-han.tht
andauthored
[ISSUE #776] Add push consumer for normal/fifo message, namespace support, reentrant message receiving support in C# SDK (#777)
Add push consumer for normal/fifo message, namespace support, reentrant message receiving support in C# --------- Co-authored-by: tsaitsung-han.tht <tsaitsung-han.tht@alibaba-inc.com>
1 parent f4c3878 commit 4076e2b

68 files changed

Lines changed: 6110 additions & 78 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

csharp/examples/ProducerBenchmark.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public static class ProducerBenchmark
3434
private static long _successCounter;
3535
private static long _failureCounter;
3636

37+
private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
38+
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
39+
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
40+
3741
private static readonly BlockingCollection<Task<ISendReceipt>> Tasks =
3842
new BlockingCollection<Task<ISendReceipt>>();
3943

@@ -79,14 +83,11 @@ internal static async Task QuickStart()
7983
{
8084
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
8185
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
82-
const string accessKey = "yourAccessKey";
83-
const string secretKey = "yourSecretKey";
8486

8587
// Credential provider is optional for client configuration.
86-
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
87-
const string endpoints = "foobar.com:8080";
88+
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
8889
var clientConfig = new ClientConfig.Builder()
89-
.SetEndpoints(endpoints)
90+
.SetEndpoints(Endpoint)
9091
.SetCredentialsProvider(credentialsProvider)
9192
.Build();
9293

@@ -108,6 +109,7 @@ internal static async Task QuickStart()
108109
.SetTag(tag)
109110
// You could set multiple keys for the single message actually.
110111
.SetKeys("yourMessageKey-7044358f98fc")
112+
.SetMessageGroup("fifo-group")
111113
.Build();
112114

113115
DoStats();

csharp/examples/ProducerDelayMessageExample.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,19 @@ internal static class ProducerDelayMessageExample
2727
{
2828
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerDelayMessageExample).FullName);
2929

30+
private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
31+
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
32+
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
33+
3034
internal static async Task QuickStart()
3135
{
3236
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
3337
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
34-
const string accessKey = "yourAccessKey";
35-
const string secretKey = "yourSecretKey";
3638

3739
// Credential provider is optional for client configuration.
38-
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
39-
const string endpoints = "foobar.com:8080";
40+
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
4041
var clientConfig = new ClientConfig.Builder()
41-
.SetEndpoints(endpoints)
42+
.SetEndpoints(Endpoint)
4243
.SetCredentialsProvider(credentialsProvider)
4344
.Build();
4445

csharp/examples/ProducerFifoMessageExample.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
using System;
1819
using System.Text;
1920
using System.Threading.Tasks;
2021
using Microsoft.Extensions.Logging;
@@ -26,18 +27,19 @@ internal static class ProducerFifoMessageExample
2627
{
2728
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerFifoMessageExample).FullName);
2829

30+
private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
31+
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
32+
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
33+
2934
internal static async Task QuickStart()
3035
{
3136
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
3237
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
33-
const string accessKey = "yourAccessKey";
34-
const string secretKey = "yourSecretKey";
3538

3639
// Credential provider is optional for client configuration.
37-
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
38-
const string endpoints = "foobar.com:8080";
40+
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
3941
var clientConfig = new ClientConfig.Builder()
40-
.SetEndpoints(endpoints)
42+
.SetEndpoints(Endpoint)
4143
.SetCredentialsProvider(credentialsProvider)
4244
.Build();
4345

csharp/examples/ProducerNormalMessageExample.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
using System;
1819
using System.Text;
1920
using System.Threading.Tasks;
2021
using Microsoft.Extensions.Logging;
@@ -26,18 +27,19 @@ internal static class ProducerNormalMessageExample
2627
{
2728
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerNormalMessageExample).FullName);
2829

30+
private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
31+
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
32+
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
33+
2934
internal static async Task QuickStart()
3035
{
3136
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
3237
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
33-
const string accessKey = "yourAccessKey";
34-
const string secretKey = "yourSecretKey";
3538

3639
// Credential provider is optional for client configuration.
37-
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
38-
const string endpoints = "foobar.com:8080";
40+
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
3941
var clientConfig = new ClientConfig.Builder()
40-
.SetEndpoints(endpoints)
42+
.SetEndpoints(Endpoint)
4143
.SetCredentialsProvider(credentialsProvider)
4244
.Build();
4345

csharp/examples/ProducerTransactionMessageExample.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
using System;
1819
using System.Text;
1920
using System.Threading.Tasks;
2021
using Microsoft.Extensions.Logging;
@@ -26,6 +27,10 @@ internal static class ProducerTransactionMessageExample
2627
{
2728
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerTransactionMessageExample).FullName);
2829

30+
private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
31+
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
32+
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
33+
2934
private class TransactionChecker : ITransactionChecker
3035
{
3136
public TransactionResolution Check(MessageView messageView)
@@ -39,14 +44,11 @@ internal static async Task QuickStart()
3944
{
4045
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
4146
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
42-
const string accessKey = "yourAccessKey";
43-
const string secretKey = "yourSecretKey";
4447

4548
// Credential provider is optional for client configuration.
46-
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
47-
const string endpoints = "foobar.com:8080";
49+
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
4850
var clientConfig = new ClientConfig.Builder()
49-
.SetEndpoints(endpoints)
51+
.SetEndpoints(Endpoint)
5052
.SetCredentialsProvider(credentialsProvider)
5153
.Build();
5254

@@ -76,9 +78,9 @@ internal static async Task QuickStart()
7678
var sendReceipt = await producer.Send(message, transaction);
7779
Logger.LogInformation("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
7880
// Commit the transaction.
79-
transaction.Commit();
81+
await transaction.Commit();
8082
// Or rollback the transaction.
81-
// transaction.Rollback();
83+
// await transaction.Rollback();
8284

8385
// Close the producer if you don't need it anymore.
8486
await producer.DisposeAsync();
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Threading;
20+
using System.Threading.Tasks;
21+
using Microsoft.Extensions.Logging;
22+
using Org.Apache.Rocketmq;
23+
24+
namespace examples
25+
{
26+
public class PushConsumerExample
27+
{
28+
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(PushConsumerExample).FullName);
29+
30+
private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
31+
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
32+
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");
33+
34+
internal static async Task QuickStart()
35+
{
36+
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
37+
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
38+
39+
// Credential provider is optional for client configuration.
40+
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
41+
var clientConfig = new ClientConfig.Builder()
42+
.SetEndpoints(Endpoint)
43+
.SetCredentialsProvider(credentialsProvider)
44+
.Build();
45+
46+
// Add your subscriptions.
47+
const string consumerGroup = "yourConsumerGroup";
48+
const string topic = "yourTopic";
49+
var subscription = new Dictionary<string, FilterExpression>
50+
{ { topic, new FilterExpression("*") } };
51+
52+
var pushConsumer = await new PushConsumer.Builder()
53+
.SetClientConfig(clientConfig)
54+
.SetConsumerGroup(consumerGroup)
55+
.SetSubscriptionExpression(subscription)
56+
.SetMessageListener(new CustomMessageListener())
57+
.Build();
58+
59+
Thread.Sleep(Timeout.Infinite);
60+
61+
// Close the push consumer if you don't need it anymore.
62+
// await pushConsumer.DisposeAsync();
63+
}
64+
65+
private class CustomMessageListener : IMessageListener
66+
{
67+
public ConsumeResult Consume(MessageView messageView)
68+
{
69+
// Handle the received message and return consume result.
70+
Logger.LogInformation($"Consume message={messageView}");
71+
return ConsumeResult.SUCCESS;
72+
}
73+
}
74+
}
75+
}

csharp/examples/QuickStart.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public static void Main()
3434
// ProducerFifoMessageExample.QuickStart().Wait();
3535
// ProducerDelayMessageExample.QuickStart().Wait();
3636
// ProducerTransactionMessageExample.QuickStart().Wait();
37+
// PushConsumerExample.QuickStart().Wait();
3738
// SimpleConsumerExample.QuickStart().Wait();
3839
// ProducerBenchmark.QuickStart().Wait();
3940
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
using System.Collections.Generic;
20+
21+
namespace Org.Apache.Rocketmq
22+
{
23+
public class Assignment
24+
{
25+
public Assignment(MessageQueue messageQueue)
26+
{
27+
MessageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
28+
}
29+
30+
public MessageQueue MessageQueue { get; }
31+
32+
public override bool Equals(object obj)
33+
{
34+
if (this == obj) return true;
35+
if (obj == null || GetType() != obj.GetType()) return false;
36+
37+
var other = (Assignment)obj;
38+
return EqualityComparer<MessageQueue>.Default.Equals(MessageQueue, other.MessageQueue);
39+
}
40+
41+
public override int GetHashCode()
42+
{
43+
return EqualityComparer<MessageQueue>.Default.GetHashCode(MessageQueue);
44+
}
45+
46+
public override string ToString()
47+
{
48+
return $"Assignment{{messageQueue={MessageQueue}}}";
49+
}
50+
}
51+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
using System;
19+
using System.Collections.Generic;
20+
using System.Linq;
21+
using Apache.Rocketmq.V2;
22+
23+
namespace Org.Apache.Rocketmq
24+
{
25+
public class Assignments
26+
{
27+
private readonly List<Assignment> _assignmentList;
28+
29+
public Assignments(List<Assignment> assignmentList)
30+
{
31+
_assignmentList = assignmentList;
32+
}
33+
34+
public override bool Equals(object obj)
35+
{
36+
if (this == obj)
37+
{
38+
return true;
39+
}
40+
41+
if (obj == null || GetType() != obj.GetType())
42+
{
43+
return false;
44+
}
45+
46+
var other = (Assignments)obj;
47+
return _assignmentList.SequenceEqual(other._assignmentList);
48+
}
49+
50+
public override int GetHashCode()
51+
{
52+
return HashCode.Combine(_assignmentList);
53+
}
54+
55+
public override string ToString()
56+
{
57+
return $"{nameof(Assignments)} {{ {nameof(_assignmentList)} = {_assignmentList} }}";
58+
}
59+
60+
public List<Assignment> GetAssignmentList()
61+
{
62+
return _assignmentList;
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)