Skip to content

延时消息消费时出现 System.currentTimeMillis() < msg.getStoreTimestamp() 的情况(即消费时间早于存储时间) #9406

Open
@iSourceyou

Description

@iSourceyou

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

服务端:虚拟机(Red Hat Enterprise Linux Server 7.4 (Maipo))
消费端:windows10

RocketMQ version

rockermq版本:5.3.2
rockermq集群模式:2m-2s-sync(双主双从同步模式)

序号 | IP | 角色 | 架构模式 -- | -- | -- | -- 1 | 192.254.178.102 | nameserver、brokerserver | Master1、Slave2 2 | 192.254.178.103 | nameserver、brokerserver | Master2、Slave1

JDK Version

linxu:
java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
windows10:
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

Describe the Bug

1、测试延迟消息消费时,出现 System.currentTimeMillis() < msg.getStoreTimestamp() 的情况(即消费时间早于存储时间),且延时等级为 1 分钟(延时等级 5 对应 1 分钟),但实际消费逻辑正确(1 分钟后消费)
2、时间差异为负数

Steps to Reproduce

1、启动消费者端
`package com.it.wyh.mq.rocketmq.delay;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**

  • 延迟消息消费者

  • @author yunhui

  • @Date 2025/5/13 20:16

  • @SInCE 1.0
    */
    public class Consumer {
    public static void main(String[] args) throws Exception {

     //1.创建消费者Consumer,制定消费者组名
     DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3");
     //2.指定Nameserver地址
     consumer.setNamesrvAddr("192.254.178.102:9876;192.254.178.103:9876");
     //3.订阅主题Topic和Tag
     consumer.subscribe("DelayTopic","*");
    
     //4.注册监听器,设置回调函数,处理消息
     consumer.registerMessageListener(new MessageListenerConcurrently() {
         /**
          * 接收到消息后,返回一个ConsumeConcurrentlyStatus状态。
          * @param msgs
          * @param context
          * @return ConsumeConcurrentlyStatus
          */
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
             for (MessageExt msg : msgs) {
                 // Print approximate delay time period
                 // 消费时刻
                 long currentTimeMillis = System.currentTimeMillis();
                 System.out.println("Receive message[消息Id=" + msg.getMsgId() + ", 消息内容=" + new String(msg.getBody()) + "] " + (currentTimeMillis - msg.getStoreTimestamp()) + "ms later" + ",消费时刻:" + formatMillis(currentTimeMillis) + ",消费存储时刻:"+ formatMillis(msg.getStoreTimestamp()));
             }
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
     });
    
     //5.启动消费者consumer
     consumer.start();
    
     System.out.printf("Consumer Started.%n");
    

    }

    /**

    • 将毫秒格式化为时:分:秒.毫秒的形式
    • 例如:
    • 1234567 毫秒 -> 00:20:34.567
      */
      public static String formatMillis(long millis) {
      long hours = millis / 3600000;
      long minutes = (millis % 3600000) / 60000;
      long seconds = (millis % 60000) / 1000;
      long ms = millis % 1000;
     return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, ms);
    

    }
    }
    2、启动发送端package com.it.wyh.mq.rocketmq.delay;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/**

  • 延迟消息生产者

  • 注意:延时消息是延迟投递,不是发了后延迟消费

  • @author yunhui

  • @Date 2025/5/13 20:16

  • @SInCE 1.0
    */
    public class Producer {
    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer("group3");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.254.178.102:9876;192.254.178.103:9876");
    //3.启动producer
    producer.start();

     for (int i = 0; i < 10; i++) {
         try {
             //4.创建消息对象,指定主题Topic、Tag和消息体
             /*
             参数一:消息主题Topic
             参数二:消息Tag
             参数三:消息内容
              */
             Message msg = new Message("DelayTopic", "Tag1", ("Hello RocketMQ,我是延迟消息" + i).getBytes(StandardCharsets.UTF_8));
    
             // 发送前设置延时等级,延时等级对应消息的延迟时间,等级3是10秒
             msg.setDelayTimeLevel(5);
    
             //5.发送延时消息
             // 发送前时间
             long startTimeMillis = System.currentTimeMillis();
             SendResult sendResult = producer.send(msg);
             long endTimeMillis  = System.currentTimeMillis();
             long elapsedMillis = endTimeMillis - startTimeMillis;
             System.out.println(sendResult + ",发送前时间: " + formatMillis(startTimeMillis) + ",发送后时间: " + formatMillis(endTimeMillis) + ",时间差异: " + elapsedMillis + "ms");
    
             //避免因为频繁调用导致的报错,模拟休眠
             TimeUnit.SECONDS.sleep(2);
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
    
     //6.关闭生产者producer
     producer.shutdown();
    

    }

    /**

    • 将毫秒格式化为时:分:秒.毫秒的形式
    • 例如:
    • 1234567 毫秒 -> 00:20:34.567
      */
      public static String formatMillis(long millis) {
      long hours = millis / 3600000;
      long minutes = (millis % 3600000) / 60000;
      long seconds = (millis % 60000) / 1000;
      long ms = millis % 1000;
     return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, ms);
    

    }
    }
    3、发送端消息结果org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:866)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1603)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1510)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.sendDirect(DefaultMQProducer.java:759)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:471)
    at com.it.wyh.mq.rocketmq.delay.Producer.main(Producer.java:43)
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A40610001, offsetMsgId=C0FEB26600002A9F000000000000CDD4, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=1], queueOffset=18, recallHandle=null],发送前时间: 485330:02:28.321,发送后时间: 485330:02:29.411,时间差异: 1090ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A4C740002, offsetMsgId=C0FEB26600002A9F000000000000CF0D, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=2], queueOffset=19, recallHandle=null],发送前时间: 485330:02:31.412,发送后时间: 485330:02:31.419,时间差异: 7ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A544C0003, offsetMsgId=C0FEB26600002A9F000000000000D046, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=3], queueOffset=20, recallHandle=null],发送前时间: 485330:02:33.420,发送后时间: 485330:02:33.439,时间差异: 19ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A5C300004, offsetMsgId=C0FEB26700002A9F000000000000C3AA, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=0], queueOffset=21, recallHandle=null],发送前时间: 485330:02:35.440,发送后时间: 485330:02:35.446,时间差异: 6ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A64070005, offsetMsgId=C0FEB26700002A9F000000000000C4E3, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=1], queueOffset=22, recallHandle=null],发送前时间: 485330:02:37.447,发送后时间: 485330:02:37.450,时间差异: 3ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A6BDA0006, offsetMsgId=C0FEB26700002A9F000000000000C61C, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=2], queueOffset=23, recallHandle=null],发送前时间: 485330:02:39.450,发送后时间: 485330:02:39.453,时间差异: 3ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A73AE0007, offsetMsgId=C0FEB26700002A9F000000000000C755, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-b, queueId=3], queueOffset=24, recallHandle=null],发送前时间: 485330:02:41.454,发送后时间: 485330:02:41.457,时间差异: 3ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A7B810008, offsetMsgId=C0FEB26600002A9F000000000000D17F, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=0], queueOffset=21, recallHandle=null],发送前时间: 485330:02:43.457,发送后时间: 485330:02:43.468,时间差异: 11ms
    SendResult [sendStatus=SEND_OK, msgId=A9FE8101269418B4AAC2451A835D0009, offsetMsgId=C0FEB26600002A9F000000000000D2B8, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=1], queueOffset=22, recallHandle=null],发送前时间: 485330:02:45.469,发送后时间: 485330:02:45.482,时间差异: 13ms
    4、消费端消费结果Consumer Started.
    Receive message[消息Id=A9FE8101269418B4AAC2451A40610001, 消息内容=Hello RocketMQ,我是延迟消息1] -767ms later,消费时刻:485330:03:29.505,消费存储时刻:485330:03:30.272
    Receive message[消息Id=A9FE8101269418B4AAC2451A4C740002, 消息内容=Hello RocketMQ,我是延迟消息2] -774ms later,消费时刻:485330:03:31.511,消费存储时刻:485330:03:32.285
    Receive message[消息Id=A9FE8101269418B4AAC2451A544C0003, 消息内容=Hello RocketMQ,我是延迟消息3] -774ms later,消费时刻:485330:03:33.440,消费存储时刻:485330:03:34.214
    Receive message[消息Id=A9FE8101269418B4AAC2451A5C300004, 消息内容=Hello RocketMQ,我是延迟消息4] -747ms later,消费时刻:485330:03:35.521,消费存储时刻:485330:03:36.268
    Receive message[消息Id=A9FE8101269418B4AAC2451A64070005, 消息内容=Hello RocketMQ,我是延迟消息5] -779ms later,消费时刻:485330:03:37.511,消费存储时刻:485330:03:38.290
    Receive message[消息Id=A9FE8101269418B4AAC2451A6BDA0006, 消息内容=Hello RocketMQ,我是延迟消息6] -779ms later,消费时刻:485330:03:39.527,消费存储时刻:485330:03:40.306
    Receive message[消息Id=A9FE8101269418B4AAC2451A73AE0007, 消息内容=Hello RocketMQ,我是延迟消息7] -777ms later,消费时刻:485330:03:41.536,消费存储时刻:485330:03:42.313
    Receive message[消息Id=A9FE8101269418B4AAC2451A7B810008, 消息内容=Hello RocketMQ,我是延迟消息8] -769ms later,消费时刻:485330:03:43.520,消费存储时刻:485330:03:44.289
    Receive message[消息Id=A9FE8101269418B4AAC2451A835D0009, 消息内容=Hello RocketMQ,我是延迟消息9] -727ms later,消费时刻:485330:03:45.571,消费存储时刻:485330:03:46.298
    `

What Did You Expect to See?

1、System.currentTimeMillis()>< msg.getStoreTimestamp()
2、差异为正数

What Did You See Instead?

Image

Additional Context

1、做过linux和windows时钟同步
2、深入分析过原理
3、未做深入debug调试

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions