summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/cppgen.rb6
-rwxr-xr-xcpp/rubygen/templates/MethodBodyDefaultVisitor.rb34
-rw-r--r--cpp/src/Makefile.am4
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp88
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h65
-rw-r--r--cpp/src/qpid/broker/SessionState.h31
-rw-r--r--cpp/src/qpid/broker/SuspendedSessions.cpp28
-rw-r--r--cpp/src/qpid/framing/FrameDefaultVisitor.h14
-rw-r--r--cpp/src/tests/Session.cpp16
9 files changed, 255 insertions, 31 deletions
diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb
index d72731075f..0063231e84 100755
--- a/cpp/rubygen/cppgen.rb
+++ b/cpp/rubygen/cppgen.rb
@@ -221,5 +221,11 @@ class CppGen < Generator
def public() outdent { genl "public:" } end
def private() outdent { genl "private:" } end
def protected() outdent { genl "protected:" } end
+
+ # Returns [namespace, classname, filename]
+ def parse_classname(full_cname)
+ names=full_cname.split("::")
+ return names[0..-2].join('::'), names[-1], names.join("/")
+ end
end
diff --git a/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb
new file mode 100755
index 0000000000..2247c4d8a1
--- /dev/null
+++ b/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb
@@ -0,0 +1,34 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class MethodBodyDefaultVisitorGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace, @classname, @filename = parse_classname("qpid::framing::MethodBodyDefaultVisitor")
+ end
+
+ def generate()
+ h_file(@filename) {
+ include "qpid/framing/MethodBodyConstVisitor"
+ namespace(@namespace) {
+ genl
+ cpp_class(@classname, "public MethodBodyConstVisitor") {
+ genl "public:"
+ genl "virtual void defaultVisit() = 0;"
+ @amqp.methods_.each { |m|
+ genl "virtual void visit(const #{m.body_name}&);" }
+ }}}
+
+ cpp_file(@filename) {
+ include(@filename)
+ namespace(@namespace) {
+ @amqp.methods_.each { |m|
+ genl "void #{@classname}::visit(const #{m.body_name}&) { defaultVisit(); }"
+ }}}
+ end
+end
+
+MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate();
+
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 726bbd12c5..387d4dce91 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -192,9 +192,11 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
qpid/broker/Reference.cpp \
- qpid/broker/Session.h \
+ qpid/broker/SessionState.h \
qpid/broker/SuspendedSessions.h \
qpid/broker/SuspendedSessions.cpp \
+ qpid/broker/SessionAdapter.h \
+ qpid/broker/SessionAdapter.cpp \
qpid/broker/SemanticHandler.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
new file mode 100644
index 0000000000..06f7ca5b27
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+SessionAdapter::SessionAdapter() {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+void SessionAdapter::visit(const SessionOpenBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+void SessionAdapter::visit(const SessionAckBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionAttachedBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionCloseBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionClosedBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionDetachedBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionFlowBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionFlowOkBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionHighWaterMarkBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionResumeBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionSolicitAckBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+
+void SessionAdapter::visit(const SessionSuspendBody&) {
+ // FIXME aconway 2007-08-27: Implement
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
new file mode 100644
index 0000000000..9b4a843946
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -0,0 +1,65 @@
+#ifndef QPID_BROKER_SESSIONADAPTER_H
+#define QPID_BROKER_SESSIONADAPTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameDefaultVisitor.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/broker/SuspendedSessions.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Session Handler: Handles frames arriving for a session.
+ * Implements AMQP session class commands, forwards other traffic
+ * to the next handler in the chain.
+ */
+class SessionAdapter : public FrameVisitorHandler
+
+{
+ public:
+ SessionAdapter(FrameHandler& out, SuspendedSessions&);
+ ~SessionAdapter();
+
+ protected:
+ void visit(const SessionAckBody&);
+ void visit(const SessionAttachedBody&);
+ void visit(const SessionCloseBody&);
+ void visit(const SessionClosedBody&);
+ void visit(const SessionDetachedBody&);
+ void visit(const SessionFlowBody&);
+ void visit(const SessionFlowOkBody&);
+ void visit(const SessionHighWaterMarkBody&);
+ void visit(const SessionOpenBody&);
+ void visit(const SessionResumeBody&);
+ void visit(const SessionSolicitAckBody&);
+ void visit(const SessionSuspendBody&);
+
+ private:
+ SessionState state;
+ SuspendedSessions& suspended;
+ Chain next;
+ FrameHandler& out;
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SESSIONADAPTER_H*/
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 30f071ca55..7558ea7866 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,20 +33,33 @@ namespace broker {
class SessionState
{
public:
- /** Create a new, active session. */
- SessionState(uint32_t timeoutSeconds)
- : id(true), active(true), timeout(timeoutSeconds) {}
+ enum State { CLOSED, ACTIVE, SUSPENDED };
+
+ /** Initially in CLOSED state */
+ SessionState() : id(false), state(CLOSED), timeout(0) {}
+ /** Make CLOSED session ACTIVE, assigns a new UUID.
+ * #@param timeout in seconds
+ */
+ void open(u_int32_t timeout_) {
+ state=ACTIVE; id.generate(); timeout=timeout_;
+ }
+
+ /** Close a session. */
+ void close() { state=CLOSED; id.clear(); timeout=0; }
+
+ State getState() const { return state; }
const framing::Uuid& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
- /** Call SuspendedSessions::resume to re-activate a suspended session. */
- bool isActive() const { return active; }
-
+ bool isOpen() { return state == ACTIVE; }
+ bool isClosed() { return state == CLOSED; }
+ bool isSuspended() { return state == SUSPENDED; }
+
private:
friend class SuspendedSessions;
framing::Uuid id;
- bool active;
+ State state;
uint32_t timeout;
};
diff --git a/cpp/src/qpid/broker/SuspendedSessions.cpp b/cpp/src/qpid/broker/SuspendedSessions.cpp
index cc90f04809..1cd0710f1e 100644
--- a/cpp/src/qpid/broker/SuspendedSessions.cpp
+++ b/cpp/src/qpid/broker/SuspendedSessions.cpp
@@ -31,25 +31,27 @@ typedef Mutex::ScopedLock Lock;
void SuspendedSessions::suspend(SessionState& s) {
Lock l(lock);
- assert(s.isActive());
- AbsTime expires(now(), Duration(s.timeout*TIME_SEC));
- suspended.insert(std::make_pair(expires, s));
- s.active = false;
+ assert(s.state == SessionState::ACTIVE);
+ if (s.timeout == 0)
+ s.state = SessionState::CLOSED;
+ else {
+ AbsTime expires(now(), Duration(s.timeout*TIME_SEC));
+ suspended.insert(std::make_pair(expires, s));
+ s.state = SessionState::SUSPENDED;
+ }
}
SessionState SuspendedSessions::resume(const Uuid& id)
{
Lock l(lock);
- Map::iterator expired = suspended.lower_bound(now());
- suspended.erase(suspended.begin(), expired);
-
- Map::iterator resume = std::find_if(
- suspended.begin(), suspended.end(),
- bind(&SessionState::getId, bind(&Map::value_type::second, _1))==id);
-
- if (resume == suspended.end())
+ Map::iterator notExpired = suspended.lower_bound(now());
+ suspended.erase(suspended.begin(), notExpired);
+ Map::iterator i = suspended.begin();
+ while (i != suspended.end() && i->second.getId() != id)
+ ++i;
+ if (i == suspended.end())
throw Exception(QPID_MSG("Session timed out or invalid ID: " << id));
- return resume->second;
+ return i->second;
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/framing/FrameDefaultVisitor.h b/cpp/src/qpid/framing/FrameDefaultVisitor.h
index 84606b90e7..a66adf28c4 100644
--- a/cpp/src/qpid/framing/FrameDefaultVisitor.h
+++ b/cpp/src/qpid/framing/FrameDefaultVisitor.h
@@ -23,6 +23,7 @@
#include "qpid/framing/MethodBodyDefaultVisitor.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/FrameHandler.h"
namespace qpid {
namespace framing {
@@ -33,8 +34,8 @@ class AMQContentBody;
class AMQHeartbeatBody;
/**
- * Functor to handle a frame by visiting its content.
- * Combines AMQBodyConstVisitor and MethodBodyDefaultVisitor.
+ * Visitor for all concrete frame body types, combines
+ * AMQBodyConstVisitor and MethodBodyDefaultVisitor.
*
* Derived classes may override visit methods to specify actions.
* Derived classes must override defaultVisit(), which is called
@@ -49,6 +50,15 @@ struct FrameDefaultVisitor : public AMQBodyConstVisitor, public MethodBodyDefaul
void visit(const AMQMethodBody& method) { method.accept(*this); }
};
+/**
+ * A FrameHandler that is implemented as a visitor.
+ */
+struct FrameVisitorHandler : public FrameHandler,
+ protected FrameVisitorHandler
+{
+ void handle(AMQFrame& f) { f.accept(*this); }
+};
+
diff --git a/cpp/src/tests/Session.cpp b/cpp/src/tests/Session.cpp
index 445f612111..e33ec6e50c 100644
--- a/cpp/src/tests/Session.cpp
+++ b/cpp/src/tests/Session.cpp
@@ -29,24 +29,28 @@ using namespace qpid::sys;
BOOST_AUTO_TEST_CASE(testSuspendedSessions) {
SuspendedSessions suspended;
- SessionState s(0); // 0 timeout
- BOOST_CHECK(s.isActive());
+ SessionState s;
+ BOOST_CHECK_EQUAL(s.getState(), SessionState::CLOSED);
+ s.open(0);
+ BOOST_CHECK_EQUAL(s.getState(), SessionState::ACTIVE);
+ BOOST_CHECK(!s.getId().empty());
suspended.suspend(s);
- BOOST_CHECK(!s.isActive());
+ BOOST_CHECK(s.getState() == SessionState::CLOSED);
try {
s = suspended.resume(s.getId());
BOOST_FAIL("Expected session to be timed out.");
} catch (...) {}
- s = SessionState(1); // New session, 1 sec timeout.
+ s.close();
+ s.open(1); // New session, 1 sec timeout.
try {
suspended.resume(s.getId());
BOOST_FAIL("Expeced exception: non-existent session.");
} catch (...) {}
suspended.suspend(s);
- BOOST_CHECK(!s.isActive());
+ BOOST_CHECK(s.getState() == SessionState::SUSPENDED);
s = suspended.resume(s.getId());
- BOOST_CHECK(s.isActive());
+ BOOST_CHECK(s.getState() == SessionState::ACTIVE);
suspended.suspend(s); // Real timeout
sleep(2);