From 26591e1041f2ab3b759529058e433f899212ea37 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 19 Oct 2012 21:02:03 +0000 Subject: QPID-4382 : Wait for subsequent frames to arrive in synchronous receive git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1400287 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/amqp_1_0/client/Connection.java | 115 +++------------------ .../org/apache/qpid/amqp_1_0/client/Receiver.java | 4 +- 2 files changed, 14 insertions(+), 105 deletions(-) (limited to 'java') diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index e3d56fae09..e501662dbb 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,6 +20,15 @@ */ package org.apache.qpid.amqp_1_0.client; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.net.ssl.SSLSocketFactory; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; @@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.logging.Level; -import java.util.logging.Logger; - public class Connection { private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); @@ -224,7 +222,6 @@ public class Connection } - //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry()); ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); Thread outputThread = new Thread(outputHandler); outputThread.setDaemon(true); @@ -236,8 +233,6 @@ public class Connection final ConnectionHandler handler = new ConnectionHandler(_conn); final InputStream inputStream = s.getInputStream(); - //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn)); - Thread inputThread = new Thread(new Runnable() { @@ -246,7 +241,6 @@ public class Connection try { doRead(handler, inputStream); -// doRead(transport, inputStream); } finally { @@ -268,85 +262,6 @@ public class Connection inputThread.setDaemon(true); inputThread.start(); -/* - Thread outputThread = new Thread(new Runnable() - { - - private int _lastWrite; - - public void run() - { - try - { -// doRead(handler, inputStream); - final Object lock = new Object(); - transport.setOutputStateChangeListener(new StateChangeListener() - { - - public void onStateChange(final boolean active) - { - synchronized (lock) - { - lock.notifyAll(); - } - } - }); - - synchronized(lock) - { - while(transport.isOpenForOutput()) - { - _lastWrite = 0; - transport.getNextBytes(new BytesProcessor() - { - - public void processBytes(final ByteBuffer buf) - { - _lastWrite = buf.remaining(); - try - { - outputStream.write(buf.array(), - buf.arrayOffset() + buf.position(), - buf.limit() - buf.position()); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - }); - if(_lastWrite == 0 && transport.isOpenForOutput()) - { - try - { - lock.wait(1000); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - } - finally - { - if(_conn.closedForInput() && _conn.closedForOutput()) - { - try - { - s.close(); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - }); -*/ - _conn.open(); } @@ -394,7 +309,7 @@ public class Connection } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } @@ -419,7 +334,7 @@ public class Connection { int read; boolean done = false; - while(!done && (read = inputStream.read(buf)) != -1) + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) { ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); Binary b = new Binary(buf,0,read); @@ -428,12 +343,6 @@ public class Connection { RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); } - /*System.err.println(b); - System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone()); - if(handler.isDone()) - { - System.err.println(handler.getClass().getName() + "IS DONE!"); - } */ while(bbuf.hasRemaining() && !handler.isDone()) { handler.parse(bbuf); @@ -444,7 +353,7 @@ public class Connection } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index ad390fd498..581744778e 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler } if(hasMore) { - xfr = receiveFromPrefetch(0L); + xfr = receiveFromPrefetch(-1l); if(xfr== null) { // TODO - this is wrong!!!! @@ -558,4 +558,4 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver); } -} \ No newline at end of file +} -- cgit v1.2.1