diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-10-19 21:02:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-10-19 21:02:03 +0000 |
| commit | 26591e1041f2ab3b759529058e433f899212ea37 (patch) | |
| tree | 927891aa187a8a4817c10d7613eaf9d98e64b612 /java | |
| parent | 2590a21c0b3d4b89e9022b005099f0e0c81ec5ee (diff) | |
| download | qpid-python-26591e1041f2ab3b759529058e433f899212ea37.tar.gz | |
QPID-4382 : Wait for subsequent frames to arrive in synchronous receive
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1400287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java | 115 | ||||
| -rw-r--r-- | java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java | 4 |
2 files changed, 14 insertions, 105 deletions
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index e3d56fae09..e501662dbb 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,6 +20,15 @@ */
package org.apache.qpid.amqp_1_0.client;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
public class Connection
{
private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
@@ -224,7 +222,6 @@ public class Connection }
- //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry());
ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
Thread outputThread = new Thread(outputHandler);
outputThread.setDaemon(true);
@@ -236,8 +233,6 @@ public class Connection final ConnectionHandler handler = new ConnectionHandler(_conn);
final InputStream inputStream = s.getInputStream();
- //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn));
-
Thread inputThread = new Thread(new Runnable()
{
@@ -246,7 +241,6 @@ public class Connection try
{
doRead(handler, inputStream);
-// doRead(transport, inputStream);
}
finally
{
@@ -268,85 +262,6 @@ public class Connection inputThread.setDaemon(true);
inputThread.start();
-/*
- Thread outputThread = new Thread(new Runnable()
- {
-
- private int _lastWrite;
-
- public void run()
- {
- try
- {
-// doRead(handler, inputStream);
- final Object lock = new Object();
- transport.setOutputStateChangeListener(new StateChangeListener()
- {
-
- public void onStateChange(final boolean active)
- {
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- synchronized(lock)
- {
- while(transport.isOpenForOutput())
- {
- _lastWrite = 0;
- transport.getNextBytes(new BytesProcessor()
- {
-
- public void processBytes(final ByteBuffer buf)
- {
- _lastWrite = buf.remaining();
- try
- {
- outputStream.write(buf.array(),
- buf.arrayOffset() + buf.position(),
- buf.limit() - buf.position());
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- });
- if(_lastWrite == 0 && transport.isOpenForOutput())
- {
- try
- {
- lock.wait(1000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- s.close();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-*/
-
_conn.open();
}
@@ -394,7 +309,7 @@ public class Connection }
catch (IOException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
@@ -419,7 +334,7 @@ public class Connection {
int read;
boolean done = false;
- while(!done && (read = inputStream.read(buf)) != -1)
+ while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
{
ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
Binary b = new Binary(buf,0,read);
@@ -428,12 +343,6 @@ public class Connection {
RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
}
- /*System.err.println(b);
- System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone());
- if(handler.isDone())
- {
- System.err.println(handler.getClass().getName() + "IS DONE!");
- } */
while(bbuf.hasRemaining() && !handler.isDone())
{
handler.parse(bbuf);
@@ -444,7 +353,7 @@ public class Connection }
catch (IOException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index ad390fd498..581744778e 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler }
if(hasMore)
{
- xfr = receiveFromPrefetch(0L);
+ xfr = receiveFromPrefetch(-1l);
if(xfr== null)
{
// TODO - this is wrong!!!!
@@ -558,4 +558,4 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver);
}
-}
\ No newline at end of file +}
|
