Skip to content

Commit a83a5be

Browse files
lhotarionobc
andauthored
Update Pulsar to 3.3.2 (#189)
* Update Pulsar to 3.3.2 * Fix compilation in test due to Pulsar 3.3.2 update * Replace order of `given/willAnswer` for spied objects The cause of the test hang was that the test was incorrectly setting up the spy on the type message builder impl. In previous Pulsar version of TypedMessageBuilderImpl, the fact that the method sendAsync was being called at mock setup time was not causing an issue. However, in the latest impl it did not like that and was throwing things off. Spied objects should always use the `doReturn|Answer|Throw()` family as described in https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#important-gotcha-on-spying-real-objects--heading --------- Co-authored-by: onobc <[email protected]>
1 parent 8d58f8a commit a83a5be

File tree

5 files changed

+14
-13
lines changed

5 files changed

+14
-13
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ junit-jupiter = "5.11.0-RC1"
2929
licenser = "0.6.1"
3030
log4j = "2.23.1"
3131
mockito = "5.12.0"
32-
pulsar = "3.3.1"
32+
pulsar = "3.3.2"
3333
rat-gradle = "0.8.0"
3434
reactor = "3.6.9"
3535
slf4j = "2.0.16"

pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ static PulsarClient createPulsarClient() throws PulsarClientException {
4545
}
4646

4747
static DockerImageName getPulsarImage() {
48-
return DockerImageName.parse("apachepulsar/pulsar:3.3.1");
48+
return DockerImageName.parse("apachepulsar/pulsar:3.3.2");
4949
}
5050

5151
}

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ void consumerProperties() throws Exception {
7878
PulsarClientImpl pulsarClient = spy(
7979
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
8080
doReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())).when(pulsarClient)
81-
.getPartitionedTopicMetadata(anyString(), anyBoolean());
81+
.getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean());
8282

8383
Consumer<String> consumer = mock(Consumer.class);
8484
doReturn(CompletableFuture.completedFuture(null)).when(consumer).closeAsync();

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import static org.mockito.ArgumentMatchers.eq;
7878
import static org.mockito.ArgumentMatchers.isNull;
7979
import static org.mockito.BDDMockito.given;
80+
import static org.mockito.BDDMockito.willAnswer;
8081
import static org.mockito.Mockito.doReturn;
8182
import static org.mockito.Mockito.mock;
8283
import static org.mockito.Mockito.spy;
@@ -195,11 +196,11 @@ void sendOnePulsarException() throws Exception {
195196
given(producer.newMessage()).willAnswer((__) -> {
196197
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
197198
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
198-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
199+
willAnswer((___) -> {
199200
CompletableFuture<MessageId> failed = new CompletableFuture<>();
200201
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
201202
return failed;
202-
});
203+
}).given(typedMessageBuilder).sendAsync();
203204
return typedMessageBuilder;
204205
});
205206

@@ -231,7 +232,7 @@ void sendManyStopOnError() throws Exception {
231232
given(producer.newMessage()).willAnswer((__) -> {
232233
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
233234
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
234-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
235+
willAnswer((___) -> {
235236
if (entryId.get() == 1) {
236237
CompletableFuture<MessageId> failed = new CompletableFuture<>();
237238
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
@@ -241,7 +242,7 @@ void sendManyStopOnError() throws Exception {
241242
.newMessageId(1, entryId.incrementAndGet(), 1);
242243
messageIds.add(messageId);
243244
return CompletableFuture.completedFuture(messageId);
244-
});
245+
}).given(typedMessageBuilder).sendAsync();
245246
return typedMessageBuilder;
246247
});
247248

@@ -279,7 +280,7 @@ void sendMany() throws Exception {
279280
given(producer.newMessage()).willAnswer((__) -> {
280281
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
281282
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
282-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
283+
willAnswer((___) -> {
283284
if (entryId.get() == 2) {
284285
CompletableFuture<MessageId> failed = new CompletableFuture<>();
285286
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
@@ -289,7 +290,7 @@ void sendMany() throws Exception {
289290
.newMessageId(1, entryId.incrementAndGet(), 1);
290291
messageIds.add(messageId);
291292
return CompletableFuture.completedFuture(messageId);
292-
});
293+
}).given(typedMessageBuilder).sendAsync();
293294
return typedMessageBuilder;
294295
});
295296

@@ -498,7 +499,7 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
498499
given(producer.newMessage()).willAnswer((__) -> {
499500
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
500501
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
501-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
502+
willAnswer((___) -> {
502503
CompletableFuture<MessageId> messageSender = new CompletableFuture<>();
503504
finalExecutorService.execute(() -> {
504505
long current = totalRequests.incrementAndGet();
@@ -512,7 +513,7 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
512513
DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1));
513514
}, 100, TimeUnit.MILLISECONDS);
514515
return messageSender;
515-
});
516+
}).given(typedMessageBuilder).sendAsync();
516517
return typedMessageBuilder;
517518
});
518519

scripts/validate_staging_repo.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ if ! command -v gradle &>/dev/null; then
4040
fi
4141

4242
DOCKER_CONTAINER_NAME=pulsar-standalone-$$
43-
: ${DOCKER_IMAGE_NAME:=apachepulsar/pulsar:3.3.1}
43+
: ${DOCKER_IMAGE_NAME:=apachepulsar/pulsar:3.3.2}
4444

4545
mkdir test-app-reactive-$$
4646
cd test-app-reactive-$$
@@ -90,7 +90,7 @@ public class HelloPulsarClientReactive {
9090
9191
public static void main(String[] args) throws PulsarClientException, InterruptedException {
9292
// Before running this, start Pulsar within docker with this command:
93-
// docker run -it -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:3.3.1 /pulsar/bin/pulsar standalone -nss -nfw
93+
// docker run -it -p 8080:8080 -p 6650:6650 apachepulsar/pulsar:3.3.2 /pulsar/bin/pulsar standalone -nss -nfw
9494
9595
try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
9696

0 commit comments

Comments
 (0)