消息中间件—Rabbitmq

一、介绍

rabbitmq是基于生产者和消费者的一个模型,主要有解耦,异步通信,削峰等作用。

image-20230718234957528

二、下载

官网:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

具体安装rabbitmq可以参考官方文档:下载并安装 RabbitMQ — 兔子MQ

或者可以参考B站视频:https://www.bilibili.com/video/BV1dE411K7MG?p=2

三、rabbitmq的启动和图形界面

Windows:

image-20230718234004523

上传成功界面:

image-20230718234039887

Linux:

image-20230718234130292

图形界面:

image-20230718234348782

四、rabbitmq的简单使用

1、导入依赖

1
2
3
4
5
><dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.1</version>
></dependency>

2、构建虚拟主机

image-20230718235950254

3、创建用户

image-20230719000222472

4、用户配置虚拟主机

先点击用户名

image-20230719000422633

image-20230719000544676

5、生产者代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class helloworldSender {
@Test
public void testSend() throws IOException, TimeoutException {
//创建连接工厂对象,并且设置相关数据
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("1234");

//获取对象,并连接
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();

//通过通道绑定对应消息队列
//参数1:队列名称如果队列不存在自动创建
//参数2:用来定义队列特性是否要持久化true持久化队列 false不持久化
//参数3:exclusive是否独占队列 true独占队列 false 不独占
//参数4: autoDelete:是否在消费完成后自动删除队列 true自动删除 false 不自动删除
// 参数5:额外附加参数
channel.queueDeclare("hello",false,false,false,null);

//发布消息
//参数1:交换机名称参数2:队列名称参数3:传递消息额外设置参数4:消息的具体内容
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
}

运行截图:

image-20230719002600150

创建消费者,因为消费者要不断监听生产者发的消息,所以使用main函数来监听,(而且在消费者中不要关闭连接)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂对象,并且设置相关数据
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("1234");

//获取对象,并连接
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();

//通过通道绑定对应消息队列
//参数1:队列名称如果队列不存在自动创建
//参数2:用来定义队列特性是否要持久化true持久化队列 false不持久化
//参数3:exclusive是否独占队列 true独占队列 false 不独占
//参数4: autoDelete:是否在消费完成后自动删除队列 true自动删除 false 不自动删除
// 参数5:额外附加参数
channel.queueDeclare("hello",false,false,false,null);

//消费消息
//参数1:队列名称 参数2:开始消息的自动确认机制 参数3:消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = "+ new String(body));
}
});
// channel.close();
// connection.close();
}

image-20230719222034455

6、direct模型图(direct)

image-20230719224438079

五、封装rabbitmq中连接工具类封装

工具类代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static {
//重量级资源,类加载执行一次
connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/ems");
connectionFactory.setPort(5672);
connectionFactory.setHost("localhost");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("1234");
}
//定义提供连接对象的方法
public static Connection getConnection(){
try{
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}
//关闭通道和连接工具方法
public static void closeConnectionAndChannel(Channel channel,Connection connection){
try{
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}

}

六、Work Queues

1、模型

image-20230719224647379

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare("work",true,false,false,null);
for(int i = 0;i<10;i++) {
channel.basicPublish("","work",null,(i + "hello work queue").getBytes());
}
RabbitMQUtils.closeConnectionAndChannel(channel,connection);
}
}

消费者一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1: "+ new String(body));
}
});

}
}

消费者二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2: "+ new String(body));
}
});

}
}

运行截图:

image-20230719230008611

image-20230719230021167

结论:平均分配

公平调度

您可能已经注意到调度仍然无法正常工作 如我们所愿。例如,在有两个工人的情况下,当所有 奇数消息很重,偶数消息很轻,一个工作人员将是 经常忙碌,另一个几乎不会做任何工作。井 RabbitMQ对此一无所知,仍然会调度 消息均匀。

发生这种情况是因为 RabbitMQ 只是在消息时调度消息 进入队列。它不看未确认的数量 面向消费者的消息。它只是盲目地发送每 n 条消息 给第 n 个消费者。

生产者 -> 队列 -> 消耗:RabbitMQ 调度消息。

为了解决这个问题,我们可以将 basicQos 方法与预取计数 = 1 设置一起使用。这告诉 RabbitMQ 不要付出更多 一次给工作人员一条消息。或者,换句话说,不要调度 给工作人员的新消息,直到它处理并确认 前一个。相反,它会将其调度给下一个尚未繁忙的工作人员。

1
2
3
4
5
int prefetchCount = 1;
channel.basicQos(prefetchCount);//设置消费者消费消息一次只能消费一个,直到确认后才能执行下一个
channel.basicConsume("work",false,new DefaultConsumer(channel){})//设置成false,不让自动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//手动确认,哪一个消费者先执行,直接确认删除队列

七、Fanout(发布订阅)Publish/Subscribe

1、模型图:

image-20230719231846993

image-20230719232104543

2、开发生产者:

1
2
3
4
5
6
7
8
9
10
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//将通道声明交换机 参数一:交换机名称 参数二:交换机类型 fanout
channel.exchangeDeclare("logs","fanout");
channel.basicPublish("logs","",null, "hello fanout".getBytes());
RabbitMQUtils.closeConnectionAndChannel(channel,connection);
}
}

3、消费者:

消费者-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs","");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1: "+ new String(body));
}
});

}
}

消费者-2:

消费者-3:

消费者2和消费者3代码一样

运行截图:

image-20230719233622595

image-20230719233634174

image-20230719233648200

八、Routing

1、模型图:

image-20230719233808379

2、开发生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs_direct","error");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1: "+ new String(body));
}
});

}
}

消费者-2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs_direct","info");
channel.queueBind(queueName,"logs_direct","error");

channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-2: "+ new String(body));
}
});

}
}

消费者-3:

运行截图:

image-20230719234952664

image-20230719235005767

九、Topic

1、模型图:

image-20230720000158001

  • *(星号)可以代替一个词。
  • #(哈希)可以替换零个或多个单词。

在这个例子中,我们将发送所有描述的消息 动物。消息将使用路由密钥发送,该路由密钥包含 三个字(两个点)。路由密钥中的第一个单词 将描述一个名人,第二个是颜色,第三个是物种: “..”。

我们创建了三个绑定:Q1 绑定绑定键“.orange.”。 Q2 带有“..rabbit”和“lazy.#”。

2、开发生产者:

1
2
3
4
5
6
7
8
9
10
11
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//将通道声明交换机 参数一:交换机名称 参数二:交换机类型 fanout
channel.exchangeDeclare("topics","topic");
String routingKry = "user.save";
channel.basicPublish("topics",routingKry,null, ("hello topic ["+routingKry+"]").getBytes());
RabbitMQUtils.closeConnectionAndChannel(channel,connection);
}
}

消费者-2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"topics","user.#");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1: "+ new String(body));
}
});

}
}

消费者-3:

运行截图:

image-20230720001236253

image-20230720001245861

十、RPC框架

RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ官网自己学习

结束了!!!

rabbitmq的基础学习已完成!!!!

C7C008A6C8F76E701BE47B52769B9D46