diff options
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/rubygen/cppgen.rb | 6 | ||||
-rwxr-xr-x | cpp/rubygen/templates/MethodBodyDefaultVisitor.rb | 34 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 88 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 65 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SuspendedSessions.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameDefaultVisitor.h | 14 | ||||
-rw-r--r-- | cpp/src/tests/Session.cpp | 16 |
9 files changed, 255 insertions, 31 deletions
diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb index d72731075f..0063231e84 100755 --- a/cpp/rubygen/cppgen.rb +++ b/cpp/rubygen/cppgen.rb @@ -221,5 +221,11 @@ class CppGen < Generator def public() outdent { genl "public:" } end def private() outdent { genl "private:" } end def protected() outdent { genl "protected:" } end + + # Returns [namespace, classname, filename] + def parse_classname(full_cname) + names=full_cname.split("::") + return names[0..-2].join('::'), names[-1], names.join("/") + end end diff --git a/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb new file mode 100755 index 0000000000..2247c4d8a1 --- /dev/null +++ b/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb @@ -0,0 +1,34 @@ +#!/usr/bin/env ruby +$: << ".." # Include .. in load path +require 'cppgen' + +class MethodBodyDefaultVisitorGen < CppGen + + def initialize(outdir, amqp) + super(outdir, amqp) + @namespace, @classname, @filename = parse_classname("qpid::framing::MethodBodyDefaultVisitor") + end + + def generate() + h_file(@filename) { + include "qpid/framing/MethodBodyConstVisitor" + namespace(@namespace) { + genl + cpp_class(@classname, "public MethodBodyConstVisitor") { + genl "public:" + genl "virtual void defaultVisit() = 0;" + @amqp.methods_.each { |m| + genl "virtual void visit(const #{m.body_name}&);" } + }}} + + cpp_file(@filename) { + include(@filename) + namespace(@namespace) { + @amqp.methods_.each { |m| + genl "void #{@classname}::visit(const #{m.body_name}&) { defaultVisit(); }" + }}} + end +end + +MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); + diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 726bbd12c5..387d4dce91 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -192,9 +192,11 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ qpid/broker/Reference.cpp \ - qpid/broker/Session.h \ + qpid/broker/SessionState.h \ qpid/broker/SuspendedSessions.h \ qpid/broker/SuspendedSessions.cpp \ + qpid/broker/SessionAdapter.h \ + qpid/broker/SessionAdapter.cpp \ qpid/broker/SemanticHandler.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp new file mode 100644 index 0000000000..06f7ca5b27 --- /dev/null +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionAdapter.h" + +namespace qpid { +namespace broker { + +SessionAdapter::SessionAdapter() { + // FIXME aconway 2007-08-27: Implement +} + +void SessionAdapter::visit(const SessionOpenBody&) { + // FIXME aconway 2007-08-27: Implement +} + +void SessionAdapter::visit(const SessionAckBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionAttachedBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionCloseBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionClosedBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionDetachedBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionFlowBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionFlowOkBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionHighWaterMarkBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionResumeBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionSolicitAckBody&) { + // FIXME aconway 2007-08-27: Implement +} + + +void SessionAdapter::visit(const SessionSuspendBody&) { + // FIXME aconway 2007-08-27: Implement +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h new file mode 100644 index 0000000000..9b4a843946 --- /dev/null +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -0,0 +1,65 @@ +#ifndef QPID_BROKER_SESSIONADAPTER_H +#define QPID_BROKER_SESSIONADAPTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameDefaultVisitor.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/broker/SuspendedSessions.h" + +namespace qpid { +namespace broker { + +/** + * Session Handler: Handles frames arriving for a session. + * Implements AMQP session class commands, forwards other traffic + * to the next handler in the chain. + */ +class SessionAdapter : public FrameVisitorHandler + +{ + public: + SessionAdapter(FrameHandler& out, SuspendedSessions&); + ~SessionAdapter(); + + protected: + void visit(const SessionAckBody&); + void visit(const SessionAttachedBody&); + void visit(const SessionCloseBody&); + void visit(const SessionClosedBody&); + void visit(const SessionDetachedBody&); + void visit(const SessionFlowBody&); + void visit(const SessionFlowOkBody&); + void visit(const SessionHighWaterMarkBody&); + void visit(const SessionOpenBody&); + void visit(const SessionResumeBody&); + void visit(const SessionSolicitAckBody&); + void visit(const SessionSuspendBody&); + + private: + SessionState state; + SuspendedSessions& suspended; + Chain next; + FrameHandler& out; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SESSIONADAPTER_H*/ diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 30f071ca55..7558ea7866 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,20 +33,33 @@ namespace broker { class SessionState { public: - /** Create a new, active session. */ - SessionState(uint32_t timeoutSeconds) - : id(true), active(true), timeout(timeoutSeconds) {} + enum State { CLOSED, ACTIVE, SUSPENDED }; + + /** Initially in CLOSED state */ + SessionState() : id(false), state(CLOSED), timeout(0) {} + /** Make CLOSED session ACTIVE, assigns a new UUID. + * #@param timeout in seconds + */ + void open(u_int32_t timeout_) { + state=ACTIVE; id.generate(); timeout=timeout_; + } + + /** Close a session. */ + void close() { state=CLOSED; id.clear(); timeout=0; } + + State getState() const { return state; } const framing::Uuid& getId() const { return id; } uint32_t getTimeout() const { return timeout; } - /** Call SuspendedSessions::resume to re-activate a suspended session. */ - bool isActive() const { return active; } - + bool isOpen() { return state == ACTIVE; } + bool isClosed() { return state == CLOSED; } + bool isSuspended() { return state == SUSPENDED; } + private: friend class SuspendedSessions; framing::Uuid id; - bool active; + State state; uint32_t timeout; }; diff --git a/cpp/src/qpid/broker/SuspendedSessions.cpp b/cpp/src/qpid/broker/SuspendedSessions.cpp index cc90f04809..1cd0710f1e 100644 --- a/cpp/src/qpid/broker/SuspendedSessions.cpp +++ b/cpp/src/qpid/broker/SuspendedSessions.cpp @@ -31,25 +31,27 @@ typedef Mutex::ScopedLock Lock; void SuspendedSessions::suspend(SessionState& s) { Lock l(lock); - assert(s.isActive()); - AbsTime expires(now(), Duration(s.timeout*TIME_SEC)); - suspended.insert(std::make_pair(expires, s)); - s.active = false; + assert(s.state == SessionState::ACTIVE); + if (s.timeout == 0) + s.state = SessionState::CLOSED; + else { + AbsTime expires(now(), Duration(s.timeout*TIME_SEC)); + suspended.insert(std::make_pair(expires, s)); + s.state = SessionState::SUSPENDED; + } } SessionState SuspendedSessions::resume(const Uuid& id) { Lock l(lock); - Map::iterator expired = suspended.lower_bound(now()); - suspended.erase(suspended.begin(), expired); - - Map::iterator resume = std::find_if( - suspended.begin(), suspended.end(), - bind(&SessionState::getId, bind(&Map::value_type::second, _1))==id); - - if (resume == suspended.end()) + Map::iterator notExpired = suspended.lower_bound(now()); + suspended.erase(suspended.begin(), notExpired); + Map::iterator i = suspended.begin(); + while (i != suspended.end() && i->second.getId() != id) + ++i; + if (i == suspended.end()) throw Exception(QPID_MSG("Session timed out or invalid ID: " << id)); - return resume->second; + return i->second; } }} // namespace qpid::broker diff --git a/cpp/src/qpid/framing/FrameDefaultVisitor.h b/cpp/src/qpid/framing/FrameDefaultVisitor.h index 84606b90e7..a66adf28c4 100644 --- a/cpp/src/qpid/framing/FrameDefaultVisitor.h +++ b/cpp/src/qpid/framing/FrameDefaultVisitor.h @@ -23,6 +23,7 @@ #include "qpid/framing/MethodBodyDefaultVisitor.h" #include "qpid/framing/AMQBody.h" +#include "qpid/framing/FrameHandler.h" namespace qpid { namespace framing { @@ -33,8 +34,8 @@ class AMQContentBody; class AMQHeartbeatBody; /** - * Functor to handle a frame by visiting its content. - * Combines AMQBodyConstVisitor and MethodBodyDefaultVisitor. + * Visitor for all concrete frame body types, combines + * AMQBodyConstVisitor and MethodBodyDefaultVisitor. * * Derived classes may override visit methods to specify actions. * Derived classes must override defaultVisit(), which is called @@ -49,6 +50,15 @@ struct FrameDefaultVisitor : public AMQBodyConstVisitor, public MethodBodyDefaul void visit(const AMQMethodBody& method) { method.accept(*this); } }; +/** + * A FrameHandler that is implemented as a visitor. + */ +struct FrameVisitorHandler : public FrameHandler, + protected FrameVisitorHandler +{ + void handle(AMQFrame& f) { f.accept(*this); } +}; + diff --git a/cpp/src/tests/Session.cpp b/cpp/src/tests/Session.cpp index 445f612111..e33ec6e50c 100644 --- a/cpp/src/tests/Session.cpp +++ b/cpp/src/tests/Session.cpp @@ -29,24 +29,28 @@ using namespace qpid::sys; BOOST_AUTO_TEST_CASE(testSuspendedSessions) { SuspendedSessions suspended; - SessionState s(0); // 0 timeout - BOOST_CHECK(s.isActive()); + SessionState s; + BOOST_CHECK_EQUAL(s.getState(), SessionState::CLOSED); + s.open(0); + BOOST_CHECK_EQUAL(s.getState(), SessionState::ACTIVE); + BOOST_CHECK(!s.getId().empty()); suspended.suspend(s); - BOOST_CHECK(!s.isActive()); + BOOST_CHECK(s.getState() == SessionState::CLOSED); try { s = suspended.resume(s.getId()); BOOST_FAIL("Expected session to be timed out."); } catch (...) {} - s = SessionState(1); // New session, 1 sec timeout. + s.close(); + s.open(1); // New session, 1 sec timeout. try { suspended.resume(s.getId()); BOOST_FAIL("Expeced exception: non-existent session."); } catch (...) {} suspended.suspend(s); - BOOST_CHECK(!s.isActive()); + BOOST_CHECK(s.getState() == SessionState::SUSPENDED); s = suspended.resume(s.getId()); - BOOST_CHECK(s.isActive()); + BOOST_CHECK(s.getState() == SessionState::ACTIVE); suspended.suspend(s); // Real timeout sleep(2); |