Description
Before Creating the Bug Report
-
I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
服务端:虚拟机(Red Hat Enterprise Linux Server 7.4 (Maipo))
消费端:windows10
RocketMQ version
rockermq版本:5.3.2
rockermq集群模式:2m-2s-sync(双主双从同步模式)
JDK Version
linxu:
java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
windows10:
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Describe the Bug
1、测试延迟消息消费时,出现 System.currentTimeMillis() < msg.getStoreTimestamp() 的情况(即消费时间早于存储时间),且延时等级为 1 分钟(延时等级 5 对应 1 分钟),但实际消费逻辑正确(1 分钟后消费)
2、时间差异为负数
Steps to Reproduce
1、启动消费者端
`package com.it.wyh.mq.rocketmq.delay;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
-
延迟消息消费者
-
@author yunhui
-
@Date 2025/5/13 20:16
-
@SInCE 1.0
*/
public class Consumer {
public static void main(String[] args) throws Exception {//1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.254.178.102:9876;192.254.178.103:9876"); //3.订阅主题Topic和Tag consumer.subscribe("DelayTopic","*"); //4.注册监听器,设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 接收到消息后,返回一个ConsumeConcurrentlyStatus状态。 * @param msgs * @param context * @return ConsumeConcurrentlyStatus */ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { // Print approximate delay time period // 消费时刻 long currentTimeMillis = System.currentTimeMillis(); System.out.println("Receive message[消息Id=" + msg.getMsgId() + ", 消息内容=" + new String(msg.getBody()) + "] " + (currentTimeMillis - msg.getStoreTimestamp()) + "ms later" + ",消费时刻:" + formatMillis(currentTimeMillis) + ",消费存储时刻:"+ formatMillis(msg.getStoreTimestamp())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者consumer consumer.start(); System.out.printf("Consumer Started.%n");
}
/**
- 将毫秒格式化为时:分:秒.毫秒的形式
- 例如:
- 1234567 毫秒 -> 00:20:34.567
*/
public static String formatMillis(long millis) {
long hours = millis / 3600000;
long minutes = (millis % 3600000) / 60000;
long seconds = (millis % 60000) / 1000;
long ms = millis % 1000;
return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, ms);
}
}
2、启动发送端
package com.it.wyh.mq.rocketmq.delay;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
/**
-
延迟消息生产者
-
注意:延时消息是延迟投递,不是发了后延迟消费
-
@author yunhui
-
@Date 2025/5/13 20:16
-
@SInCE 1.0
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group3");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.254.178.102:9876;192.254.178.103:9876");
//3.启动producer
producer.start();for (int i = 0; i < 10; i++) { try { //4.创建消息对象,指定主题Topic、Tag和消息体 /* 参数一:消息主题Topic 参数二:消息Tag 参数三:消息内容 */ Message msg = new Message("DelayTopic", "Tag1", ("Hello RocketMQ,我是延迟消息" + i).getBytes(StandardCharsets.UTF_8)); // 发送前设置延时等级,延时等级对应消息的延迟时间,等级3是10秒 msg.setDelayTimeLevel(5); //5.发送延时消息 // 发送前时间 long startTimeMillis = System.currentTimeMillis(); SendResult sendResult = producer.send(msg); long endTimeMillis = System.currentTimeMillis(); long elapsedMillis = endTimeMillis - startTimeMillis; System.out.println(sendResult + ",发送前时间: " + formatMillis(startTimeMillis) + ",发送后时间: " + formatMillis(endTimeMillis) + ",时间差异: " + elapsedMillis + "ms"); //避免因为频繁调用导致的报错,模拟休眠 TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } } //6.关闭生产者producer producer.shutdown();
}
/**
- 将毫秒格式化为时:分:秒.毫秒的形式
- 例如:
- 1234567 毫秒 -> 00:20:34.567
*/
public static String formatMillis(long millis) {
long hours = millis / 3600000;
long minutes = (millis % 3600000) / 60000;
long seconds = (millis % 60000) / 1000;
long ms = millis % 1000;
return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, ms);
}
}
3、发送端消息结果
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:866)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1603)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1510)
at org.apache.rocketmq.client.producer.DefaultMQProducer.sendDirect(DefaultMQProducer.java:759)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:471)
at com.it.wyh.mq.rocketmq.delay.Producer.main(Producer.java:43)
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A40610001, offsetMsgId=C0FEB26600002A9F000000000000CDD4, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=1], queueOffset=18, recallHandle=null],发送前时间: 485330:02:28.321,发送后时间: 485330:02:29.411,时间差异: 1090ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A4C740002, offsetMsgId=C0FEB26600002A9F000000000000CF0D, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=2], queueOffset=19, recallHandle=null],发送前时间: 485330:02:31.412,发送后时间: 485330:02:31.419,时间差异: 7ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A544C0003, offsetMsgId=C0FEB26600002A9F000000000000D046, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=3], queueOffset=20, recallHandle=null],发送前时间: 485330:02:33.420,发送后时间: 485330:02:33.439,时间差异: 19ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A5C300004, offsetMsgId=C0FEB26700002A9F000000000000C3AA, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=0], queueOffset=21, recallHandle=null],发送前时间: 485330:02:35.440,发送后时间: 485330:02:35.446,时间差异: 6ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A64070005, offsetMsgId=C0FEB26700002A9F000000000000C4E3, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=1], queueOffset=22, recallHandle=null],发送前时间: 485330:02:37.447,发送后时间: 485330:02:37.450,时间差异: 3ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A6BDA0006, offsetMsgId=C0FEB26700002A9F000000000000C61C, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=2], queueOffset=23, recallHandle=null],发送前时间: 485330:02:39.450,发送后时间: 485330:02:39.453,时间差异: 3ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A73AE0007, offsetMsgId=C0FEB26700002A9F000000000000C755, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=3], queueOffset=24, recallHandle=null],发送前时间: 485330:02:41.454,发送后时间: 485330:02:41.457,时间差异: 3ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A7B810008, offsetMsgId=C0FEB26600002A9F000000000000D17F, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=0], queueOffset=21, recallHandle=null],发送前时间: 485330:02:43.457,发送后时间: 485330:02:43.468,时间差异: 11ms
SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A835D0009, offsetMsgId=C0FEB26600002A9F000000000000D2B8, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=1], queueOffset=22, recallHandle=null],发送前时间: 485330:02:45.469,发送后时间: 485330:02:45.482,时间差异: 13ms
4、消费端消费结果
Consumer Started.
Receive message[消息Id=A9FE8101269418B4AAC2451A40610001, 消息内容=Hello RocketMQ,我是延迟消息1] -767ms later,消费时刻:485330:03:29.505,消费存储时刻:485330:03:30.272
Receive message[消息Id=A9FE8101269418B4AAC2451A4C740002, 消息内容=Hello RocketMQ,我是延迟消息2] -774ms later,消费时刻:485330:03:31.511,消费存储时刻:485330:03:32.285
Receive message[消息Id=A9FE8101269418B4AAC2451A544C0003, 消息内容=Hello RocketMQ,我是延迟消息3] -774ms later,消费时刻:485330:03:33.440,消费存储时刻:485330:03:34.214
Receive message[消息Id=A9FE8101269418B4AAC2451A5C300004, 消息内容=Hello RocketMQ,我是延迟消息4] -747ms later,消费时刻:485330:03:35.521,消费存储时刻:485330:03:36.268
Receive message[消息Id=A9FE8101269418B4AAC2451A64070005, 消息内容=Hello RocketMQ,我是延迟消息5] -779ms later,消费时刻:485330:03:37.511,消费存储时刻:485330:03:38.290
Receive message[消息Id=A9FE8101269418B4AAC2451A6BDA0006, 消息内容=Hello RocketMQ,我是延迟消息6] -779ms later,消费时刻:485330:03:39.527,消费存储时刻:485330:03:40.306
Receive message[消息Id=A9FE8101269418B4AAC2451A73AE0007, 消息内容=Hello RocketMQ,我是延迟消息7] -777ms later,消费时刻:485330:03:41.536,消费存储时刻:485330:03:42.313
Receive message[消息Id=A9FE8101269418B4AAC2451A7B810008, 消息内容=Hello RocketMQ,我是延迟消息8] -769ms later,消费时刻:485330:03:43.520,消费存储时刻:485330:03:44.289
Receive message[消息Id=A9FE8101269418B4AAC2451A835D0009, 消息内容=Hello RocketMQ,我是延迟消息9] -727ms later,消费时刻:485330:03:45.571,消费存储时刻:485330:03:46.298
`
What Did You Expect to See?
1、System.currentTimeMillis()>< msg.getStoreTimestamp()
2、差异为正数
What Did You See Instead?
Additional Context
1、做过linux和windows时钟同步
2、深入分析过原理
3、未做深入debug调试