关于 RabbitMQ 延迟队列（延迟消息发送）的实现
整体流程：生产者发送消息 => 死信队列（带 TTL 的缓冲队列，没有消费者） => 目标队列 => 消费者消费目标队列中的消息
死信队列中的消息在 TTL 到期后，经由死信路由转发到目标队列，再由消费者消费
消息的TTL
死信路由
延迟队列实现代码
package com.yangkeai.rabbitmq.utils;
import com.alibaba.fastjson.JSON;
import com.yangkeai.common.exception.MyException;
import com.yangkeai.rabbitMQ.config.RabbitMQConfig;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Static helpers for publishing to and reading from RabbitMQ.
 *
 * <p>Every operation opens its own connection and channel from the settings in
 * {@code RabbitMQConfig} and closes them when it is done (except {@link #consume(String)},
 * see its documentation). Failures are reported as {@code MQException}.
 *
 * @author yang yang
 * @create 2018/9/12
 * @since 1.0.0
 */
public class RabbitMQUtils {

    /**
     * Opens a new connection using the settings in {@code RabbitMQConfig}.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws MQException if the connection cannot be established
     */
    public static Connection getConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitMQConfig.HOST);
        factory.setPort(RabbitMQConfig.PORT);
        factory.setUsername(RabbitMQConfig.USERNAME);
        factory.setPassword(RabbitMQConfig.PASSWORD);
        factory.setConnectionTimeout(RabbitMQConfig.CONNECTIONTIMEOUT);
        factory.setShutdownTimeout(RabbitMQConfig.SHUTDOWNTIMEOUT);
        try {
            return factory.newConnection();
        } catch (IOException | TimeoutException e) {
            throw failure("获取连接失败", e);
        }
    }

    /**
     * Opens a new connection and creates a channel on it.
     *
     * @return a newly created channel; close it together with its connection via
     *         {@link #closeChannel(Channel, Connection)}
     * @throws MQException if the connection or the channel cannot be created
     */
    public static Channel getChannel() {
        Connection connection = getConnection();
        Channel channel = null;
        IOException cause = null;
        try {
            // createChannel() may also return null when no channel number is available.
            channel = connection.createChannel();
        } catch (IOException e) {
            cause = e;
        }
        if (channel == null) {
            // Do not leak the connection we just opened when no channel could be created.
            closeChannel(null, connection);
            throw failure("创建Channel失败", cause);
        }
        return channel;
    }

    /**
     * Publishes a message for immediate delivery.
     *
     * <p>Declares a direct exchange and a durable queue, binds the queue using its own name
     * as routing key, publishes the JSON-serialized payload and waits for the broker confirm.
     *
     * @param exchangeName exchange to publish to
     * @param queueName    queue name, also used as the routing key
     * @param payload      message body; serialized to JSON and encoded as UTF-8
     * @throws MQException if the message could not be published or was not confirmed
     */
    public static void send(String exchangeName, String queueName, Object payload) {
        Channel channel = getChannel();
        try {
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, queueName);
            publishConfirmed(channel, exchangeName, queueName, payload, null);
        } catch (IOException e) {
            throw failure("发送消息失败", e);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw failure("发送消息失败", e);
        } finally {
            closeChannel(channel, channel.getConnection());
        }
    }

    /**
     * Publishes a message that becomes visible in {@code queueName} only after a delay.
     *
     * <p>The message is published with a per-message TTL to {@code deadQueueName}, a buffer
     * queue that is expected to have no consumers. That queue is declared with
     * {@code x-dead-letter-exchange} / {@code x-dead-letter-routing-key} pointing at
     * {@code queueName}, so when the TTL expires the broker re-routes the message there.
     *
     * <p>Notes:
     * <ul>
     *   <li>RabbitMQ only expires messages once they reach the head of the queue, so mixing
     *       different delays in one buffer queue can hold a short delay behind a longer one;
     *       use one buffer queue per delay value.</li>
     *   <li>If {@code deadQueueName} already exists on the broker with different arguments,
     *       the declaration is rejected; delete the old queue first.</li>
     * </ul>
     *
     * @param exchangeName  direct exchange both queues are bound to
     * @param queueName     target queue the consumer reads from
     * @param deadQueueName buffer queue in which the message waits for its TTL to expire
     * @param payload       message body; serialized to JSON and encoded as UTF-8
     * @param delayTime     delay in milliseconds (sent as the AMQP per-message expiration)
     * @throws IllegalArgumentException if {@code delayTime} is null or negative
     * @throws MQException              if the message could not be published or was not confirmed
     */
    public static void sendDelay(String exchangeName, String queueName, String deadQueueName, Object payload, Long delayTime) {
        if (delayTime == null || delayTime < 0) {
            throw new IllegalArgumentException("delayTime must be a non-negative number of milliseconds: " + delayTime);
        }
        Channel channel = getChannel();
        try {
            // Exchange, queues and bindings.
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-max-length", 10000);
            arguments.put("x-dead-letter-exchange", exchangeName);
            arguments.put("x-dead-letter-routing-key", queueName);
            // The dead-letter arguments must be attached to the buffer queue,
            // otherwise expired messages are simply discarded.
            channel.queueDeclare(deadQueueName, true, false, false, arguments);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, queueName);
            channel.queueBind(deadQueueName, exchangeName, deadQueueName);
            // Publish into the buffer queue, not the target queue, so the TTL acts as a delay.
            publishConfirmed(channel, exchangeName, deadQueueName, payload, String.valueOf(delayTime));
        } catch (IOException e) {
            throw failure("发送消息失败", e);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw failure("发送消息失败", e);
        } finally {
            closeChannel(channel, channel.getConnection());
        }
    }

    /**
     * Fetches a single message from the queue (pull mode, {@code basicGet}) and acknowledges it.
     *
     * @param queueName queue to read from; declared as durable if it does not exist
     * @return the message body decoded as UTF-8
     * @throws MQException if the queue is empty or the message could not be fetched
     */
    public static String get(String queueName) {
        Channel channel = getChannel();
        try {
            channel.queueDeclare(queueName, true, false, false, null);
            channel.basicQos(1);
            GetResponse response = channel.basicGet(queueName, false);
            if (response == null || response.getEnvelope() == null) {
                throw new MQException("获取消息失败");
            }
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
            return new String(response.getBody(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw failure("获取消息失败", e);
        } finally {
            closeChannel(channel, channel.getConnection());
        }
    }

    /**
     * Subscribes a consumer (push mode) that acknowledges each message and prints its body.
     *
     * <p>After subscribing, the channel and connection are closed immediately if the queue
     * reports no ready messages; otherwise they are left open so the consumer keeps running.
     *
     * <p>NOTE(review): the emptiness check runs right after {@code basicConsume}, so a message
     * that has already been dispatched to the consumer may still be in flight when the channel
     * is closed. Confirm the intended lifetime of the consumer with the callers.
     *
     * @param queueName queue to consume from; declared as durable if it does not exist
     * @throws MQException if the consumer could not be set up
     */
    public static void consume(String queueName) {
        final Channel channel = getChannel();
        try {
            channel.queueDeclare(queueName, true, false, false, null);
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    if (null != envelope) {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        System.out.println("队列中存储的消息 :" + new String(body, StandardCharsets.UTF_8));
                    }
                }
            };
            channel.basicConsume(queueName, consumer);
            if (channel.messageCount(queueName) <= 0) {
                closeChannel(channel, channel.getConnection());
            }
        } catch (IOException e) {
            // Setup failed: release the channel and connection instead of leaking them.
            closeChannel(channel, channel.getConnection());
            throw failure("获取消息失败", e);
        }
    }

    /**
     * Closes the channel and then the connection, best effort.
     *
     * <p>Each resource is closed independently, so a failure while closing the channel does
     * not prevent the connection from being closed. Errors are printed and not rethrown.
     *
     * @param channel    channel to close; may be null
     * @param connection connection to close; may be null
     */
    public static void closeChannel(Channel channel, Connection connection) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception e) {
                // Broad catch on purpose: the channel may have been closed concurrently,
                // and cleanup must still go on to close the connection.
                e.printStackTrace();
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (Exception e) {
                // Broad catch on purpose: closing is best effort.
                e.printStackTrace();
            }
        }
    }

    /**
     * Publishes the JSON-serialized payload with publisher confirms and waits for the confirm.
     *
     * <p>NOTE(review): the message is published with {@code mandatory=true} but no return
     * listener is registered on the channel, so an unroutable message is not reported here.
     *
     * @param expiration per-message TTL in milliseconds as a string, or null for no TTL
     */
    private static void publishConfirmed(Channel channel, String exchangeName, String routingKey,
                                         Object payload, String expiration)
            throws IOException, InterruptedException {
        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .contentEncoding("UTF-8");
        if (expiration != null) {
            properties.expiration(expiration);
        }
        channel.confirmSelect();
        // Encode explicitly as UTF-8 so the bytes match the declared content encoding.
        channel.basicPublish(exchangeName, routingKey, true, properties.build(),
                JSON.toJSONString(payload).getBytes(StandardCharsets.UTF_8));
        if (!channel.waitForConfirms()) {
            throw new MQException("发送消息失败");
        }
    }

    /**
     * Builds an {@code MQException} that carries the underlying error.
     *
     * <p>Only the single-String constructor of {@code MQException} is known here, so the
     * original error is attached as a suppressed exception to keep its stack trace.
     */
    private static MQException failure(String message, Throwable cause) {
        MQException failure = new MQException(message);
        if (cause != null) {
            failure.addSuppressed(cause);
        }
        return failure;
    }
}
package com.yangkeai.rabbitmq.config;
import lombok.Builder;
import lombok.Data;
/**
 * RabbitMQ connection settings, exposed as public static fields and read by
 * RabbitMQUtils when it builds a ConnectionFactory.
 *
 * NOTE(review): the class declares only static fields, so {@code @Data} and
 * {@code @Builder} appear to have no instance state to act on — confirm and
 * consider removing them.
 * NOTE(review): the fields are mutable and the credentials are hard-coded in
 * source — consider externalizing them.
 *
 * @author yang yang
 * @create 2018/9/12
 * @since 1.0.0
 */
@Data
@Builder
public class RabbitMQConfig {
/** Broker host passed to ConnectionFactory.setHost. */
public static String HOST = "127.0.0.1";
/** Broker port passed to ConnectionFactory.setPort. */
public static int PORT = 5672;
/** User name passed to ConnectionFactory.setUsername. */
public static String USERNAME = "guest";
/** Password passed to ConnectionFactory.setPassword. */
public static String PASSWORD = "guest";
/** Value passed to ConnectionFactory.setConnectionTimeout; presumably milliseconds — TODO confirm. */
public static int CONNECTIONTIMEOUT = 10_000;
/** Value passed to ConnectionFactory.setShutdownTimeout; presumably milliseconds — TODO confirm. */
public static int SHUTDOWNTIMEOUT = 10_000;
}