rabbitmq小记
发topic代码,创建交换器,数据直接按routingKey发送到交换器中,即不需要定义队列:
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 创建交换器
channel.ExchangeDeclare("logs3", ExchangeType.Topic);
// 发送消息
channel.BasicPublish(exchange: "logs3", routingKey: "red.green", basicProperties: null, body: Encoding.UTF8.GetBytes($"测试red.green"));
channel.BasicPublish(exchange: "logs3", routingKey: "red.yellow.green", basicProperties: null, body: Encoding.UTF8.GetBytes($"测试red.yellow.green"));
}
}
接收topic,需要定义队列并绑定到交换器,每个队列中的数据只能被一个消费者获取,即使两个消费都同时接收a.#,如果想多个消费者都收到这个消息,则不同消费者需定义不同的队列:
如果使用了不同的队列,要注意是否有消息不被消费的情况,造成消息累积永远没有人消费
private void button6_Click(object sender, EventArgs e)
{
//接收topic
string qname = Guid.NewGuid().ToString(); //如果多个消费者定义的队列名相同,则只能发到一个消费者
cn接收 = this.factory.CreateConnection();
mod接收 = cn接收.CreateModel();
mod接收.QueueDeclare(qname, false, false, true, null);
mod接收.QueueBind(qname, "logs3", "red.#");
var consumer = new EventingBasicConsumer(mod接收);
mod接收.BasicConsume(qname, true, consumer);
//该事件在接收到消息时触发
consumer.Received += Consumer_Received3;
}
private void Consumer_Received3(object sender, BasicDeliverEventArgs e)
{
byte[] body = e.Body.ToArray(); //消息字节数组
string message = e.RoutingKey + Encoding.UTF8.GetString(body); //消息内容
sb.AppendLine(message);
}
设置消息超时:
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
Dictionary<string, object> d = new Dictionary<string, object>();
d["x-message-ttl"] = 10 * 1000; //10s超时
string qname = this.txt队列名.Text.Trim();
channel.QueueDeclare(qname, false, false, true, d);//创建一个名称为hello的消息队列
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1; //1表示不持久化,2表示持久化(默认)
properties.Headers = new Dictionary<string, object>();
properties.Headers["文件名"] = "经.mp4";
properties.Headers["文件长度"] = "2520";
string message = qname + DateTime.Now.ToString(); //传递的消息内容
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", qname, properties, body); //开始传递
}
}
浙公网安备 33010602011771号