diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
| commit | 0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e (patch) | |
| tree | 8fe7333962fbea735455340424657a540c6ef9a9 /qpid/cpp | |
| parent | c8ad468141a96e5fdf4534552fe72e84399d5d5d (diff) | |
| download | qpid-python-0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e.tar.gz | |
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema.
The preview codepath (99-0) remains unaltered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
30 files changed, 1277 insertions, 198 deletions
diff --git a/qpid/cpp/rubygen/99-0/Proxy.rb b/qpid/cpp/rubygen/99-0/Proxy.rb index 2829884673..85db52da5b 100755 --- a/qpid/cpp/rubygen/99-0/Proxy.rb +++ b/qpid/cpp/rubygen/99-0/Proxy.rb @@ -41,6 +41,7 @@ EOS # .h file h_file(@filename) { include "qpid/framing/Proxy.h" + include "qpid/framing/Array.h" include "qpid/framing/amqp_types.h" namespace("qpid::framing") { cpp_class(@classname, "public Proxy") { diff --git a/qpid/cpp/rubygen/99-0/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb index e01a28a62d..5a6f061937 100644 --- a/qpid/cpp/rubygen/99-0/Session.rb +++ b/qpid/cpp/rubygen/99-0/Session.rb @@ -6,7 +6,7 @@ require 'cppgen' class CppGen def session_methods - excludes = ["channel", "connection", "session", "execution"] + excludes = ["channel", "connection", "session", "execution", "connection010", "session010"] gen_methods=@amqp.methods_on(@chassis).reject { |m| excludes.include? m.parent.name } diff --git a/qpid/cpp/rubygen/99-0/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb index 336591be00..4052befa2b 100644 --- a/qpid/cpp/rubygen/99-0/structs.rb +++ b/qpid/cpp/rubygen/99-0/structs.rb @@ -17,6 +17,7 @@ class StructGen < CppGen "longlong"=>"LongLong", "longstr"=>"LongString", "shortstr"=>"ShortString", + "mediumstr"=>"MediumString", "timestamp"=>"LongLong", "table"=>"FieldTable", "content"=>"Content", @@ -33,7 +34,8 @@ class StructGen < CppGen ValueTypes=["octet", "short", "long", "longlong", "timestamp"] def is_packed(s) - s.kind_of? AmqpStruct + #return true + s.kind_of?(AmqpStruct) or s.body_name.include?("010") end def execution_header?(s) @@ -182,12 +184,21 @@ class StructGen < CppGen end end + def all_fields_via_accessors(s) + s.fields.collect { |f| "get#{f.name.caps}()" }.join(", ") + end + def methodbody_extra_defs(s) + if (s.parent.control?) + genl "virtual uint8_t type() const { return 0;/*control segment*/ }" + end + + gen <<EOS typedef #{s.result ? s.result.struct.cpptype.name : 'void'} ResultType; template <class T> ResultType invoke(T& invocable) const { - return invocable.#{s.cppname}(#{s.param_names.join ", "}); + return invocable.#{s.cppname}(#{all_fields_via_accessors(s)}); } using AMQMethodBody::accept; @@ -235,6 +246,14 @@ EOS end def define_packed_field_accessors(s, f, i) + if (s.kind_of? AmqpMethod) + define_packed_field_accessors_for_method(s, f, i) + else + define_packed_field_accessors_for_struct(s, f, i) + end + end + + def define_packed_field_accessors_for_struct(s, f, i) if (f.domain.type_ == "bit") genl "void #{s.cppname}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {" indent { @@ -265,6 +284,37 @@ EOS genl "" end + def define_packed_field_accessors_for_method(s, f, i) + if (f.domain.type_ == "bit") + genl "void #{s.body_name}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {" + indent { + genl "if (_#{f.cppname}) flags |= #{flag_mask(s, i)};" + genl "else flags &= ~#{flag_mask(s, i)};" + } + genl "}" + genl "#{f.cpptype.ret} #{s.body_name}::get#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }" + else + genl "void #{s.body_name}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {" + indent { + genl "#{f.cppname} = _#{f.cppname};" + genl "flags |= #{flag_mask(s, i)};" + } + genl "}" + genl "#{f.cpptype.ret} #{s.body_name}::get#{f.name.caps}() const { return #{f.cppname}; }" + if (f.cpptype.name == "FieldTable") + genl "#{f.cpptype.name}& #{s.body_name}::get#{f.name.caps}() {" + indent { + genl "flags |= #{flag_mask(s, i)};"#treat the field table as having been 'set' + genl "return #{f.cppname};" + } + genl "}" + end + genl "bool #{s.body_name}::has#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }" + genl "void #{s.body_name}::clear#{f.name.caps}Flag() { flags &= ~#{flag_mask(s, i)}; }" + end + genl "" + end + def define_packed_accessors(s) process_packed_fields(s) { |f, i| define_packed_field_accessors(s, f, i) } end @@ -383,7 +433,7 @@ EOS EOS } cpp_file("qpid/framing/#{classname}.cpp") { - if (s.fields.size > 0 || execution_header?(s)) + if (is_packed(s) || s.fields.size > 0 || execution_header?(s)) buffer = "buffer" else buffer = "/*buffer*/" diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index 67b4b1c73c..9e4bb7c22c 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -343,6 +343,10 @@ class AmqpClass < AmqpElement !["connection", "session", "execution"].include?(name) end + def control?() + ["connection010", "session010"].include?(name) + end + def actions() controls+commands; end end diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index df4ba49ca8..8e64b06208 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -142,6 +142,13 @@ class AmqpMethod def param_names() fields.map { |f| f.cppname }; end def signature() fields.map { |f| f.signature }; end def body_name() parent.name.caps+name.caps+"Body"; end + + def cpp_pack_type() # preview + CppType.new("uint16_t").code("Short").defval("0"); + end + def pack() # preview + "short" + end end module AmqpHasFields @@ -182,10 +189,12 @@ class AmqpDomain "timestamp"=>CppType.new("uint64_t").code("LongLong").defval("0"), "longstr"=>CppType.new("string").passcref.retcref.code("LongString"), "shortstr"=>CppType.new("string").passcref.retcref.code("ShortString"), + "mediumstr"=>CppType.new("string").passcref.retcref.code("MediumString"), "table"=>CppType.new("FieldTable").passcref.retcref, "array"=>CppType.new("Array").passcref.retcref, "content"=>CppType.new("Content").passcref.retcref, "rfc1982-long-set"=>CppType.new("SequenceNumberSet").passcref.retcref, + "sequence-set"=>CppType.new("SequenceSet").passcref.retcref, "long-struct"=>CppType.new("string").passcref.retcref.code("LongString"), "uuid"=>CppType.new("Uuid").passcref.retcref } diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 48851085c6..080260be02 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -14,7 +14,7 @@ force: if GENERATE # AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac -amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml +amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml amqp_0_10_xml=@AMQP_FINAL_XML@ specs=$(amqp_99_0_xml) $(amqp_0_10_xml) @@ -130,6 +130,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/SendContent.cpp \ qpid/framing/SequenceNumber.cpp \ qpid/framing/SequenceNumberSet.cpp \ + qpid/framing/SequenceSet.cpp \ qpid/framing/Proxy.cpp \ qpid/framing/Uuid.cpp \ qpid/framing/AMQP_HighestVersion.h \ @@ -412,6 +413,7 @@ nobase_include_HEADERS = \ qpid/framing/SessionState.h \ qpid/framing/SendContent.h \ qpid/framing/SequenceNumber.h \ + qpid/framing/SequenceSet.h \ qpid/framing/SequenceNumberSet.h \ qpid/framing/SerializeHandler.h \ qpid/framing/StructHelper.h \ diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index ef2c51bb8d..5237087dc8 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -85,6 +85,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } + Connection010Handler* getConnection010Handler() { BADHANDLER(); } + Session010Handler* getSession010Handler() { BADHANDLER(); } #undef BADHANDLER private: diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index e296d52214..126e1b2723 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -23,6 +23,7 @@ #include "ConnectionHandler.h" #include "Connection.h" #include "qpid/framing/ConnectionStartBody.h" +#include "qpid/framing/Connection010StartBody.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" @@ -38,11 +39,14 @@ const std::string en_US = "en_US"; } void ConnectionHandler::init(const framing::ProtocolInitiation& header) { + //need to send out a protocol header back to the client + handler->connection.getOutput().initiated(header); + FieldTable properties; string mechanisms(PLAIN); string locales(en_US); - handler->serverMode = true; - handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); + handler->serverMode = true; + handler->client.start(properties, mechanisms, locales); } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) @@ -55,7 +59,7 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); try{ if (handler->serverMode) { - if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method)) + if (!invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method)) throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); } else { if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method)) diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 2a581d5675..44e2ce05fa 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -41,10 +41,10 @@ class Connection; // TODO aconway 2007-09-18: Rename to ConnectionHandler class ConnectionHandler : public framing::FrameHandler { - struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, + struct Handler : public framing::AMQP_ServerOperations::Connection010Handler, public framing::AMQP_ClientOperations::ConnectionHandler { - framing::AMQP_ClientProxy::Connection client; + framing::AMQP_ClientProxy::Connection010 client; framing::AMQP_ServerProxy::Connection server; Connection& connection; bool serverMode; @@ -55,6 +55,7 @@ class ConnectionHandler : public framing::FrameHandler const std::string& locale); void secureOk(const std::string& response); void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void heartbeat() {} void open(const std::string& virtualHost, const std::string& capabilities, bool insist); void close(uint16_t replyCode, const std::string& replyText, diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 9b44f31e14..e012d693fb 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::acknowledged(const DeliveryRecord& delivery) +void SemanticState::adjustFlow(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->acknowledged(delivery); + get_pointer(i)->adjustFlow(delivery); } } -void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) { if (windowing) { if (msgCredit != 0xFFFFFFFF) msgCredit++; @@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify() parent->outputTasks.activateOutput(); } + +void SemanticState::accepted(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(range.start, range.end); + } +} + +void SemanticState::completed(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + requestDispatch(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index cc9c0e1e9b..88a2fcab5c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains, void addMessageCredit(uint32_t value); void flush(); void stop(); - void acknowledged(const DeliveryRecord&); + void adjustFlow(const DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } @@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains, void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void acknowledged(const DeliveryRecord&); + void adjustFlow(const DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); @@ -171,8 +171,6 @@ class SemanticState : public framing::FrameHandler::Chains, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); void recover(bool requeue); void flow(bool active); DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); @@ -180,8 +178,15 @@ class SemanticState : public framing::FrameHandler::Chains, void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); void handle(intrusive_ptr<Message> msg); - bool doOutput() { return outputTasks.doOutput(); } + + //preview only (completed == ack): + void ackCumulative(DeliveryId deliveryTag); + void ackRange(DeliveryId deliveryTag, DeliveryId endTag); + + //final 0-10 spec (completed and accepted are distinct): + void completed(DeliveryId deliveryTag, DeliveryId endTag); + void accepted(DeliveryId deliveryTag, DeliveryId endTag); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 0e3c9928d1..de96ae3f12 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) { AMQMethodBody* m = f.getBody()->getMethod(); try { if (!ignoring) { - if (m && - (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || - invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { return; } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); session->handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; } else { throw ChannelErrorException( QPID_MSG("Channel " << channel.get() << " is not open")); @@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) { ignoring=true; // Ignore trailing frames sent by client. session->detach(); session.reset(); - peerSession.closed(e.code, e.what()); + //TODO: implement new exception handling mechanism + //peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) { void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) - peerSession.solicitAck(); + peerSession.flush(false, false, true); } void SessionHandler::assertAttached(const char* method) const { @@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const { << " is already open.")); } -void SessionHandler::open(uint32_t detachedLifetime) { - assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); - peerSession.attached(session->getId(), session->getTimeout()); +void SessionHandler::localSuspend() { + if (session.get() && session->isAttached()) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + session.reset(); + } } -void SessionHandler::resume(const Uuid& id) { - assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); - session->attach(*this); - SequenceNumber seq = session->resuming(); - peerSession.attached(session->getId(), session->getTimeout()); - proxy.getSession().ack(seq, SequenceNumberSet()); -} -void SessionHandler::flow(bool /*active*/) { - assertAttached("flow"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flow"); +ConnectionState& SessionHandler::getConnection() { return connection; } +const ConnectionState& SessionHandler::getConnection() const { return connection; } + +//new methods: +void SessionHandler::attach(const std::string& name, bool /*force*/) +{ + //TODO: need to revise session manager to support resume as well + assertClosed("attach"); + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); + peerSession.attached(name); } -void SessionHandler::flowOk(bool /*active*/) { - assertAttached("flowOk"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flowOk"); +void SessionHandler::attached(const std::string& /*name*/) +{ + std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); } -void SessionHandler::close() { - assertAttached("close"); - QPID_LOG(info, "Received session.close"); - ignoring=false; - session->detach(); - session.reset(); - peerSession.closed(REPLY_SUCCESS, "ok"); +void SessionHandler::detach(const std::string& name) +{ + assertAttached("detach"); + localSuspend(); + peerSession.detached(name, 0); assert(&connection.getChannel(channel.get()) == this); connection.closeChannel(channel.get()); } -void SessionHandler::closed(uint16_t replyCode, const string& replyText) { - QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); +void SessionHandler::detached(const std::string& name, uint8_t code) +{ ignoring=false; session->detach(); session.reset(); -} - -void SessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getSessionManager().suspend(session); - session.reset(); + if (code) { + //no error + } else { + //error occured + QPID_LOG(warning, "Received session.closed: "<< name << " " << code); } } -void SessionHandler::suspend() { - assertAttached("suspend"); - localSuspend(); - peerSession.detached(); - assert(&connection.getChannel(channel.get()) == this); - connection.closeChannel(channel.get()); -} - -void SessionHandler::ack(uint32_t cumulativeSeenMark, - const SequenceNumberSet& /*seenFrameSet*/) +void SessionHandler::requestTimeout(uint32_t t) { - assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { - session->receivedAck(cumulativeSeenMark); - framing::SessionState::Replay replay=session->replay(); - std::for_each(replay.begin(), replay.end(), - boost::bind(&SessionHandler::handleOut, this, _1)); - } - else - session->receivedAck(cumulativeSeenMark); + session->setTimeout(t); + //proxy.timeout(t); } -void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.high-water-mark"); +void SessionHandler::timeout(uint32_t) +{ + //not sure what we need to do on the server for this... } -void SessionHandler::solicitAck() { - assertAttached("solicit-ack"); - peerSession.ack(session->sendingAck(), SequenceNumberSet()); +void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) +{ + if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); + + session->next = id; } -void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) +void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); + if (!commands.empty() || fragments.size()) { + throw NotImplementedException("Session resumption not yet supported"); + } } -void SessionHandler::detached() +void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) { - connection.broker.getSessionManager().suspend(session); - session.reset(); + //don't really care too much about this yet } - -ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) { - assertAttached("complete"); - session->complete(cumulative, range); + session->complete(commands); + if (timelyReply) { + peerSession.knownCompleted(session->knownCompleted); + session->knownCompleted.clear(); + } } -void SessionHandler::flush() +void SessionHandler::knownCompleted(const framing::SequenceSet& commands) { - assertAttached("flush"); - session->flush(); + session->completed.remove(commands); } -void SessionHandler::sync() + +void SessionHandler::flush(bool expected, bool confirmed, bool completed) { - assertAttached("sync"); - session->sync(); + if (expected) { + peerSession.expected(SequenceSet(session->next), Array()); + } + if (confirmed) { + peerSession.confirmed(session->completed, Array()); + } + if (completed) { + peerSession.completed(session->completed, true); + } } -void SessionHandler::noop() + +void SessionHandler::sendCompletion() { - assertAttached("noop"); - session->noop(); + peerSession.completed(session->completed, true); } -void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void SessionHandler::gap(const framing::SequenceSet& /*commands*/) { - //never actually sent by client at present + throw NotImplementedException("gap not yet supported"); } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index e6bc463a82..4b031f2951 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -27,8 +27,10 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/Array.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" #include <boost/noncopyable.hpp> @@ -44,9 +46,7 @@ class SessionState; * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ -class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, - public framing::AMQP_ClientOperations::SessionHandler, - public framing::AMQP_ServerOperations::ExecutionHandler, +class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, public framing::FrameHandler::InOutHandler, private boost::noncopyable { @@ -69,35 +69,32 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, // Called by closing connection. void localSuspend(); void detach() { localSuspend(); } + void sendCompletion(); protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); private: - /// Session methods - void open(uint32_t detachedLifetime); - void flow(bool active); - void flowOk(bool active); - void close(); - void closed(uint16_t replyCode, const std::string& replyText); - void resume(const framing::Uuid& sessionId); - void suspend(); - void ack(uint32_t cumulativeSeenMark, - const framing::SequenceNumberSet& seenFrameSet); - void highWaterMark(uint32_t lastSentMark); - void solicitAck(); - - //extra methods required for assuming client role - void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); - void detached(); - - //Execution methods: - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void result(uint32_t command, const std::string& data); - void sync(); + //new methods: + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t code); + + void requestTimeout(uint32_t t); + void timeout(uint32_t t); + + void commandPoint(const framing::SequenceNumber& id, uint64_t offset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + //hacks for old generator: + void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } void assertAttached(const char* method) const; void assertActive(const char* method) const; @@ -106,7 +103,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session peerSession; + framing::AMQP_ClientProxy::Session010 peerSession; bool ignoring; std::auto_ptr<SessionState> session; }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 573a567da6..5f04136444 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -49,7 +49,7 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = incoming.next(); + SequenceNumber id = next++; Invoker::Result invocation = invoke(adapter, *method); - incoming.complete(id); + completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); @@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) getProxy().getExecution().result(id.getValue(), invocation.getResult()); } if (method->isSync()) { - incoming.sync(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(incoming.next()); + SequenceNumber id = next++; + msgBuilder.start(id); msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); @@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - incoming.track(msg); + //TODO: may want to hold up execution until async enqueue is complete + completed.add(msg->getCommandId()); if (msg->getFrames().getMethod()->isSync()) { - incoming.sync(msg->getCommandId()); sendCompletion(); } } @@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame) void SessionState::handle(AMQFrame& frame) { + received(frame); + //TODO: make command handling more uniform, regardless of whether //commands carry content. (For now, assume all single frame //assmblies are non-content bearing and all content-bearing @@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t void SessionState::sendCompletion() { - SequenceNumber mark = incoming.getMark(); - SequenceNumberSet range = incoming.getRange(); - getProxy().getExecution().complete(mark.getValue(), range); -} - -void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - //record: - SequenceNumber mark(cumulative); - if (outgoing.lwm < mark) { - outgoing.lwm = mark; - //ack messages: - semanticState.ackCumulative(mark.getValue()); - } - range.processRanges(ackOp); -} - -void SessionState::flush() -{ - incoming.flush(); - sendCompletion(); -} - -void SessionState::sync() -{ - incoming.sync(); - sendCompletion(); + handler->sendCompletion(); } -void SessionState::noop() +void SessionState::complete(const SequenceSet& commands) { - incoming.noop(); + knownCompleted.add(commands); + commands.for_each(ackOp); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 98c21a8ab5..fa6bd14ef3 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -25,6 +25,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" @@ -83,6 +84,8 @@ class SessionState : public framing::SessionState, ConnectionState& getConnection(); uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t t) { timeout = t; } + Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } @@ -93,10 +96,7 @@ class SessionState : public framing::SessionState, void handleCommand(framing::AMQMethodBody* method); void handleContent(framing::AMQFrame& frame); - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void sync(); + void complete(const framing::SequenceSet& ranges); void sendCompletion(); //delivery adapter methods: @@ -114,6 +114,10 @@ class SessionState : public framing::SessionState, uint32_t ackInterval); + framing::SequenceSet completed; + framing::SequenceSet knownCompleted; + framing::SequenceNumber next; + private: typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; @@ -130,8 +134,6 @@ class SessionState : public framing::SessionState, BrokerAdapter adapter; MessageBuilder msgBuilder; - //execution state - IncomingExecutionContext incoming; framing::Window outgoing; RangedOperation ackOp; diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp index bf53bf0cd6..2d3ecf3f6a 100644 --- a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp +++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -83,7 +83,7 @@ void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ void AccumulatedAck::consolidate(){} void AccumulatedAck::clear(){ - mark = 0;//not sure that this is valid when wraparound is a possibility + mark = SequenceNumber(0);//not sure that this is valid when wraparound is a possibility ranges.clear(); } diff --git a/qpid/cpp/src/qpid/framing/BodyHolder.cpp b/qpid/cpp/src/qpid/framing/BodyHolder.cpp index f66f29d36a..de971b5b28 100644 --- a/qpid/cpp/src/qpid/framing/BodyHolder.cpp +++ b/qpid/cpp/src/qpid/framing/BodyHolder.cpp @@ -48,6 +48,7 @@ void BodyHolder::encode(Buffer& b) const { void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) { switch(type) { + case 0://CONTROL case METHOD_BODY: { ClassId c = buffer.getOctet(); MethodId m = buffer.getOctet(); diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp index c0cd210042..60d67f1b07 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.cpp +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -194,6 +194,13 @@ void Buffer::putShortString(const string& s){ position += len; } +void Buffer::putMediumString(const string& s){ + uint16_t len = s.length(); + putShort(len); + s.copy(data + position, len); + position += len; +} + void Buffer::putLongString(const string& s){ uint32_t len = s.length(); putLong(len); @@ -208,6 +215,13 @@ void Buffer::getShortString(string& s){ position += len; } +void Buffer::getMediumString(string& s){ + uint16_t len = getShort(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + void Buffer::getLongString(string& s){ uint32_t len = getLong(); checkAvailable(len); diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h index 9c0d403462..585379b09a 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.h +++ b/qpid/cpp/src/qpid/framing/Buffer.h @@ -97,8 +97,10 @@ class Buffer void putUInt(uint64_t); void putShortString(const string& s); + void putMediumString(const string& s); void putLongString(const string& s); void getShortString(string& s); + void getMediumString(string& s); void getLongString(string& s); void getBin128(uint8_t* b); diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp index 3172246cc2..1b62d296c6 100644 --- a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp @@ -51,9 +51,10 @@ const SequenceNumber SequenceNumber::operator++(int) return old; } -SequenceNumber SequenceNumber::operator+(uint32_t i) const +SequenceNumber& SequenceNumber::operator--() { - return SequenceNumber(value + i); + value = value - 1; + return *this; } bool SequenceNumber::operator<(const SequenceNumber& other) const diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h index b2594452d0..0ed591b804 100644 --- a/qpid/cpp/src/qpid/framing/SequenceNumber.h +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h @@ -39,7 +39,7 @@ class SequenceNumber SequenceNumber& operator++();//prefix ++ const SequenceNumber operator++(int);//postfix ++ - SequenceNumber operator+(uint32_t) const; + SequenceNumber& operator--();//prefix ++ bool operator==(const SequenceNumber& other) const; bool operator!=(const SequenceNumber& other) const; bool operator<(const SequenceNumber& other) const; diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp new file mode 100644 index 0000000000..e3461e233b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SequenceSet.h" + +using namespace qpid::framing; +using std::max; +using std::min; + +namespace { +//each range contains 2 numbers, 4 bytes each +uint16_t RANGE_SIZE = 2 * 4; +} + +void SequenceSet::encode(Buffer& buffer) const +{ + buffer.putShort(ranges.size() * RANGE_SIZE); + for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + i->encode(buffer); + } +} + +void SequenceSet::decode(Buffer& buffer) +{ + uint16_t size = buffer.getShort(); + uint16_t count = size / RANGE_SIZE;//number of ranges + if (size % RANGE_SIZE) throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size)); + + for (uint16_t i = 0; i < count; i++) { + add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); + } +} + +uint32_t SequenceSet::size() const +{ + return 2 /*size field*/ + (ranges.size() * RANGE_SIZE); +} + +bool SequenceSet::contains(const SequenceNumber& point) const +{ + for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(point)) return true; + } + return false; +} + +void SequenceSet::add(const SequenceNumber& s) +{ + add(s, s); +} + +void SequenceSet::add(const SequenceNumber& start, const SequenceNumber& end) +{ + if (start > end) { + add(end, start); + } else { + Range r(start, end); + bool merged = false; + Ranges::iterator i = ranges.begin(); + while (i != ranges.end() && !merged && i->start < start) { + if (i->merge(r)) merged = true; + i++; + } + if (!merged) { + ranges.insert(i, r); + } + } +} + +void SequenceSet::add(const SequenceSet& set) +{ + for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) { + add(i->start, i->end); + } +} + +void SequenceSet::remove(const SequenceSet& set) +{ + for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) { + remove(i->start, i->end); + } +} + +void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& end) +{ + if (start > end) { + remove(end, start); + } else { + Ranges::iterator i = ranges.begin(); + while (i != ranges.end() && i->start < start) { + if (start <= i->end) { + if (end > i->end) { + //i.e. start is within the range pointed to by i, but end is not + i->end = (uint32_t)start - 1; + } else { + //whole of range to be deleted is contained within that pointed to be i + if (end == i->end) { + //just shrink range pointed to by i + i->end = (uint32_t)start - 1; + } else { + //need to split the range pointed to by i + Range r(i->start, (uint32_t)start - 1); + i->start = end + 1; + ranges.insert(i, r); + } + return;//no need to go any further + } + } + i++; + } + Ranges::iterator j = i; + while (j != ranges.end() && j->end < end) { + j++; + } + if (j->start <= end){ + j->start = end + 1; + } + ranges.erase(i, j); + } +} + +void SequenceSet::remove(const SequenceNumber& s) +{ + for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) { + if (i->start == s) { + if (i->start == i->end) { + ranges.erase(i); + } else { + ++(i->start); + } + } else if (i->end == s) { + --(i->end); + } else if (i->contains(s)) { + //need to split range pointed to by i + Range r(i->start, (uint32_t)s - 1); + i->start = s + 1; + ranges.insert(i, r); + } + } +} + +bool SequenceSet::empty() const +{ + return ranges.empty(); +} + +void SequenceSet::clear() +{ + return ranges.clear(); +} + +bool SequenceSet::Range::contains(SequenceNumber i) const +{ + return i >= start && i <= end; +} + +bool SequenceSet::Range::intersects(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); +} + +bool SequenceSet::Range::merge(const Range& r) +{ + if (intersects(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool SequenceSet::Range::mergeable(const SequenceNumber& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +void SequenceSet::Range::encode(Buffer& buffer) const +{ + buffer.putLong(start); + buffer.putLong(end); +} + +SequenceSet::Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} + +namespace qpid{ +namespace framing{ + +std::ostream& operator<<(std::ostream& out, const SequenceSet& set) { + out << "{"; + for (SequenceSet::Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) { + if (i != set.ranges.begin()) out << ", "; + out << i->start.getValue() << "-" << i->end.getValue(); + } + out << "}"; + return out; +} + +} +} diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h new file mode 100644 index 0000000000..2f34cb5cba --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.h @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceSet_h +#define _framing_SequenceSet_h + +#include <ostream> +#include <list> +#include "amqp_types.h" +#include "Buffer.h" +#include "SequenceNumber.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace framing { + +class SequenceSet +{ + struct Range + { + SequenceNumber start; + SequenceNumber end; + + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; + bool intersects(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const SequenceNumber& r) const; + void encode(Buffer& buffer) const; + }; + + typedef std::list<Range> Ranges; + Ranges ranges; + +public: + SequenceSet() {} + SequenceSet(const SequenceNumber& s) { add(s); } + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + uint32_t size() const; + + bool contains(const SequenceNumber& s) const; + void add(const SequenceNumber& s); + void add(const SequenceNumber& start, const SequenceNumber& end); + void add(const SequenceSet& set); + void remove(const SequenceNumber& s); + void remove(const SequenceNumber& start, const SequenceNumber& end); + void remove(const SequenceSet& set); + + void clear(); + bool empty() const; + + template <class T> + void for_each(T& t) const + { + for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + t(i->start, i->end); + } + } + + friend std::ostream& operator<<(std::ostream&, const SequenceSet&); +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h index 94442aa357..943970cc56 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types.h +++ b/qpid/cpp/src/qpid/framing/amqp_types.h @@ -65,6 +65,7 @@ const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); class FramingContent; class FieldTable; class SequenceNumberSet; +class SequenceSet; class Uuid; }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/amqp_types_full.h b/qpid/cpp/src/qpid/framing/amqp_types_full.h index f1ed44ec05..da7bdc876d 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types_full.h +++ b/qpid/cpp/src/qpid/framing/amqp_types_full.h @@ -34,6 +34,7 @@ #include "FramingContent.h" #include "FieldTable.h" #include "SequenceNumberSet.h" +#include "SequenceSet.h" #include "Uuid.h" #endif /*!_framing_amqp_types_decl_h*/ diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 0586eb9d36..c24205f53e 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -94,7 +94,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { std::queue<framing::AMQFrame> frameQueue; Mutex frameQueueLock; bool frameQueueClosed; - bool initiated; + bool isInitiated; bool readError; std::string identifier; bool isClient; @@ -105,7 +105,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { AsynchIOHandler() : inputHandler(0), frameQueueClosed(false), - initiated(false), + isInitiated(false), readError(false), isClient(false) {} @@ -128,6 +128,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void send(framing::AMQFrame&); void close(); void activateOutput(); + void initiated(const framing::ProtocolInitiation&); + // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -259,13 +261,18 @@ void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } +void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi) +{ + write(pi); +} + // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { return; } framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - if(initiated){ + if(isInitiated){ framing::AMQFrame frame; try{ while(frame.decode(in)) { @@ -282,7 +289,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if(protocolInit.decode(in)){ QPID_LOG(debug, "INIT [" << identifier << "]"); inputHandler->initiated(protocolInit); - initiated = true; + isInitiated = true; } } // TODO: unreading needs to go away, and when we can cope @@ -324,10 +331,10 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - if (isClient && !initiated) { + if (isClient && !isInitiated) { //get & write protocol header from upper layers write(inputHandler->getInitiation()); - initiated = true; + isInitiated = true; return; } ScopedLock<Mutex> l(frameQueueLock); diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h index 5a60ae4998..13407d9b9d 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -22,6 +22,7 @@ #define _ConnectionOutputHandler_ #include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" #include "OutputControl.h" namespace qpid { @@ -30,7 +31,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler { public: virtual void close() = 0; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index d25378a519..0baf1a2763 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -38,6 +38,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ InlineVector.cpp \ ISList.cpp IList.cpp \ ClientSessionTest.cpp \ + SequenceSet.cpp \ serialize.cpp \ ProxyTemplate.cpp apply.cpp # FIXME aconway 2008-02-20: removed RefCountedMap.cpp due to valgrind error. diff --git a/qpid/cpp/src/tests/SequenceSet.cpp b/qpid/cpp/src/tests/SequenceSet.cpp new file mode 100644 index 0000000000..bffeed648e --- /dev/null +++ b/qpid/cpp/src/tests/SequenceSet.cpp @@ -0,0 +1,93 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" +#include "unit_test.h" + +QPID_AUTO_TEST_SUITE(SequenceSetTestSuite) + +using namespace qpid::framing; + +BOOST_AUTO_TEST_CASE(testAdd) { + SequenceSet s; + s.add(2); + s.add(8,8); + s.add(3,5); + + for (uint32_t i = 0; i <= 1; i++) //0, 1 + BOOST_CHECK(!s.contains(i)); + + for (uint32_t i = 2; i <= 5; i++) //2, 3, 4 & 5 + BOOST_CHECK(s.contains(i)); + + for (uint32_t i = 0; i <= 1; i++) //6, 7 + BOOST_CHECK(!s.contains(i)); + + BOOST_CHECK(s.contains(8));//8 + + SequenceSet t; + t.add(6, 10); + t.add(s); + + for (uint32_t i = 0; i <= 1; i++) + BOOST_CHECK(!t.contains(i)); + + for (uint32_t i = 2; i <= 10; i++) + BOOST_CHECK(t.contains(i)); +} + +BOOST_AUTO_TEST_CASE(testRemove) { + SequenceSet s; + SequenceSet t; + s.add(0, 10); + t.add(0, 10); + + s.remove(7); + s.remove(3, 5); + s.remove(9, 10); + + t.remove(s); + + for (uint32_t i = 0; i <= 2; i++) { + BOOST_CHECK(s.contains(i)); + BOOST_CHECK(!t.contains(i)); + } + + for (uint32_t i = 3; i <= 5; i++) { + BOOST_CHECK(!s.contains(i)); + BOOST_CHECK(t.contains(i)); + } + + BOOST_CHECK(s.contains(6)); + BOOST_CHECK(!t.contains(6)); + + BOOST_CHECK(!s.contains(7)); + BOOST_CHECK(t.contains(7)); + + BOOST_CHECK(s.contains(8)); + BOOST_CHECK(!t.contains(8)); + + for (uint32_t i = 9; i <= 10; i++) { + BOOST_CHECK(!s.contains(i)); + BOOST_CHECK(t.contains(i)); + } +} + +QPID_AUTO_TEST_SUITE_END() + + diff --git a/qpid/cpp/xml/extra.xml b/qpid/cpp/xml/extra.xml new file mode 100644 index 0000000000..c5b91271c3 --- /dev/null +++ b/qpid/cpp/xml/extra.xml @@ -0,0 +1,585 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> + +<amqp major="99" minor="0" port="5672"> + + <domain name="mediumstr" type="mediumstr" label="string with 16bit size field" /> + + <domain name="sequence-set" type="sequence-set" label="ranged set representation"> + <doc> + Set of pairs of RFC-1982 numbers representing a discontinuous range. Each pair represents a + closed interval within the list. + + For example, the set (1,3), (6,6), (8,9) represents the sequence 1,2,3,6,8,9. + </doc> + </domain> + +<class name = "connection010" index = "1"> + +<method name = "start" index="1"> + <doc>new start method</doc> + <chassis name="client" implement="MUST" /> + + <response name="start-ok" /> + + <field name="server-properties" domain="table" label="server properties"> + <doc>blah, blah</doc> + </field> + + <field name="mechanisms" domain="longstr" label="available security mechanisms"> + <doc>blah, blah</doc> + </field> + + <field name="locales" domain="longstr" label="available message locales"> + </field> + +</method> + +<method name = "start-ok" index="2"> + <doc>new start-ok method</doc> + <chassis name="server" implement="MUST" /> + + <field name="client-properties" domain="table" label="server properties"> + <doc>blah, blah</doc> + </field> + + <field name="mechanism" domain="shortstr" label="chosen security mechanism"> + <doc>blah, blah</doc> + </field> + + <field name="response" domain="longstr" label="security response data"> + <doc>blah blah</doc> + </field> + + <field name="locale" domain="shortstr" label="chosen locale"> + <doc>blah, blah</doc> + </field> + +</method> + + <method name="secure" synchronous="1" index="3" label="security mechanism challenge"> + <doc> + The SASL protocol works by exchanging challenges and responses until both peers have + received sufficient information to authenticate each other. This method challenges the + client to provide more information. + </doc> + + <chassis name="client" implement="MUST" /> + + <response name="secure-ok" /> + + <field name="challenge" domain="longstr" label="security challenge data"> + <doc> + Challenge information, a block of opaque binary data passed to the security mechanism. + </doc> + </field> + </method> + + <!-- - Method: connection.secure-ok - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="secure-ok" synchronous="1" index="4" label="security mechanism response"> + <doc> + This method attempts to authenticate, passing a block of SASL data for the security + mechanism at the server side. + </doc> + + <chassis name="server" implement="MUST" /> + + <field name="response" domain="longstr" label="security response data"> + <doc> + A block of opaque data passed to the security mechanism. The contents of this data are + defined by the SASL security mechanism. + </doc> + <assert check="notnull" /> + </field> + </method> + + <method name="tune" synchronous="1" index="5" label="propose connection tuning parameters"> + <doc> + This method proposes a set of connection configuration values to the client. The client can + accept and/or adjust these. + </doc> + + <chassis name="client" implement="MUST" /> + + <response name="tune-ok" /> + + <field name="channel-max" domain="short" label="proposed maximum channels"> + <doc> + The maximum total number of channels that the server allows per connection. Zero means + that the server does not impose a fixed limit, but the number of allowed channels may be + limited by available server resources. + </doc> + </field> + + <field name="frame-max" domain="long" label="proposed maximum frame size"> + <doc> + The largest frame size that the server proposes for the connection. The client can + negotiate a lower value. Zero means that the server does not impose any specific limit but + may reject very large frames if it cannot allocate resources for them. + </doc> + + <rule name="minimum"> + <doc> + Until the frame-max has been negotiated, both peers MUST accept frames of up to + frame-min-size octets large, and the minimum negotiated value for frame-max is also + frame-min-size. + </doc> + <doc type="scenario"> + Client connects to server and sends a large properties field, creating a frame of + frame-min-size octets. The server must accept this frame. + </doc> + </rule> + </field> + + <field name="heartbeat" domain="short" label="desired heartbeat delay"> + <!-- TODO 0.82 - the heartbeat negotiation mechanism was changed during implementation + because the model documented here does not actually work properly. The best model we + found is that the server proposes a heartbeat value to the client; the client can reply + with zero, meaning 'do not use heartbeats (as documented here), or can propose its own + heartbeat value, which the server should then accept. This is different from the model + here which is disconnected - e.g. each side requests a heartbeat independently. Basically + a connection is heartbeated in both ways, or not at all, depending on whether both peers + support heartbeating or not, and the heartbeat value should itself be chosen by the client + so that remote links can get a higher value. Also, the actual heartbeat mechanism needs + documentation, and is as follows: so long as there is activity on a connection - in or out + - both peers assume the connection is active. When there is no activity, each peer must + send heartbeat frames. When no heartbeat frame is received after N cycles (where N is at + least 2), the connection can be considered to have died. /PH 2006/07/19 + --> + <doc> + The delay, in seconds, of the connection heartbeat that the server wants. Zero means the + server does not want a heartbeat. + </doc> + </field> + </method> + + <!-- - Method: connection.tune-ok - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="tune-ok" synchronous="1" index="6" + label="negotiate connection tuning parameters"> + <doc> + This method sends the client's connection tuning parameters to the server. Certain fields + are negotiated, others provide capability information. + </doc> + + <chassis name="server" implement="MUST" /> + + <field name="channel-max" domain="short" label="negotiated maximum channels"> + <doc> + The maximum total number of channels that the client will use per connection. + </doc> + + <rule name="upper-limit"> + <doc> + If the client specifies a channel max that is higher than the value provided by the + server, the server MUST close the connection without attempting a negotiated close. The + server may report the error in some fashion to assist implementors. + </doc> + </rule> + + <assert check="notnull" /> + <assert check="le" value="channel-max" /> + </field> + + <field name="frame-max" domain="long" label="negotiated maximum frame size"> + <doc> + The largest frame size that the client and server will use for the connection. Zero means + that the client does not impose any specific limit but may reject very large frames if it + cannot allocate resources for them. Note that the frame-max limit applies principally to + content frames, where large contents can be broken into frames of arbitrary size. + </doc> + + <rule name="minimum"> + <doc> + Until the frame-max has been negotiated, both peers MUST accept frames of up to + frame-min-size octets large, and the minimum negotiated value for frame-max is also + frame-min-size. + </doc> + </rule> + + <rule name="upper-limit"> + <doc> + If the client specifies a frame max that is higher than the value provided by the + server, the server MUST close the connection without attempting a negotiated close. The + server may report the error in some fashion to assist implementors. + </doc> + </rule> + </field> + + <field name="heartbeat" domain="short" label="desired heartbeat delay"> + <doc> + The delay, in seconds, of the connection heartbeat that the client wants. Zero means the + client does not want a heartbeat. + </doc> + </field> + </method> + + <method name="open" synchronous="1" index="7" label="open connection to virtual host"> + <doc> + This method opens a connection to a virtual host, which is a collection of resources, and + acts to separate multiple application domains within a server. The server may apply + arbitrary limits per virtual host, such as the number of each type of entity that may be + used, per connection and/or in total. + </doc> + + <chassis name="server" implement="MUST" /> + + <response name="open-ok" /> + <response name="redirect" /> + + <field name="virtual-host" domain="path" label="virtual host name"> + <!-- TODO 0.82 - the entire vhost model needs review. This concept was prompted by the HTTP + vhost concept but does not fit very well into AMQP. Currently we use the vhost as a + "cluster identifier" which is inaccurate usage. /PH 2006/07/19 + --> + <doc> + The name of the virtual host to work with. + </doc> + + <rule name="separation"> + <doc> + If the server supports multiple virtual hosts, it MUST enforce a full separation of + exchanges, queues, and all associated entities per virtual host. An application, + connected to a specific virtual host, MUST NOT be able to access resources of another + virtual host. + </doc> + </rule> + + <rule name="security"> + <doc> + The server SHOULD verify that the client has permission to access the specified virtual + host. + </doc> + </rule> + <assert check="regexp" value="^[a-zA-Z0-9/-_]+$" /> + </field> + + <field name="capabilities" domain="shortstr" label="required capabilities"> + <doc> + The client can specify zero or more capability names, delimited by spaces. The server can + use this string to how to process the client's connection request. + </doc> + </field> + + <field name="insist" domain="bit" label="insist on connecting to server"> + <doc> + In a configuration with multiple collaborating servers, the server may respond to a + Connection.Open method with a Connection.Redirect. The insist option tells the server that + the client is insisting on a connection to the specified server. + </doc> + <rule name="behaviour"> + <doc> + When the client uses the insist option, the server MUST NOT respond with a + Connection.Redirect method. If it cannot accept the client's connection request it + should respond by closing the connection with a suitable reply code. + </doc> + </rule> + </field> + </method> + + <method name="open-ok" synchronous="1" index="8" label="signal that connection is ready"> + <doc> + This method signals to the client that the connection is ready for use. + </doc> + + <chassis name="client" implement="MUST" /> + + <field name="known-hosts" domain="known-hosts" /> + </method> + + <method name="redirect" synchronous="1" index="9" label="redirects client to other server"> + <doc> + This method redirects the client to another server, based on the requested virtual host + and/or capabilities. + </doc> + + <rule name="usage"> + <doc> + When getting the Connection.Redirect method, the client SHOULD reconnect to the host + specified, and if that host is not present, to any of the hosts specified in the + known-hosts list. + </doc> + </rule> + + <chassis name="client" implement="MUST" /> + + <field name="host" domain="shortstr" label="server to connect to"> + <doc> + Specifies the server to connect to. This is an IP address or a DNS name, optionally + followed by a colon and a port number. If no port number is specified, the client should + use the default port number for the protocol. + </doc> + <assert check="notnull" /> + </field> + + <field name="known-hosts" domain="known-hosts" /> + </method> + +<method name = "heartbeat" index="10"> + <doc>new start-ok method</doc> + <chassis name="server" implement="MUST" /> +</method> + + <!-- - Method: connection.close - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="close" synchronous="1" index="11" label="request a connection close"> + <doc> + This method indicates that the sender wants to close the connection. This may be due to + internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, + i.e. an exception. When a close is due to an exception, the sender provides the class and + method id of the method which caused the exception. + </doc> + <!-- TODO: The connection close mechanism needs to be reviewed from the ODF documentation and + better expressed as rules here. /PH 2006/07/20 + --> + + <rule name="stability"> + <doc> + After sending this method any received method except the Close-OK method MUST be + discarded. + </doc> + </rule> + + <chassis name="client" implement="MUST" /> + <chassis name="server" implement="MUST" /> + + <response name="close-ok" /> + + <field name="reply-code" domain="reply-code" /> + <field name="reply-text" domain="reply-text" /> + + <field name="class-id" domain="class-id" label="failing method class"> + <doc> + When the close is provoked by a method exception, this is the class of the method. + </doc> + </field> + + <field name="method-id" domain="method-id" label="failing method ID"> + <doc> + When the close is provoked by a method exception, this is the ID of the method. + </doc> + </field> + </method> + + <!-- - Method: connection.close-ok - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="close-ok" synchronous="1" index="12" label="confirm a connection close"> + <doc> + This method confirms a Connection.Close method and tells the recipient that it is safe to + release resources for the connection and close the socket. + </doc> + + <rule name="reporting"> + <doc> + A peer that detects a socket closure without having received a Close-Ok handshake method + SHOULD log the error. + </doc> + </rule> + + <chassis name="client" implement="MUST" /> + <chassis name="server" implement="MUST" /> + </method> + + +</class> + + + +<class name = "session010" index = "2"> + +<method name = "attach" index="1"> + + <doc>blah, blah</doc> + <chassis name="client" implement="MUST" /> + <chassis name="server" implement="MUST" /> + + <response name="start-ok" /> + + <field name="name" domain="mediumstr" label="blah"> + <doc>blah, blah</doc> + </field> + + <field name="force" domain="bit" label="blah"> + <doc>blah, blah</doc> + </field> + +</method> + +<method name = "attached" index="2"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="name" domain="mediumstr" label="blah"> + <doc>blah, blah</doc> + </field> + +</method> + +<method name = "detach" index="3"> + + <doc>blah, blah</doc> + <chassis name="client" implement="MUST" /> + <chassis name="server" implement="MUST" /> + + <response name="start-ok" /> + + <field name="name" domain="mediumstr" label="blah"> + <doc>blah, blah</doc> + </field> + +</method> + +<method name = "detached" index="4"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="name" domain="mediumstr" label="blah"> + <doc>blah, blah</doc> + </field> + + + <field name="detach-code" domain="octet" label="blah"> + <doc>blah, blah</doc> + </field> + +</method> + +<method name = "request-timeout" index="5"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="timeout" domain="long" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "timeout" index="6"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="timeout" domain="long" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + + +<method name = "command-point" index="7"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="command-id" domain="rfc1982-long" label="blah"> + <doc>blah, blah</doc> + </field> + + + <field name="command-offset" domain="longlong" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "expected" index="8"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="commands" domain="sequence-set" label="blah"> + <doc>blah, blah</doc> + </field> + + <field name="fragments" domain="array" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "confirmed" index="9"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="commands" domain="sequence-set" label="blah"> + <doc>blah, blah</doc> + </field> + + <field name="fragments" domain="array" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "completed" index="10"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="commands" domain="sequence-set" label="blah"> + <doc>blah, blah</doc> + </field> + + <field name="timely-reply" domain="bit" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "known-completed" index="11"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="commands" domain="sequence-set" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "flush" index="12"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="expected" domain="bit" label="blah"> + <doc>blah, blah</doc> + </field> + <field name="confirmed" domain="bit" label="blah"> + <doc>blah, blah</doc> + </field> + <field name="completed" domain="bit" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +<method name = "gap" index="13"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="commands" domain="sequence-set" label="blah"> + <doc>blah, blah</doc> + </field> +</method> + +</class> + +</amqp> |
