Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions src/main/java/com/github/dockerjava/netty/InvocationBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -75,6 +76,57 @@ public void onNext(Void object) {
}
}

/**
* Implementation of {@link ResultCallback} with the single result event expected.
*/
public static class AsyncResultCallback<A_RES_T>
extends ResultCallbackTemplate<AsyncResultCallback<A_RES_T>, A_RES_T> {

private A_RES_T result = null;

private final CountDownLatch resultReady = new CountDownLatch(1);

@Override
public void onNext(A_RES_T object) {
onResult(object);
}

private void onResult(A_RES_T object) {
if (resultReady.getCount() == 0) {
throw new IllegalStateException("Result has already been set");
}

try {
result = object;
} finally {
resultReady.countDown();
}
}

@Override
public void close() throws IOException {
try {
super.close();
} finally {
resultReady.countDown();
}
}

/**
* Blocks until {@link ResultCallback#onNext(Object)} was called for the first time
*/
@SuppressWarnings("unchecked")
public A_RES_T awaitResult() {
try {
resultReady.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
getFirstError();
return result;
}
}

private ChannelProvider channelProvider;

private String resource;
Expand Down Expand Up @@ -203,7 +255,7 @@ public InputStream post(final Object entity) {

Channel channel = getChannel();

ResponseCallback<InputStream> callback = new ResponseCallback<InputStream>();
AsyncResultCallback<InputStream> callback = new AsyncResultCallback<>();

HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, callback);
HttpResponseStreamHandler streamHandler = new HttpResponseStreamHandler(callback);
Expand Down Expand Up @@ -454,7 +506,7 @@ public InputStream get() {

Channel channel = getChannel();

ResponseCallback<InputStream> resultCallback = new ResponseCallback<InputStream>();
AsyncResultCallback<InputStream> resultCallback = new AsyncResultCallback<>();

HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, resultCallback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.github.dockerjava.api.async.ResultCallback;

Expand All @@ -19,45 +16,87 @@
*/
public class HttpResponseStreamHandler extends SimpleChannelInboundHandler<ByteBuf> {

private HttpResponseInputStream stream = new HttpResponseInputStream();
private ResultCallback<InputStream> resultCallback;

private final HttpResponseInputStream stream = new HttpResponseInputStream();

public HttpResponseStreamHandler(ResultCallback<InputStream> resultCallback) {
resultCallback.onNext(stream);
this.resultCallback = resultCallback;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
invokeCallbackOnFirstRead();

stream.write(msg.copy());
}

private void invokeCallbackOnFirstRead() {
if (resultCallback != null) {
resultCallback.onNext(stream);
resultCallback = null;
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
stream.close();
super.channelReadComplete(ctx);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
stream.writeComplete();

super.channelInactive(ctx);
}

public static class HttpResponseInputStream extends InputStream {

private AtomicBoolean closed = new AtomicBoolean(false);
private boolean writeCompleted = false;

private LinkedTransferQueue<ByteBuf> queue = new LinkedTransferQueue<ByteBuf>();
private boolean closed = false;

private ByteBuf current = null;

public void write(ByteBuf byteBuf) {
queue.put(byteBuf);
private final Object lock = new Object();

public void write(ByteBuf byteBuf) throws InterruptedException {
synchronized (lock) {
if (closed) {
return;
}
while (current != null) {
lock.wait();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This blocks the call to write until the current buffer is read completely, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's right. Or until this HttpResponseInputStream is closed.


if (closed) {
return;
}
}
current = byteBuf;

lock.notifyAll();
}
}

public void writeComplete() {
synchronized (lock) {
writeCompleted = true;

lock.notifyAll();
}
}

@Override
public void close() throws IOException {
closed.set(true);
super.close();
synchronized (lock) {
closed = true;
releaseCurrent();

lock.notifyAll();
}
}

@Override
public int available() throws IOException {
poll();
return readableBytes();
synchronized (lock) {
poll(0);
return readableBytes();
}
}

private int readableBytes() {
Expand All @@ -66,34 +105,72 @@ private int readableBytes() {
} else {
return 0;
}

}

@Override
public int read() throws IOException {
byte[] b = new byte[1];
int n = read(b, 0, 1);
return n != -1 ? b[0] : -1;
}

poll();
@Override
public int read(byte[] b, int off, int len) throws IOException {
synchronized (lock) {
off = poll(off);

if (readableBytes() == 0) {
if (closed.get()) {
if (current == null) {
return -1;
} else {
int availableBytes = Math.min(len, current.readableBytes() - off);
current.readBytes(b, off, availableBytes);
return availableBytes;
}
}
}

if (current != null && current.readableBytes() > 0) {
return current.readByte() & 0xff;
} else {
return read();
private int poll(int off) throws IOException {
synchronized (lock) {
while (readableBytes() <= off) {
try {
if (closed) {
throw new IOException("Stream closed");
}

off -= releaseCurrent();
if (writeCompleted) {
return off;
}
while (current == null) {
lock.wait();

if (closed) {
throw new IOException("Stream closed");
}
if (writeCompleted && current == null) {
return off;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return off;
}
}

private void poll() {
if (readableBytes() == 0) {
try {
current = queue.poll(50, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
private int releaseCurrent() {
synchronized (lock) {
if (current != null) {
int n = current.readableBytes();
current.release();
current = null;

lock.notifyAll();

return n;
}
return 0;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public void afterMethod(ITestResult result) {
@Test
public void saveImage() throws Exception {

InputStream image = IOUtils.toBufferedInputStream(dockerClient.saveImageCmd("busybox").exec());
assertThat(image.available(), greaterThan(0));
try (InputStream image = dockerClient.saveImageCmd("busybox").exec()) {
assertThat(image.available(), greaterThan(0));
}

}

Expand Down
Loading