写点什么

Day177

  • 2022 年 5 月 11 日
  • 本文字数:3397 字

    阅读完需:约 11 分钟



(3) 异步发送如何确认发送成功



public class JmsProduce_asynsc {


public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";


public static final String QUEUE_NAME = "asynsc01";


public static void main(String[] args) throws JMSException {


ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);


mqConnectionFactory.setUseAsyncSend(true);//设置异步投递


Connection conn = mqConnectionFactory.createConnection();


conn.start();


Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue(QUEUE_NAME);


ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);


mqMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);


TextMessage textMessage = null;


for (int i = 1; i <= 3; i++) {


textMessage = session.createTextMessage("msg--asynsc01-----" + i);


textMessage.setJMSMessageID(UUID.randomUUID().toString().substring(0,3)+"---orderAchang");


String msgID = textMessage.getJMSMessageID();


//new AsyncCallback()设置异步投送的回调函数


mqMessageProducer.send(textMessage, new AsyncCallback() {


@Override


public void onSuccess() {


//发送成功的情况


System.out.println(msgID+"has benn ok send");


}


@Override


public void onException(JMSException exception) {


//发送失败的情况


System.out.println(msgID+"fail to send to mq");


}


});


}


mqMessageProducer.close();


session.close();


conn.close();


System.out.println("========消息发布到 MQ 完成===========");


}


}


控制台观察发送消息的信息:


![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-erwdgyTo-1611823661237)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130728377.png)]](https:/ 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 /img-blog.csdnimg.cn/20210128164820757.png)





[](()2 延迟投递和定时投递




(1) 介绍


官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html




(2) 修改配置文件并重启




</bean>


<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >


<destinationPolicy>


之后重启 activemq




(3) 代码实现


生产者代码:


public class JmsProduce_delayAndschedule {


public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";


public static final String QUEUE_NAME = "asynsc01";


public static void main(String[] args) throws JMSException {


ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);


mqConnectionFactory.setUseAsyncSend(true);//设置异步投递


Connection conn = mqConnectionFactory.createConnection();


conn.start();


Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue(QUEUE_NAME);


ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);


==================================================================================


long delay = 3 * 1000; //延迟 3 秒


long period = 4 * 1000; //四秒钟投递一次


int repeat = 5; //投递次数


TextMessage textMessage = null;


for (int i = 1; i <= 3; i++) {


textMessage = session.createTextMessage("msg--delay--正文内容-----" + i);


//消息属性设置


textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);


textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);


textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);


// 此处的意思:该条消息,等待 3 秒,之后每 4 秒发送一次,重复发送 5 次。


=====================================================================================


mqMessageProducer.send(textMessage);


}


mqMessageProducer.close();


session.close();


conn.close();


System.out.println("========消息发布到 MQ 完成===========");


}


}


消费者代码:与之前一样




[](()3 消息消费的重试机制




(1) 是什么


官网文档:http://activemq.apache.org/redelivery-policy


是什么: 消费者收到消息,之后出现异常了,没有告诉 broker 确认收到该消息,broker 会尝试再将该消息发送给消费者。尝试 n 次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后 broker 不会再将该消息发送给消费者。


(2) 具体哪些情况会引发消息重发


① Client 用了 transactions 且再 session 中调用了 rollback


② Client 用了 transactions 且再调用 commit 之前关闭或者没有 commit


③ Client 再 CLIENT_ACKNOWLEDGE 的传递模式下,session 中调用了 recover


(3) 请说说消息重发时间间隔和重发次数


间隔:1


次数:6


每秒发 6 次


(4) 有毒消息 Poison ACK


一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费的回个 MQ 发一个“poison ack”表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会把这个消息放到 DLQ(死信队列)。


(5) 属性说明



(6) 代码验证


生产者。发送 3 条数据。代码省略…


消费者。开启事务,却没有 commit。重启消费者,前 6 次都能收到消息,到第 7 次,不会再收到消息。代码:


public class Jms_TX_Consumer {


private static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";


private static final String ACTIVEMQ_QUEUE_NAME = "dead01";


public static void main(String[] args) throws JMSException, IOException {


ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);


Connection connection = activeMQConnectionFactory.createConnection();


connection.start();


final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);


MessageConsumer messageConsumer = session.createConsumer(queue);


messageConsumer.setMessageListener(new MessageListener() {


public void onMessage(Message message) {


if (message instanceof TextMessage) {


TextMessage textMessage = (TextMessage) message;


try {


System.out.println("***消费者接收到的消息: " + textMessage.getText());


//session.commit();


}catch (Exception e){


e.printStackTrace();


}


}


}


});


//关闭资源


System.in.read();


messageConsumer.close();


session.close();


connection.close();


}


}


activemq 管理后台。多了一个名为 ActiveMQ.DLQ 队列,里面多了 3 条消息。





(7) 代码修改默认参数


修改重试次数为 3。更多的设置请参考官网文档。


消费者代码


public class Jms_TX_Consumer {


private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";


private static final String ACTIVEMQ_QUEUE_NAME = "dead01";


public static void main(String[] args) throws JMSException, IOException {


ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);


=====================================================================


// 修改默认参数,设置消息消费重试 3 次


RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();


redeliveryPolicy.setMaximumRedeliveries(3);


activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);


=====================================================================


Connection connection = activeMQConnectionFactory.createConnection();


connection.start();


final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);


Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);


MessageConsumer messageConsumer = session.createConsumer(queue);


messageConsumer.setMessageListener(new MessageListener() {

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
Day177_Java_爱好编程进阶_InfoQ写作社区