summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-05 06:41:54 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-05 06:41:54 +0000
commit755ee6ed4196858f266e40b13030048af0c50c6b (patch)
tree12e742dcd69a001b729f412296038fd0ce4449e4
parentf3c08d2ea46e5fbb94e6117f67a874a0e3328fd2 (diff)
downloadqpid-python-755ee6ed4196858f266e40b13030048af0c50c6b.tar.gz
QPID-1646: implemented handlers for producer flow control signals
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@741024 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java46
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java38
3 files changed, 92 insertions, 4 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 276d534b14..993da168a9 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -22,8 +22,9 @@ package org.apache.qpid.transport;
import static org.apache.qpid.transport.Connection.State.OPEN;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -63,11 +64,14 @@ public class ClientDelegate extends ConnectionDelegate
@Override public void connectionStart(Connection conn, ConnectionStart start)
{
+ Map clientProperties = new HashMap();
+ clientProperties.put("qpid.session_flow", 1);
+
List<Object> mechanisms = start.getMechanisms();
if (mechanisms == null || mechanisms.isEmpty())
{
conn.connectionStartOk
- (Collections.EMPTY_MAP, null, null, conn.getLocale());
+ (clientProperties, null, null, conn.getLocale());
return;
}
@@ -86,7 +90,7 @@ public class ClientDelegate extends ConnectionDelegate
byte[] response = sc.hasInitialResponse() ?
sc.evaluateChallenge(new byte[0]) : null;
conn.connectionStartOk
- (Collections.EMPTY_MAP, sc.getMechanismName(), response,
+ (clientProperties, sc.getMechanismName(), response,
conn.getLocale());
}
catch (SaslException e)
@@ -132,7 +136,7 @@ public class ClientDelegate extends ConnectionDelegate
{
throw new UnsupportedOperationException();
}
-
+
/**
* Currently the spec specified the min and max for heartbeat using secs
*/
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 32bb9ca612..951370a124 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -33,6 +33,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import static org.apache.qpid.transport.Option.*;
import static org.apache.qpid.transport.Session.State.*;
@@ -99,6 +101,10 @@ public class Session extends SessionInvoker
private State state = NEW;
+ // transfer flow control
+ private volatile boolean flowControl = false;
+ private Semaphore credit = new Semaphore(0);
+
Session(Connection connection, Binary name, long expiry)
{
this.connection = connection;
@@ -166,6 +172,41 @@ public class Session extends SessionInvoker
}
}
+ void setFlowControl(boolean value)
+ {
+ flowControl = value;
+ }
+
+ void addCredit(int value)
+ {
+ credit.release(value);
+ }
+
+ void drainCredit()
+ {
+ credit.drainPermits();
+ }
+
+ void acquireCredit()
+ {
+ if (flowControl)
+ {
+ try
+ {
+ if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new SessionException
+ ("timed out waiting for message credit");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new SessionException
+ ("interrupted while waiting for credit", null, e);
+ }
+ }
+ }
+
private void initReceiver()
{
synchronized (processedLock)
@@ -428,6 +469,11 @@ public class Session extends SessionInvoker
{
if (m.getEncodedTrack() == Frame.L4)
{
+ if (m.hasPayload())
+ {
+ acquireCredit();
+ }
+
synchronized (commands)
{
if (state != OPEN && state != CLOSED)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index b2c22f22e9..c8d0855607 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -146,4 +146,42 @@ public class SessionDelegate
ssn.getSessionListener().message(ssn, xfr);
}
+ @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm)
+ {
+ if ("".equals(sfm.getDestination()) &&
+ MessageFlowMode.CREDIT.equals(sfm.getFlowMode()))
+ {
+ ssn.setFlowControl(true);
+ }
+ else
+ {
+ super.messageSetFlowMode(ssn, sfm);
+ }
+ }
+
+ @Override public void messageFlow(Session ssn, MessageFlow flow)
+ {
+ if ("".equals(flow.getDestination()) &&
+ MessageCreditUnit.MESSAGE.equals(flow.getUnit()))
+ {
+ ssn.addCredit((int) flow.getValue());
+ }
+ else
+ {
+ super.messageFlow(ssn, flow);
+ }
+ }
+
+ @Override public void messageStop(Session ssn, MessageStop stop)
+ {
+ if ("".equals(stop.getDestination()))
+ {
+ ssn.drainCredit();
+ }
+ else
+ {
+ super.messageStop(ssn, stop);
+ }
+ }
+
}