add rabbitMQ demo
parent
de16f1cf12
commit
4c53ecbbe5
8
pom.xml
8
pom.xml
|
@ -285,6 +285,14 @@
|
|||
<version>${kotlin.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
|
||||
<dependency>
|
||||
<groupId>com.rabbitmq</groupId>
|
||||
<artifactId>amqp-client</artifactId>
|
||||
<version>4.0.2</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
<build>
|
||||
<finalName>java-utils</finalName>
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
package me.ehlxr.rabbitmq;
|
||||
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class ConnectionUtil {
|
||||
public static Connection getConnection() throws IOException, TimeoutException {
|
||||
// 连接工厂
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
// 连接 5672 端口;注意 15672 为工具界面端口;25672 为集群端口
|
||||
factory.setPort(5672);
|
||||
/*
|
||||
* 当我们在创建用户时,会指定用户能访问一个虚拟机,并且该用户只能访问该虚拟机下的队列和交换机,如果没有指定,默认的是”/”;
|
||||
* 一个 rabbitmq 服务器上可以运行多个 vhost,以便于适用不同的业务需要,
|
||||
* 这样做既可以满足权限配置的要求,也可以避免不同业务之间队列、交换机的命名冲突问题,因为不同 vhost 之间是隔离的。
|
||||
*/
|
||||
factory.setVirtualHost("/tdd");
|
||||
factory.setUsername("ehlxr");
|
||||
factory.setPassword("123456");
|
||||
|
||||
//获取连接
|
||||
return factory.newConnection();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package me.ehlxr.rabbitmq.routing;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 发送消息到交换机并且要指定路由 key ,消费者将队列绑定到交换机时需要指定路由 key
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver1 {
|
||||
private final static String QUEUE_NAME = "queue_routing";
|
||||
private final static String EXCHANGE_NAME = "exchange_direct";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
// 获取到连接以及mq通道
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
|
||||
|
||||
channel.basicQos(1);
|
||||
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, false, consumer);
|
||||
|
||||
while (true) {
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received1 " + message);
|
||||
Thread.sleep(10);
|
||||
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package me.ehlxr.rabbitmq.routing;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 发送消息到交换机并且要指定路由 key ,消费者将队列绑定到交换机时需要指定路由 key
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver2 {
|
||||
private final static String QUEUE_NAME = "queue_routing2";
|
||||
private final static String EXCHANGE_NAME = "exchange_direct";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
// 获取到连接以及mq通道
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
|
||||
|
||||
channel.basicQos(1);
|
||||
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, false, consumer);
|
||||
|
||||
while (true) {
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received2 " + message);
|
||||
Thread.sleep(10);
|
||||
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package me.ehlxr.rabbitmq.routing;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 发送消息到交换机并且要指定路由 key ,消费者将队列绑定到交换机时需要指定路由 key
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Sender {
|
||||
private final static String EXCHANGE_NAME = "exchange_direct";
|
||||
private final static String EXCHANGE_TYPE = "direct";
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
|
||||
|
||||
String message = "那一定是蓝色";
|
||||
channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
|
||||
System.out.println("[x] Sent '" + message + "'");
|
||||
|
||||
channel.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package me.ehlxr.rabbitmq.simple;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 简单模式:一个生产者,一个消费者
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver {
|
||||
private final static String QUEUE_NAME = "simple_queue";
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
|
||||
//获取连接
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
//获取通道
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, true, consumer);
|
||||
|
||||
while (true) {
|
||||
//该方法会阻塞
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received '" + message + "'");
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
package me.ehlxr.rabbitmq.simple;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* 简单模式:一个生产者,一个消费者
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
|
||||
public class Sender {
|
||||
private final static String QUEUE_NAME = "simple_queue";
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
//创建连接
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
//创建通道
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
|
||||
/*
|
||||
* 声明队列
|
||||
* 1. 队列名
|
||||
* 2. 是否持久化
|
||||
* 3. 是否排外(即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景)
|
||||
* 4. 是否自动删除(消费完删除)
|
||||
* 6. 其他属性
|
||||
*
|
||||
*/
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
|
||||
/*
|
||||
* 消息内容
|
||||
* 1. 交换机
|
||||
* 2. 队列名
|
||||
* 3. 其他属性(路由)
|
||||
* 4. 消息body
|
||||
*/
|
||||
String message = "错的不是我,是这个世界~";
|
||||
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
|
||||
System.out.println("[x]Sent '" + message + "'");
|
||||
|
||||
//最后关闭通关和连接
|
||||
channel.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package me.ehlxr.rabbitmq.topic;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* topic 模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#” 匹配一个词或多个词,“*” 只匹配一个词。
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver1 {
|
||||
private final static String QUEUE_NAME = "queue_topic2";
|
||||
private final static String EXCHANGE_NAME = "exchange_topic";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
|
||||
|
||||
channel.basicQos(1);
|
||||
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, false, consumer);
|
||||
|
||||
while (true) {
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received1 '" + message + "'");
|
||||
Thread.sleep(10);
|
||||
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package me.ehlxr.rabbitmq.topic;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* topic 模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#” 匹配一个词或多个词,“*” 只匹配一个词。
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver2 {
|
||||
private final static String QUEUE_NAME = "queue_topic2";
|
||||
private final static String EXCHANGE_NAME = "exchange_topic";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
|
||||
|
||||
channel.basicQos(1);
|
||||
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, false, consumer);
|
||||
|
||||
while (true) {
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received2 '" + message + "'");
|
||||
Thread.sleep(10);
|
||||
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package me.ehlxr.rabbitmq.topic;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* topic 模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#” 匹配一个词或多个词,“*” 只匹配一个词。
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Sender {
|
||||
private final static String EXCHANGE_NAME = "exchange_topic";
|
||||
private final static String EXCHANGE_TYPE = "topic";
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
|
||||
|
||||
//消息内容
|
||||
String message = "如果真爱有颜色";
|
||||
channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
|
||||
System.out.println("[x] Sent '" + message + "'");
|
||||
|
||||
//关通道 关连接
|
||||
channel.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package me.ehlxr.rabbitmq.worker;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* work 模式:一个生产者,多个消费者,每个消费者获取到的消息唯一
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver1 {
|
||||
private final static String QUEUE_NAME = "queue_work";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
|
||||
/*
|
||||
* 同一时刻服务器只会发送一条消息给消费者
|
||||
*
|
||||
* channel.basicQos: 是指通道 channel 每次能够接收的消费者最大值 https://www.rabbitmq.com/consumer-prefetch.html
|
||||
* 若将该行代码注释,则 channel 无限制,消息将很快发送完毕,只不过消息阻塞在队列中
|
||||
*
|
||||
*/
|
||||
channel.basicQos(1);
|
||||
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, false, consumer);
|
||||
|
||||
while (true) {
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received1 '" + message + "'");
|
||||
|
||||
Thread.sleep(10);
|
||||
//返回确认状态
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package me.ehlxr.rabbitmq.worker;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.QueueingConsumer;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* work 模式:一个生产者,多个消费者,每个消费者获取到的消息唯一
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Receiver2 {
|
||||
private final static String QUEUE_NAME = "queue_work";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.basicQos(1);
|
||||
|
||||
QueueingConsumer consumer = new QueueingConsumer(channel);
|
||||
channel.basicConsume(QUEUE_NAME, false, consumer);
|
||||
|
||||
while (true) {
|
||||
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println("[x] Received2 '" + message + "'");
|
||||
|
||||
Thread.sleep(10);
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package me.ehlxr.rabbitmq.worker;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import me.ehlxr.rabbitmq.ConnectionUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* work 模式:一个生产者,多个消费者,每个消费者获取到的消息唯一
|
||||
*
|
||||
* @author lixiangrong
|
||||
* @since 2019-01-22.
|
||||
*/
|
||||
public class Sender {
|
||||
private final static String QUEUE_NAME = "queue_work";
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
|
||||
Connection connection = ConnectionUtil.getConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
String message = "冬马小三" + i;
|
||||
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
|
||||
System.out.println("[x] Sent '" + message + "'");
|
||||
Thread.sleep(i * 10);
|
||||
}
|
||||
|
||||
channel.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue