[Enhancement] Make the message visible earlier when adding renew task fails due to client disconnection #9253

Closed
@redlsz

Before Creating the Enhancement Request

  • I have confirmed that this should be classified as an enhancement rather than a bug/feature.

Summary

In the proxy's receiveMessage process, when adding the message renew task fails because the client has disconnected, the proxy can make the message visible earlier by calling changeInvisibleTime. This reduces the delay before the next consumption retry.

Motivation

Code to reproduce:

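public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        // Timestamp formatter used by the log lines below. It is not declared in the
        // original snippet; a java.text.SimpleDateFormat is assumed here.
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String topic = "topic";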
        String consumerGroup = "group";
        String accessKey = "xxx";
        String secretKey = "xxx";
        String endpoints = "127.0.0.1:8080";

        SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            .setSubscriptionExpressions(Collections.singletonMap(topic, new FilterExpression("*", FilterExpressionType.TAG)))
            .setMessageListener(messageView -> {
                System.out.printf("%s [consumer] receive %s attempts=%d%n",
                    dateFormat.format(new Date()), messageView.getMessageId(), messageView.getDeliveryAttempt());
                return ConsumeResult.SUCCESS;
            });

        // Start consumer
        AtomicReference<PushConsumer> consumerRef = new AtomicReference<>(pushConsumerBuilder.build());
        System.out.printf("%s [consumer] started %n", dateFormat.format(new Date()));

        // Wait for a while
        TimeUnit.SECONDS.sleep(3);

        new Thread(() -> {
            try {
                // Restart consumer
                consumerRef.get().close();
                System.out.printf("%s [consumer] closed %n", dateFormat.format(new Date()));
                consumerRef.set(pushConsumerBuilder.build());
                System.out.printf("%s [consumer] restarted %n", dateFormat.format(new Date()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // Wait for a while
        TimeUnit.SECONDS.sleep(1);

        // Then send one message
        Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setTopics(topic)
            .build();
        System.out.printf("%s [producer] started %n", dateFormat.format(new Date()));
        MessageBuilder messageBuilder = provider.newMessageBuilder()
            .setTopic(topic)
            .setBody("BODY".getBytes());
        SendReceipt receipt = producer.send(messageBuilder.build());
        System.out.printf("%s [producer] send %s %n", dateFormat.format(new Date()), receipt.getMessageId());

        TimeUnit.MINUTES.sleep(10);
        producer.close();
        consumerRef.get().close();
    }

Output: [image: console output of the reproduce code, not preserved]

Proxy NullPointerException logs:

2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - clear handle of this client when client unregister. group:group, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780]
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove grpc channel when client unregister. group:group, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780], removed:false
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081}, clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413, lastUpdateTimestamp=1742178611780]
2025-03-17 10:30:12 ERROR ConsumerProcessorExecutor-9 - internal server error
java.lang.NullPointerException: null
    at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.getChannelId(ReceiptHandleGroupKey.java:34)
    at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.hashCode(ReceiptHandleGroupKey.java:59)
    at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
    at org.apache.rocketmq.common.utils.ConcurrentHashMapUtils.computeIfAbsent(ConcurrentHashMapUtils.java:48)
    at org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager.addReceiptHandle(DefaultReceiptHandleManager.java:128)
    at org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.addReceiptHandle(ReceiptHandleProcessor.java:61)
    at org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor.addReceiptHandle(DefaultMessagingProcessor.java:362)
    at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:145)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
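
The trace shows the exception surfacing inside the key's hashCode, which ConcurrentHashMap.get invokes before the lookup. The sketch below reproduces that failure mode in isolation. It assumes the channel reference held by the key is null once the client has unregistered; this is an inference from the log order, not something the trace confirms. GroupKey and Channel are hypothetical stand-ins, not the RocketMQ classes.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class NullChannelKeyDemo {
    // Hypothetical stand-in for a client channel.
    static final class Channel {
        private final String id;
        Channel(String id) { this.id = id; }
        String id() { return id; }
    }

    // Hypothetical map key that derives its hash from the channel id.
    static final class GroupKey {
        private final Channel channel;
        private final String group;
        GroupKey(Channel channel, String group) {
            this.channel = channel;
            this.group = group;
        }
        // Throws NullPointerException when channel is null.
        String getChannelId() { return channel.id(); }
        @Override public int hashCode() { return Objects.hash(getChannelId(), group); }
        @Override public boolean equals(Object o) {
            if (!(o instanceof GroupKey)) return false;
            GroupKey other = (GroupKey) o;
            return Objects.equals(getChannelId(), other.getChannelId())
                && group.equals(other.group);
        }
    }

    // Returns true if looking up the key fails with a NullPointerException.
    static boolean lookupThrowsNpe(GroupKey key) {
        ConcurrentHashMap<GroupKey, String> map = new ConcurrentHashMap<>();
        try {
            map.get(key); // get() calls key.hashCode() before searching
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupThrowsNpe(new GroupKey(new Channel("c1"), "group"))); // false
        System.out.println(lookupThrowsNpe(new GroupKey(null, "group")));              // true
    }
}
```

Under that assumption, the lookup fails before any renew task is registered, so nothing extends or shortens the message's invisible time on the proxy side.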

Describe the Solution You'd Like

When adding the renew task fails in this way, call changeInvisibleTime so that the message becomes visible earlier.
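
A minimal sketch of the proposed fallback, to make the control flow concrete. It is not the proxy's actual code: RenewRegistry, InvisibleTimeChanger, registerOrMakeVisible, and the one-second EARLY_VISIBLE value are all hypothetical names and values chosen for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class EarlyVisibleSketch {
    // Hypothetical abstraction over registering a renew task for a receipt handle.
    interface RenewRegistry {
        void addReceiptHandle(String receiptHandle) throws Exception;
    }

    // Hypothetical abstraction over the changeInvisibleTime call.
    interface InvisibleTimeChanger {
        void changeInvisibleTime(String receiptHandle, Duration invisibleTime);
    }

    // Assumed short delay after which the message should become visible again.
    static final Duration EARLY_VISIBLE = Duration.ofSeconds(1);

    // Tries to register the renew task. If registration fails, shortens the
    // invisible time so the message can be redelivered sooner.
    // Returns true if the renew task was registered.
    static boolean registerOrMakeVisible(String receiptHandle,
                                         RenewRegistry registry,
                                         InvisibleTimeChanger changer) {
        try {
            registry.addReceiptHandle(receiptHandle);
            return true;
        } catch (Exception e) {
            changer.changeInvisibleTime(receiptHandle, EARLY_VISIBLE);
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> changed = new ArrayList<>();
        // Simulates the failure seen when the client channel is already gone.
        RenewRegistry failing = h -> {
            throw new NullPointerException("client channel is gone");
        };
        boolean registered = registerOrMakeVisible(
            "handle-1", failing, (h, d) -> changed.add(h + "@" + d.toMillis()));
        System.out.println(registered + " " + changed); // false [handle-1@1000]
    }
}
```

Catching the failure at the point of registration keeps the fallback local to the receive path. Whether a fixed short delay or an immediate visibility change is preferable is a design choice this sketch does not settle.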

Describe Alternatives You've Considered

/

Additional Context

No response
