RabbitMQ的六种工作模式

一、普通队列模式

1.  一个消费者,一个队列,一个消费者。 2.  消息产生消息放入队列,消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端) 

image
22.gaodi.net/blog/1913282/202207/1913282-20220730231124618-1368574550.png)

  • 获取RabbitMQ连接帮助类

    后面代码,这部分创建连接共用

     public class RabbitMQHelper     {         /// <summary>         /// 获取RabbitMQ连接         /// </summary>         /// <returns></returns>         public static IConnection GetConnection()         {             //实例化连接工厂             var factory = new ConnectionFactory             {                 HostName = "127.0.0.1", //ip                 Port = 5672, // 端口                 UserName = "Admin", // 账户                 Password = "Admin", // 密码                 VirtualHost = "/"   // 虚拟主机             };              return factory.CreateConnection();         }     } 
  • 生产者

    public class Send {      public static void SendMessage()     {         string queueName = "normal";          //1.创建链接         using (var connection = RabbitMQHelper.GetConnection())         {             // 2.创建信道             using(var channel = connection.CreateModel())             {                 // 3.声明队列                 channel.QueueDeclare(queueName, false, false, false, null);                 // 没有绑定交换机,怎么找到路由队列的呢?                 for (int i = 1; i <= 30; i++)                 {                     //4.构建Byte消息数据包                     string message =$"第{i}条消息";                     var body = Encoding.UTF8.GetBytes(message);//消息以二进制形式传输                      // 发送消息到rabbitmq,使用rabbitmq中默认提供交换机路由,默认的路由Key和队列名称完全一致                     //5.发送数据包                     channel.BasicPublish(exchange: "", routingKey: queueName, null, body);                     Thread.Sleep(1000);//添加延迟                     Console.WriteLine("生产:" + message);                 }             }         }      }  } 
  • 消费者

    public class Receive {     public static void ReceiveMessage()     {         // 消费者消费是队列中消息         string queueName = "normal";         //1.建立链接链接         var connection = RabbitMQHelper.GetConnection();         {             //2.建立信道             var channel = connection.CreateModel();             {                 //3.声明队列:如果你先启动是消费端就会异常                 channel.QueueDeclare(queueName, false, false, false, null);                 //4.创建一个消费者实例                 var consumer = new EventingBasicConsumer(channel);                 //5.绑定消息接收后的事件委托                 consumer.Received +=(model, ea) => {                     var message = Encoding.UTF8.GetString(ea.Body.ToArray());                     Thread.Sleep(1000);                     Console.WriteLine(" Normal Received => {0}", message);                 };                  //6.启动消费者                 channel.BasicConsume( queue: queueName, autoAck:true, consumer);//开始消费             }          }      }  } 

二、工作队列模式

  1. 一个消费者,一个队列,多个消费者。但多个消费者中只会有一个会成功地消费消息

  2. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用。

  3. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
    image

  • 生产者

     public class WorkerSend     {          public static void SendMessage()         {             string queueName = "Worker_Queue";              using (var connection = RabbitMQHelper.GetConnection())             {                 using(var channel = connection.CreateModel())                 {                     channel.QueueDeclare(queueName, false, false, false, null);                     for (int i = 0; i < 30; i++)                     {                         string message = $"RabbitMQ Worker {i + 1} Message";                         var body = Encoding.UTF8.GetBytes(message);                         channel.BasicPublish("", queueName, null, body);                         Console.WriteLine("send Task {0} message",i + 1);                     }                                     }             }                      }      } 
  • 消费者

      public class WorkerReceive     {         public static void ReceiveMessage()         {             string queueName = "Worker_Queue";             var connection = RabbitMQHelper.GetConnection();             {                 var channel = connection.CreateModel();                 {                     channel.QueueDeclare(queueName, false, false, false, null);                     var consumer = new EventingBasicConsumer(channel);                     //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。                     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);                     consumer.Received +=(model, ea) => {                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());                         Console.WriteLine(" Worker Queue Received => {0}", message);                     };                      channel.BasicConsume(queueName,true, consumer);                 }                             }                    }      } 

三、扇形队列模式(发布/订阅模式)

  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。每个消费队列中消息一致,且每个消息消费者都从自己的消息队列的第一个消息开始消费,直到最后。

  2. 交换机为rabbitMQ中内部组件。消息生产者将消息发送给rabbitMQ后,rabbitMQ会根据订阅的消费者个数,生成对应数目的消息队列,这样每个消费者都能获取生产者发送的全部消息。

  3. 一旦消费者断开与rabbitMQ的连接,队列就会消失。如果消费者数目很多,对于rabbitMQ而言,也是个重大负担,订阅模式是个长连接,占用并发数,且每个消费者一个队列会占用大量空间

  4. 相关应用场景:邮件群发,群聊,广播
    image

  • 生产者
 public static void SendMessage()         {             //1.创建连接             using (var connection = RabbitMQHelper.GetConnection())             {                 //2.创建信道                 using(var channel = connection.CreateModel())                 {                     // 3.声明交换机对象                     channel.ExchangeDeclare("fanout_exchange", "fanout");                                         // 4.创建队列                     string queueName1 = "fanout_queue1";                     channel.QueueDeclare(queueName1, false, false, false, null);                     string queueName2 = "fanout_queue2";                     channel.QueueDeclare(queueName2, false, false, false, null);                     string queueName3 = "fanout_queue3";                     channel.QueueDeclare(queueName3, false, false, false, null);                                          // 5.绑定到交互机                     // fanout_exchange 绑定了 3个队列                      channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交换机                     channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");                     channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");                      for (int i = 0; i < 10; i++)                     {                         //6.构建消息byte数组                         string message = $"RabbitMQ Fanout {i + 1} Message";                         var body = Encoding.UTF8.GetBytes(message);                         //7.发送消息                         channel.BasicPublish("fanout_exchange", "", null, body);//同时把消息发送到订阅的三个队列                         Console.WriteLine("Send Fanout {0} message",i + 1);                     }                 }             }                      }      } 
  • 消费者
 public class FanoutConsumer     {         public static void ConsumerMessage()         {             //1.创建连接             var connection = RabbitMQHelper.GetConnection();             {                 //2,。创建信道                 var channel = connection.CreateModel();                 {                     //3.申明exchange                     channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");                                          // 4.创建队列                     string queueName1 = "fanout_queue1";                     channel.QueueDeclare(queueName1, false, false, false, null);                     string queueName2 = "fanout_queue2";                     channel.QueueDeclare(queueName2, false, false, false, null);                     string queueName3 = "fanout_queue3";                     channel.QueueDeclare(queueName3, false, false, false, null);                                          // 5.绑定到交互机                     channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");                     channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");                     channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");                      Console.WriteLine("[*] Waitting for fanout logs.");                      //6.申明consumer                     var consumer = new EventingBasicConsumer(channel);                     //绑定消息接收后的事件委托                     consumer.Received += (model, ea) => {                         var body = ea.Body;                         var message = Encoding.UTF8.GetString(body.ToArray());                         Console.WriteLine("[x] {0}", message);                      };                     //7.启动消费者                     channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只会消费队列queueName1中的消息,其他队列中订阅的消息仍然存在                     Console.WriteLine(" Press [enter] to exit.");                     Console.ReadLine();                 }             }         }     } 

四、直接队列模式(Routing路由模式)

  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

  2. 消息生产者将消息发送给交换机,交换机按照路由判断,将路由到的RouteKey的消息,推送与之绑定的队列,交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
    image

  • 生产者:
public static void SendMessage()         {             //1.创建连接             using (var connection = RabbitMQHelper.GetConnection())             {                 //2.创建信道                 using(var channel = connection.CreateModel())                 {                     // 3.声明Direct交换机                     channel.ExchangeDeclare("direct_exchange", "direct");                      // 4.创建队列                     string queueName1 = "direct_queue1";                     channel.QueueDeclare(queueName1, false, false, false, null);                     string queueName2 = "direct_queue2";                     channel.QueueDeclare(queueName2, false, false, false, null);                     string queueName3 = "direct_queue3";                     channel.QueueDeclare(queueName3, false, false, false, null);                      // 5.绑定到交互机 指定routingKey                     channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");                     channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");                     channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");                      for (int i = 0; i < 10; i++)                     {                         string message = $"RabbitMQ Direct {i + 1} Message =>green";                         var body = Encoding.UTF8.GetBytes(message);                         // 发送消息的时候需要指定routingKey发送                         channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只发布到RouteKey:green的队列                         Console.WriteLine("Send Direct {0} message",i + 1);                     }                 }             }                      }      } 
  • 消费者
   public class DirectConsumer     {         public static void ConsumerMessage()         {             //1.创建连接             var connection = RabbitMQHelper.GetConnection();             //2.创建通信             var channel = connection.CreateModel();             //3.声明交换机             channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");             //4.绑定交换机             var queueName = "direct_queue2";//队列direct_queue3绑定有red,yellow,green共3个RouteKey             channel.QueueDeclare(queueName, false, false, false, null);             //此处消费通信没有必要绑定所有的RouteKey,根据前生产者通信的路由规则,每个队列中只会路由到一种消息             channel.QueueBind(queue: queueName,                                       exchange: "direct_exchange",                                       routingKey: "red");             channel.QueueBind(queue: queueName,                                       exchange: "direct_exchange",                                       routingKey: "yellow");             channel.QueueBind(queue: queueName,                                       exchange: "direct_exchange",                                       routingKey: "green");              Console.WriteLine(" [*] Waiting for messages.");              //5.实例化消费者             var consumer = new EventingBasicConsumer(channel);             //6.为消费者绑定消费委托事件             consumer.Received += (model, ea) =>             {                 var body = ea.Body;                 var message = Encoding.UTF8.GetString(body.ToArray());                 var routingKey = ea.RoutingKey;                 Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);                  // 消费完成后需要手动签收消息,如果不写该代码就容易导致重复消费问题                 //7.手动确认签收消息                 channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次签收性能损耗             };              // 消息签收模式             // 手动签收 保证正确消费,不会丢消息(基于客户端而已)             // 自动签收 容易丢消息              // 签收:意味着消息从队列中删除             channel.BasicConsume(queue: queueName,                                  autoAck: false,                                  consumer: consumer);//设置为不自动签收,进行手动签收              Console.WriteLine(" Press [enter] to exit.");             Console.ReadLine();         }     } 

五、模糊匹配队列模式(Topic 主题模式)

  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

  2. 此时的自己唯一的Routekey,不是一个确定值,像我们熟悉的正则表达式对应的匹配规则。

  3. 生产者产生消息,把消息交给交换机,交换机根据RouteKey的模糊匹配到对应的队列,由队列监听消费者消费消息。

  4. 规则:

    和* 都是通配符,命名规则是多个单词用顿号(.)分隔开

    代表代表一个单词

    *代表多个单词
    image

  • 生产者:
      public static void SendMessage()         {             //1.创建连接             using (var connection = RabbitMQHelper.GetConnection())             {                 //2.创建信道                 using (var channel = connection.CreateModel())                 {                     //3.声明交换机                     channel.ExchangeDeclare("topic_exchange", "topic");                     //4.声明队列                     string queueName1 = "topic_queue1";                     channel.QueueDeclare(queueName1, false, false, false, null);                     string queueName2 = "topic_queue2";                     channel.QueueDeclare(queueName2, false, false, false, null);                     string queueName3 = "topic_queue3";                     channel.QueueDeclare(queueName3, false, false, false, null);                     //5.绑定到交互机                     channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");                     channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");                     channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");                                      for (int i = 0; i < 10; i++)                     {                         //6.准备发送字节数组                         string message = $"RabbitMQ Topic {i + 1} Delete Message";                         var body = Encoding.UTF8.GetBytes(message);                         //7.根据RouteKey发布消息                         channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//会发布到queueName1,queueName2                         Console.WriteLine("Send Topic {0} message", i + 1);                     }                 }             }          } 
  • 消费者:
 public static void ConsumerMessage()         {             //1.创建连接             var connection = RabbitMQHelper.GetConnection();             //2.创建通信             var channel = connection.CreateModel();             //3.声明交换机             channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");             //4.声明队列             var queueName = "topic_queue3";             channel.QueueDeclare(queueName, false, false, false, null);             //5.绑定交换机             channel.QueueBind(queue: queueName,                                       exchange: "topic_exchange",                                       routingKey: "user.data.*");              Console.WriteLine(" [*] Waiting for messages.");             //6.创建消费者             var consumer = new EventingBasicConsumer(channel);             //7.绑定消费委托事件             consumer.Received += (model, ea) =>             {                 var body = ea.Body;                 var message = Encoding.UTF8.GetString(body.ToArray());                 var routingKey = ea.RoutingKey;                 Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);             };              //8.启动消费             channel.BasicConsume(queue: queueName,                                  autoAck: true,                                  consumer: consumer);              Console.WriteLine(" Press [enter] to exit.");             Console.ReadLine();         } 

六、RPC 模式(了解)

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3、服务端将RPC方法 的结果发送到RPC响应队列。

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

源码地址

商匡云商
Logo
注册新帐户
对比商品
  • 合计 (0)
对比
0
购物车