File tree Expand file tree Collapse file tree 2 files changed +5
-7
lines changed
rsocket-core/src/main/java/io/rsocket Expand file tree Collapse file tree 2 files changed +5
-7
lines changed Original file line number Diff line number Diff line change @@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {
3131
3232 @ Override
3333 public Mono <Void > fireAndForget (Payload payload ) {
34+ payload .release ();
3435 return Mono .error (new UnsupportedOperationException ("Fire and forget not implemented." ));
3536 }
3637
3738 @ Override
3839 public Mono <Payload > requestResponse (Payload payload ) {
40+ payload .release ();
3941 return Mono .error (new UnsupportedOperationException ("Request-Response not implemented." ));
4042 }
4143
4244 @ Override
4345 public Flux <Payload > requestStream (Payload payload ) {
46+ payload .release ();
4447 return Flux .error (new UnsupportedOperationException ("Request-Stream not implemented." ));
4548 }
4649
@@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
5154
5255 @ Override
5356 public Mono <Void > metadataPush (Payload payload ) {
57+ payload .release ();
5458 return Mono .error (new UnsupportedOperationException ("Metadata-Push not implemented." ));
5559 }
5660
Original file line number Diff line number Diff line change @@ -253,7 +253,7 @@ public Mono<RSocket> start() {
253253 }
254254
255255 public static class ServerRSocketFactory {
256- private Supplier < SocketAcceptor > acceptor ;
256+ private SocketAcceptor acceptor ;
257257 private Function <Frame , ? extends Payload > frameDecoder = DefaultPayload ::create ;
258258 private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
259259 private int mtu = 0 ;
@@ -277,11 +277,6 @@ public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
277277 }
278278
279279 public ServerTransportAcceptor acceptor (SocketAcceptor acceptor ) {
280- this .acceptor = () -> acceptor ;
281- return ServerStart ::new ;
282- }
283-
284- public ServerTransportAcceptor acceptor (Supplier <SocketAcceptor > acceptor ) {
285280 this .acceptor = acceptor ;
286281 return ServerStart ::new ;
287282 }
@@ -357,7 +352,6 @@ private Mono<Void> processSetupFrame(
357352 RSocket wrappedRSocketClient = plugins .applyClient (rSocketClient );
358353
359354 return acceptor
360- .get ()
361355 .accept (setupPayload , wrappedRSocketClient )
362356 .doOnNext (
363357 unwrappedServerSocket -> {
You can’t perform that action at this time.
0 commit comments