diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-02-05 06:41:54 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-02-05 06:41:54 +0000 |
commit | 755ee6ed4196858f266e40b13030048af0c50c6b (patch) | |
tree | 12e742dcd69a001b729f412296038fd0ce4449e4 | |
parent | f3c08d2ea46e5fbb94e6117f67a874a0e3328fd2 (diff) | |
download | qpid-python-755ee6ed4196858f266e40b13030048af0c50c6b.tar.gz |
QPID-1646: implemented handlers for producer flow control signals
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@741024 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 92 insertions, 4 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 276d534b14..993da168a9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -22,8 +22,9 @@ package org.apache.qpid.transport; import static org.apache.qpid.transport.Connection.State.OPEN; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -63,11 +64,14 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionStart(Connection conn, ConnectionStart start) { + Map clientProperties = new HashMap(); + clientProperties.put("qpid.session_flow", 1); + List<Object> mechanisms = start.getMechanisms(); if (mechanisms == null || mechanisms.isEmpty()) { conn.connectionStartOk - (Collections.EMPTY_MAP, null, null, conn.getLocale()); + (clientProperties, null, null, conn.getLocale()); return; } @@ -86,7 +90,7 @@ public class ClientDelegate extends ConnectionDelegate byte[] response = sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null; conn.connectionStartOk - (Collections.EMPTY_MAP, sc.getMechanismName(), response, + (clientProperties, sc.getMechanismName(), response, conn.getLocale()); } catch (SaslException e) @@ -132,7 +136,7 @@ public class ClientDelegate extends ConnectionDelegate { throw new UnsupportedOperationException(); } - + /** * Currently the spec specified the min and max for heartbeat using secs */ diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 32bb9ca612..951370a124 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -33,6 +33,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static org.apache.qpid.transport.Option.*; import static org.apache.qpid.transport.Session.State.*; @@ -99,6 +101,10 @@ public class Session extends SessionInvoker private State state = NEW; + // transfer flow control + private volatile boolean flowControl = false; + private Semaphore credit = new Semaphore(0); + Session(Connection connection, Binary name, long expiry) { this.connection = connection; @@ -166,6 +172,41 @@ public class Session extends SessionInvoker } } + void setFlowControl(boolean value) + { + flowControl = value; + } + + void addCredit(int value) + { + credit.release(value); + } + + void drainCredit() + { + credit.drainPermits(); + } + + void acquireCredit() + { + if (flowControl) + { + try + { + if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS)) + { + throw new SessionException + ("timed out waiting for message credit"); + } + } + catch (InterruptedException e) + { + throw new SessionException + ("interrupted while waiting for credit", null, e); + } + } + } + private void initReceiver() { synchronized (processedLock) @@ -428,6 +469,11 @@ public class Session extends SessionInvoker { if (m.getEncodedTrack() == Frame.L4) { + if (m.hasPayload()) + { + acquireCredit(); + } + synchronized (commands) { if (state != OPEN && state != CLOSED) diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index b2c22f22e9..c8d0855607 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -146,4 +146,42 @@ public class SessionDelegate ssn.getSessionListener().message(ssn, xfr); } + @Override public void messageSetFlowMode(Session ssn, MessageSetFlowMode sfm) + { + if ("".equals(sfm.getDestination()) && + MessageFlowMode.CREDIT.equals(sfm.getFlowMode())) + { + ssn.setFlowControl(true); + } + else + { + super.messageSetFlowMode(ssn, sfm); + } + } + + @Override public void messageFlow(Session ssn, MessageFlow flow) + { + if ("".equals(flow.getDestination()) && + MessageCreditUnit.MESSAGE.equals(flow.getUnit())) + { + ssn.addCredit((int) flow.getValue()); + } + else + { + super.messageFlow(ssn, flow); + } + } + + @Override public void messageStop(Session ssn, MessageStop stop) + { + if ("".equals(stop.getDestination())) + { + ssn.drainCredit(); + } + else + { + super.messageStop(ssn, stop); + } + } + } |