RocketMQ .NET SDK
更新時間 2024-09-06 01:03:57
最近更新時間: 2024-09-06 01:03:57
分享文章
說明分布式消息服務RocketMQ兼容了社區版 HTTP SDK,您可以使用社區版 HTTP SDK接入分布式消息服務RocketMQ。
前提條件
- 下載社區 C# SDK到本地并解壓。
- 使用Visual Studio打開sln文件導入工程。
發送普通消息
using System;
using Aliyun.MQ;
using Aliyun.MQ.Model;
namespace MQ.Sample
{
public class Producer
{
// 填寫分布式消息服務RocketMQ控制臺HTTP接入點
private const string _endpoint = "${HTTP_ENDPOINT}";
// 填寫AccessKey,在管理控制臺創建
private const string _accessKeyId = "${ACCESS_KEY}";
// 填寫SecretKey 在管理控制臺創建
private const string _secretAccessKey = "${SECRET_KEY}";
// 消息所屬的Topic,在消息隊列RocketMQ版控制臺創建。
private const string _topicName = "${TOPIC}";
// Topic所屬實例ID,默認實例為空
private const string _instanceId = "${INSTANCE_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
// 循環發送4條消息。
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
// 消息內容。
sendMsg = new TopicMessage("hello mq");
// 設置消息的自定義屬性。
sendMsg.PutProperty("a", i.ToString());
// 設置消息的Key。
sendMsg.MessageKey = "MessageKey";
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publish message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}
消費普通消息
using System;using System.Collections.Generic;using System.Threading;using Aliyun.MQ.Model;using Aliyun.MQ.Model.Exp;
namespace MQ.Sample{
public class Consumer
{
// 填寫分布式消息服務RocketMQ控制臺HTTP接入點
private const string _endpoint = "${HTTP_ENDPOINT}";
// 填寫AccessKey,在管理控制臺創建
private const string _accessKeyId = "${ACCESS_KEY}";
// 填寫SecretKey 在管理控制臺創建
private const string _secretAccessKey = "${SECRET_KEY}";
// 所屬的 Topic
private const string _topicName = "${TOPIC}";
// Topic所屬實例ID,默認實例為空
private const string _instanceId = "${INSTANCE_ID}";
// 您在控制臺創建的 Consumer ID(Group ID)
private const string _groupId = "${GROUP_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// 在當前線程循環消費消息,建議是多開個幾個線程并發消費消息
while (true)
{
try
{
// 長輪詢消費消息
// 長輪詢表示如果topic沒有消息則請求會在服務端掛住3s,3s內如果有消息可以消費則立即返回
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // 一次最多消費3條(最多可設置為16條)
3 // 長輪詢時間3秒(最多可設置為30秒)
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// 處理業務邏輯
foreach (Message message in messages)
{
Console.WriteLine(message);
handlers.Add(message.ReceiptHandle);
}
// Message.nextConsumeTime前若不確認消息消費成功,則消息會重復消費
// 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// 某些消息的句柄可能超時了會導致確認不成功
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}}