Description
Before Creating the Enhancement Request
- I have confirmed that this should be classified as an enhancement rather than a bug/feature.
Summary
In the proxy's receiveMessage process, adding the message renew task can fail because the client has already disconnected. When that happens, the proxy can call changeInvisibleTime to make the message visible earlier, which reduces the delay before the next consumption retry.
Motivation
Code to reproduce:
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
    // Assumed definition: the original snippet uses dateFormat without declaring it.
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String topic = "topic";
    String consumerGroup = "group";
    String accessKey = "xxx";
    String secretKey = "xxx";
    String endpoints = "127.0.0.1:8080";
    SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .enableSsl(false)
        .setCredentialProvider(sessionCredentialsProvider)
        .build();
    ClientServiceProvider provider = ClientServiceProvider.loadService();
    PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setConsumerGroup(consumerGroup)
        .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
        .setMessageListener(messageView -> {
            System.out.printf("%s [consumer] receive %s attempts=%d%n",
                dateFormat.format(new Date()), messageView.getMessageId(), messageView.getDeliveryAttempt());
            return ConsumeResult.SUCCESS;
        });
    // Start consumer
    AtomicReference<PushConsumer> consumerRef = new AtomicReference<>(pushConsumerBuilder.build());
    System.out.printf("%s [consumer] started %n", dateFormat.format(new Date()));
    // Wait for a while
    TimeUnit.SECONDS.sleep(3);
    new Thread(() -> {
        try {
            // Restart consumer: the close unregisters the client from the proxy
            consumerRef.get().close();
            System.out.printf("%s [consumer] closed %n", dateFormat.format(new Date()));
            consumerRef.set(pushConsumerBuilder.build());
            System.out.printf("%s [consumer] restarted %n", dateFormat.format(new Date()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
    // Wait for a while
    TimeUnit.SECONDS.sleep(1);
    // Then send one message while the consumer is restarting
    Producer producer = provider.newProducerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setTopics(topic)
        .build();
    System.out.printf("%s [producer] started %n", dateFormat.format(new Date()));
    MessageBuilder messageBuilder = provider.newMessageBuilder()
        .setTopic(topic)
        .setBody("BODY".getBytes());
    SendReceipt receipt = producer.send(messageBuilder.build());
    System.out.printf("%s [producer] send %s %n", dateFormat.format(new Date()), receipt.getMessageId());
    // Keep the process alive long enough to observe the delayed redelivery
    TimeUnit.MINUTES.sleep(10);
    producer.close();
    consumerRef.get().close();
}
Proxy NullPointerException logs:
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - clear handle of this client when client unregister. group:group, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780]
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove grpc channel when client unregister. group:group, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780], removed:false
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780]
2025-03-17 10:30:12 ERROR ConsumerProcessorExecutor-9 - internal server error
java.lang.NullPointerException: null
at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.getChannelId(ReceiptHandleGroupKey.java:34)
at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.hashCode(ReceiptHandleGroupKey.java:59)
at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.apache.rocketmq.common.utils.ConcurrentHashMapUtils.computeIfAbsent(ConcurrentHashMapUtils.java:48)
at org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager.addReceiptHandle(DefaultReceiptHandleManager.java:128)
at org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.addReceiptHandle(ReceiptHandleProcessor.java:61)
at org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor.addReceiptHandle(DefaultMessagingProcessor.java:362)
at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:145)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
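One plausible reading of the trace above is that the map key was built after the client's channel had been removed, so that hashCode dereferenced a null channel inside ConcurrentHashMap.get. The following is a minimal, self-contained sketch of that failure mode only. The classes Channel and GroupKey are hypothetical stand-ins and are not the proxy's actual ReceiptHandleGroupKey implementation.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class NullChannelKeySketch {

    /** Hypothetical minimal channel type. */
    static final class Channel {
        private final String id;

        Channel(String id) {
            this.id = id;
        }

        String id() {
            return id;
        }
    }

    /** Hypothetical key whose hashCode dereferences the channel. */
    static final class GroupKey {
        private final Channel channel;
        private final String group;

        GroupKey(Channel channel, String group) {
            this.channel = channel;
            this.group = group;
        }

        String getChannelId() {
            // Throws NullPointerException when the channel is null.
            return channel.id();
        }

        @Override
        public int hashCode() {
            return Objects.hash(getChannelId(), group);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof GroupKey)) {
                return false;
            }
            GroupKey other = (GroupKey) o;
            return Objects.equals(group, other.group)
                && Objects.equals(getChannelId(), other.getChannelId());
        }
    }

    /** Returns true if a map lookup with a key built from this channel throws an NPE. */
    static boolean lookupFails(Channel channel) {
        ConcurrentHashMap<GroupKey, String> map = new ConcurrentHashMap<>();
        try {
            // ConcurrentHashMap.get calls key.hashCode() before anything else.
            map.get(new GroupKey(channel, "group"));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupFails(new Channel("c1"))); // prints "false"
        System.out.println(lookupFails(null));              // prints "true"
    }
}
```

In this sketch the exception surfaces in the map lookup, not where the null channel was obtained, which matches the order of frames in the log.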
Describe the Solution You'd Like
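When adding the renew task fails in this way, the proxy should call changeInvisibleTime so that the message becomes visible earlier and can be redelivered without waiting for the full invisible duration.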
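The proposed behavior could be sketched roughly as follows. This is an illustration under stated assumptions, not the proxy's code: RenewRegistry, InvisibleTimeChanger, registerOrRelease and the EARLY_VISIBLE_MILLIS value are all hypothetical names introduced here, and the real changeInvisibleTime call in the proxy takes different parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReceiveFallbackSketch {

    /** Hypothetical stand-in for the component that registers renew tasks. */
    interface RenewRegistry {
        void addReceiptHandle(String receiptHandle);
    }

    /** Hypothetical stand-in for the call that changes a message's invisible time. */
    interface InvisibleTimeChanger {
        CompletableFuture<Void> changeInvisibleTime(String receiptHandle, long invisibleMillis);
    }

    /** Assumed short invisible duration; the issue does not specify a value. */
    static final long EARLY_VISIBLE_MILLIS = 1_000L;

    /**
     * Tries to register the renew task. If registration fails (for example
     * because the client is gone), shortens the invisible time instead.
     * Returns true if the renew task was registered.
     */
    static boolean registerOrRelease(String receiptHandle,
                                     RenewRegistry registry,
                                     InvisibleTimeChanger changer) {
        try {
            registry.addReceiptHandle(receiptHandle);
            return true;
        } catch (RuntimeException e) {
            // Best effort: a failure here only means the message waits
            // for its original invisible duration to expire.
            changer.changeInvisibleTime(receiptHandle, EARLY_VISIBLE_MILLIS)
                .exceptionally(t -> null);
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        RenewRegistry failing = handle -> {
            throw new NullPointerException("channel is gone");
        };
        InvisibleTimeChanger recorder = (handle, millis) -> {
            released.add(handle + "@" + millis);
            return CompletableFuture.completedFuture(null);
        };

        boolean registered = registerOrRelease("handle-1", failing, recorder);
        System.out.println(registered + " " + released); // prints "false [handle-1@1000]"
    }
}
```

The fallback is deliberately best effort: if the changeInvisibleTime call itself fails, the message still becomes visible once the original invisible duration elapses, which is the current behavior.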
Describe Alternatives You've Considered
None.
Additional Context
No response