diff options
Diffstat (limited to 'qpid/cpp')
57 files changed, 782 insertions, 388 deletions
diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp index 983f0a8878..44720ef3ce 100644 --- a/qpid/cpp/examples/messaging/client.cpp +++ b/qpid/cpp/examples/messaging/client.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,7 +39,7 @@ using std::string; int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; std::string connectionOptions = argc > 2 ? argv[2] : ""; - + Connection connection(url, connectionOptions); try { connection.open(); @@ -62,10 +62,11 @@ int main(int argc, char** argv) { Message request; request.setReplyTo(responseQueue); for (int i=0; i<4; i++) { - request.setContent(s[i]); + request.setContentObject(s[i]); sender.send(request); Message response = receiver.fetch(); - std::cout << request.getContent() << " -> " << response.getContent() << std::endl; + std::cout << request.getContentObject() << " -> " << response.getContentObject() << std::endl; + session.acknowledge(response); } connection.close(); return 0; diff --git a/qpid/cpp/examples/messaging/server.cpp b/qpid/cpp/examples/messaging/server.cpp index aa271d91f9..ba9fa9e063 100644 --- a/qpid/cpp/examples/messaging/server.cpp +++ b/qpid/cpp/examples/messaging/server.cpp @@ -52,15 +52,31 @@ int main(int argc, char** argv) { const Address& address = request.getReplyTo(); if (address) { Sender sender = session.createSender(address); - std::string s = request.getContent(); - std::transform(s.begin(), s.end(), s.begin(), toupper); - Message response(s); + Message response; + + qpid::types::Variant requestObj = request.getContentObject(); + if (requestObj.getType() == qpid::types::VAR_STRING) { + // Received a string. + // Server returns request string in upper case with same encoding. + std::string s = requestObj; + std::transform(s.begin(), s.end(), s.begin(), toupper); + qpid::types::Variant responseObj(s); + responseObj.setEncoding( requestObj.getEncoding() ); + response.setContentObject( responseObj ); + } else { + // Received something other than a string. + // Server echos received object as a utf8 string. + qpid::types::Variant responseObj( requestObj.asString() ); + responseObj.setEncoding( "utf8" ); + response.setContentObject( requestObj ); + } sender.send(response); - std::cout << "Processed request: " - << request.getContent() - << " -> " - << response.getContent() << std::endl; + std::cout << "Processed request: " + << request.getContentObject() + << " -> " + << response.getContentObject() << std::endl; session.acknowledge(); + sender.close(); } else { std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl; session.reject(request); diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index fe7a809cee..3e5165dfb0 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1077,6 +1077,7 @@ set (qpidbroker_SOURCES qpid/broker/IngressCompletion.cpp qpid/broker/Link.cpp qpid/broker/LinkRegistry.cpp + qpid/broker/LossyLvq.cpp qpid/broker/LossyQueue.cpp qpid/broker/Lvq.cpp qpid/broker/Message.cpp diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 3be9f520e0..b2ff10bd68 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -22,7 +22,7 @@ find_package(Proton 0.5) set (amqp_default ${amqp_force}) -set (maximum_version 0.7) +set (maximum_version 0.9) if (Proton_FOUND) if (Proton_VERSION GREATER ${maximum_version}) message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version and might not be compatible, ${maximum_version} is highest tested; build may not work") @@ -35,7 +35,11 @@ if (Proton_FOUND) endif (NOT Proton_VERSION EQUAL 0.5) if (Proton_VERSION GREATER 0.7) set (USE_PROTON_TRANSPORT_CONDITION 1) + set (HAVE_PROTON_EVENTS 1) endif (Proton_VERSION GREATER 0.7) + if (Proton_VERSION GREATER 0.8) + set (NO_PROTON_DELIVERY_TAG_T 1) + endif (Proton_VERSION GREATER 0.8) else () message(STATUS "Qpid proton not found, amqp 1.0 support not enabled") endif () diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake index f8139262d5..777fc1b893 100644 --- a/qpid/cpp/src/config.h.cmake +++ b/qpid/cpp/src/config.h.cmake @@ -58,5 +58,6 @@ #cmakedefine HAVE_LOG_FTP #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION - +#cmakedefine HAVE_PROTON_EVENTS +#cmakedefine NO_PROTON_DELIVERY_TAG_T #endif /* QPID_CONFIG_H */ diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 87085b6d77..866f98071b 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -54,7 +54,7 @@ size_t Connection::decode(const char* buffer, size_t size) { } } framing::AMQFrame frame; - while(frame.decode(in)) { + while(!pushClosed && frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); connection->received(frame); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 43f39c2919..bd0dcbfc85 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -276,6 +276,7 @@ void SessionHandler::flush(bool expected, bool confirmed, bool completed) { } void SessionHandler::gap(const SequenceSet& /*commands*/) { + checkAttached(); throw NotImplementedException("session.gap not supported"); } diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index 1ab69f32d3..cb5d58977b 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -111,13 +111,14 @@ class AsyncCompletion : public virtual RefCounted qpid::sys::Mutex::ScopedLock l(callbackLock); if (active) { if (callback.get()) { + boost::intrusive_ptr<Callback> save = callback; + callback = boost::intrusive_ptr<Callback>(); // Nobody else can run callback. inCallback = true; { qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - callback->completed(sync); + save->completed(sync); } inCallback = false; - callback = boost::intrusive_ptr<Callback>(); callbackLock.notifyAll(); } active = false; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 2afdc5a61d..8972040be5 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -93,14 +93,14 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) } else if (isOpen()) { handler->connection.getChannel(frame.getChannel()).in(frame); } else { - handler->proxy.close( + handler->connection.close( connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received."); } }catch(ConnectionException& e){ - handler->proxy.close(e.code, e.what()); + handler->connection.close(e.code, e.what()); }catch(std::exception& e){ - handler->proxy.close(541/*internal error*/, e.what()); + handler->connection.close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what()); } } @@ -234,21 +234,25 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, void ConnectionHandler::Handler::open(const string& /*virtualHost*/, const framing::Array& /*capabilities*/, bool /*insist*/) { + if (connection.getUserId().empty()) { + throw ConnectionForcedException("Not authenticated!"); + } + if (connection.isFederationLink()) { AclModule* acl = connection.getBroker().getAcl(); if (acl && acl->userAclRules()) { if (!acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, - QPID_MSG("ACL denied " << connection.getUserId() - << " creating a federation link")); + connection.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("ACL denied " << connection.getUserId() + << " creating a federation link")); return; } } else { if (connection.getBroker().isAuthenticating()) { - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, - QPID_MSG("User " << connection.getUserId() - << " federation connection denied. Systems with authentication " - "enabled must specify ACL create link rules.")); + connection.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("User " << connection.getUserId() + << " federation connection denied. Systems with authentication " + "enabled must specify ACL create link rules.")); return; } } @@ -302,6 +306,11 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, const framing::Array& supportedMechanisms, const framing::Array& /*locales*/) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + + string requestedMechanism = connection.getAuthMechanism(); std::string username = connection.getUsername(); @@ -388,6 +397,10 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, void ConnectionHandler::Handler::secure(const string& challenge ) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + if (sasl.get()) { string response = sasl->step(challenge); proxy.secureOk(response); @@ -402,6 +415,10 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, uint16_t /*heartbeatMin*/, uint16_t heartbeatMax) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); connection.setFrameMax(maxFrameSize); @@ -420,6 +437,10 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, void ConnectionHandler::Handler::openOk(const framing::Array& knownHosts) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + for (Array::ValueVector::const_iterator i = knownHosts.begin(); i != knownHosts.end(); ++i) { Url url((*i)->get<std::string>()); connection.getKnownHosts().push_back(url); diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 7af2fe3cb4..30155fb903 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -100,13 +100,14 @@ class ConnectionHandler : public framing::FrameHandler std::auto_ptr<Handler> handler; bool handle(const qpid::framing::AMQMethodBody& method); + void close(framing::connection::CloseCode code, const std::string& text); public: ConnectionHandler(amqp_0_10::Connection& connection, bool isClient ); - void close(framing::connection::CloseCode code, const std::string& text); void heartbeat(); void handle(framing::AMQFrame& frame); void setSecureConnection(SecureConnection* secured); bool isOpen() { return handler->isOpen; } + friend class amqp_0_10::Connection; }; diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.cpp b/qpid/cpp/src/qpid/broker/LossyLvq.cpp new file mode 100644 index 0000000000..f59ecc1925 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LossyLvq.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LossyLvq.h" +#include "MessageMap.h" + +namespace qpid { +namespace broker { + +LossyLvq::LossyLvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) + : Queue(n, s, ms, p, b), Lvq(n, m, s, ms, p, b), LossyQueue(n, s, ms, p, b) {} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.h b/qpid/cpp/src/qpid/broker/LossyLvq.h new file mode 100644 index 0000000000..e0a266ab77 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LossyLvq.h @@ -0,0 +1,41 @@ +#ifndef QPID_BROKER_LOSSYLVQ_H +#define QPID_BROKER_LOSSYLVQ_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Lvq.h" +#include "qpid/broker/LossyQueue.h" + +namespace qpid { +namespace broker { +class MessageMap; + +/** + * Combination of LossyQueue and Lvq behaviours. + */ +class LossyLvq : public Lvq, public LossyQueue +{ + public: + LossyLvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_LOSSYLVQ_H*/ diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.h b/qpid/cpp/src/qpid/broker/LossyQueue.h index 3e62151d6f..705865f449 100644 --- a/qpid/cpp/src/qpid/broker/LossyQueue.h +++ b/qpid/cpp/src/qpid/broker/LossyQueue.h @@ -29,7 +29,7 @@ namespace broker { /** * Drops messages to prevent a breach of any configured maximum depth. */ -class LossyQueue : public Queue +class LossyQueue : public virtual Queue { public: LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); diff --git a/qpid/cpp/src/qpid/broker/Lvq.h b/qpid/cpp/src/qpid/broker/Lvq.h index 335270a073..26ba2b4914 100644 --- a/qpid/cpp/src/qpid/broker/Lvq.h +++ b/qpid/cpp/src/qpid/broker/Lvq.h @@ -32,7 +32,7 @@ class MessageMap; * conjunction with the MessageMap class. This requires an existing * message to be 'replaced' by a newer message with the same key. */ -class Lvq : public Queue +class Lvq : public virtual Queue { public: Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index 7cb99514d5..f5e9332052 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -45,6 +45,9 @@ void MessageBuilder::handle(AMQFrame& frame) switch(state) { case METHOD: checkType(METHOD_BODY, type); + if (!frame.getMethod()->isA<qpid::framing::MessageTransferBody>()) + throw NotImplementedException(QPID_MSG("Unexpected method: " << *(frame.getMethod()))); + exchange = frame.castBody<qpid::framing::MessageTransferBody>()->getDestination(); state = HEADER; break; diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp index 2ef8c66445..e9e7892499 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.cpp +++ b/qpid/cpp/src/qpid/broker/Protocol.cpp @@ -35,8 +35,10 @@ ProtocolRegistry::ProtocolRegistry(const std::set<std::string>& e, Broker* b) : qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s) { - if (v == qpid::framing::ProtocolVersion(0, 10) && isEnabled(AMQP_0_10)) { - return create_0_10(o, id, s, false); + if (v == qpid::framing::ProtocolVersion(0, 10)) { + if (isEnabled(AMQP_0_10)) { + return create_0_10(o, id, s, false); + } } qpid::sys::ConnectionCodec* codec = 0; for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) { @@ -51,7 +53,22 @@ qpid::sys::ConnectionCodec* ProtocolRegistry::create(qpid::sys::OutputControl& o return create_0_10(o, id, s, true); } -bool ProtocolRegistry::isEnabled(const std::string& name) +qpid::framing::ProtocolVersion ProtocolRegistry::supportedVersion() const +{ + if (isEnabled(AMQP_0_10)) { + return qpid::framing::ProtocolVersion(0,10); + } else { + for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) { + if (isEnabled(i->first)) { + return i->second->supportedVersion(); + } + } + } + QPID_LOG(error, "No enabled protocols!"); + return qpid::framing::ProtocolVersion(0,0); +} + +bool ProtocolRegistry::isEnabled(const std::string& name) const { return enabled.empty()/*if nothing is explicitly enabled, assume everything is*/ || enabled.find(name) != enabled.end(); } diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h index 59a631848e..a7dbc98fff 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.h +++ b/qpid/cpp/src/qpid/broker/Protocol.h @@ -61,6 +61,7 @@ class Protocol virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0; virtual boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&) = 0; virtual boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&) = 0; + virtual qpid::framing::ProtocolVersion supportedVersion() const = 0; private: }; @@ -70,6 +71,7 @@ class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Fac public: QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + QPID_BROKER_EXTERN qpid::framing::ProtocolVersion supportedVersion() const; QPID_BROKER_EXTERN boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&); QPID_BROKER_EXTERN boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); QPID_BROKER_EXTERN Message decode(qpid::framing::Buffer&); @@ -86,7 +88,7 @@ class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Fac Broker* broker; qpid::sys::ConnectionCodec* create_0_10(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&, bool); - bool isEnabled(const std::string&); + bool isEnabled(const std::string&) const; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d897029c85..b1f7d0524b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1258,6 +1258,10 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) if (has_userId) result.first->setOwningUser(_userId); + if (result.first->getSettings().autoDeleteDelay) { + result.first->scheduleAutoDelete(); + } + return result.first; } @@ -1296,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::TimerTask } }; -void Queue::scheduleAutoDelete() +void Queue::scheduleAutoDelete(bool immediate) { if (canAutoDelete()) { - if (settings.autoDeleteDelay) { + if (!immediate && settings.autoDeleteDelay) { AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC)); autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time)); broker->getTimer().add(autoDeleteTask); @@ -1339,7 +1343,7 @@ bool Queue::isExclusiveOwner(const OwnershipToken* const o) const return o == owner; } -void Queue::releaseExclusiveOwnership() +void Queue::releaseExclusiveOwnership(bool immediateExpiry) { bool unused; { @@ -1351,7 +1355,7 @@ void Queue::releaseExclusiveOwnership() unused = !users.isUsed(); } if (unused && settings.autodelete) { - scheduleAutoDelete(); + scheduleAutoDelete(immediateExpiry); } } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 65a91b8729..efca9b9d40 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -379,7 +379,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN uint32_t getConsumerCount() const; inline const std::string& getName() const { return name; } QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const; - QPID_BROKER_EXTERN void releaseExclusiveOwnership(); + QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false); QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o); QPID_BROKER_EXTERN bool hasExclusiveConsumer() const; QPID_BROKER_EXTERN bool hasExclusiveOwner() const; @@ -389,7 +389,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, inline bool isAutoDelete() const { return settings.autodelete; } inline bool isBrowseOnly() const { return settings.isBrowseOnly; } QPID_BROKER_EXTERN bool canAutoDelete() const; - QPID_BROKER_EXTERN void scheduleAutoDelete(); + QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false); QPID_BROKER_EXTERN bool isDeleted() const; const QueueBindings& getBindings() const { return bindings; } diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp index 8104fff740..16cdea3b0a 100644 --- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/LossyLvq.h" #include "qpid/broker/LossyQueue.h" #include "qpid/broker/Lvq.h" #include "qpid/broker/Messages.h" @@ -51,10 +52,17 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que boost::shared_ptr<QueueFlowLimit> flow_ptr(QueueFlowLimit::createLimit(name, settings)); //1. determine Queue type (i.e. whether we are subclassing Queue) - // -> if 'ring' policy is in use then subclass boost::shared_ptr<Queue> queue; if (settings.dropMessagesAtLimit) { - queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + // -> if 'ring' policy is in use then subclass + if (settings.lvqKey.size()) { + //combination of ring and lvq: + std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey)); + queue = boost::shared_ptr<Queue>(new LossyLvq(name, map, settings, settings.durable ? store : 0, parent, broker)); + } else { + //simple ring: + queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + } } else if (settings.selfDestructAtLimit) { queue = boost::shared_ptr<Queue>(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker)); } else if (settings.lvqKey.size()) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 04bbe8b944..3a93e2aac5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -37,6 +37,9 @@ extern "C" { #include <proton/engine.h> #include <proton/error.h> +#ifdef HAVE_PROTON_EVENTS +#include <proton/event.h> +#endif } namespace qpid { @@ -117,8 +120,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated), connection(pn_connection()), transport(pn_transport()), + collector(0), out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) { +#ifdef HAVE_PROTON_EVENTS + collector = pn_collector(); + pn_connection_collect(connection, collector); +#endif + if (pn_transport_bind(transport, connection)) { //error QPID_LOG(error, "Failed to bind transport to connection: " << getError()); @@ -157,6 +166,9 @@ Connection::~Connection() getBroker().getConnectionObservers().closed(*this); pn_transport_free(transport); pn_connection_free(connection); +#ifdef HAVE_PROTON_EVENTS + pn_collector_free(collector); +#endif } pn_transport_t* Connection::getTransport() @@ -222,10 +234,15 @@ size_t Connection::encode(char* buffer, size_t size) void Connection::doOutput(size_t capacity) { - for (ssize_t n = pn_transport_pending(transport); n > 0 && n < (ssize_t) capacity; n = pn_transport_pending(transport)) { - if (dispatch()) processDeliveries(); - else break; - } + ssize_t n = 0; + do { + if (dispatch()) { + processDeliveries(); + ssize_t next = pn_transport_pending(transport); + if (n == next) break; + n = next; + } else break; + } while (n > 0 && n < (ssize_t) capacity); } bool Connection::dispatch() @@ -327,122 +344,92 @@ framing::ProtocolVersion Connection::getVersion() const { return qpid::framing::ProtocolVersion(1,0); } -namespace { -pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; -pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; -} void Connection::process() { QPID_LOG(trace, id << " process()"); +#ifdef HAVE_PROTON_EVENTS + pn_event_t *event = pn_collector_peek(collector); + while (event) { + switch (pn_event_type(event)) { + case PN_CONNECTION_REMOTE_OPEN: + doConnectionRemoteOpen(); + break; + case PN_CONNECTION_REMOTE_CLOSE: + doConnectionRemoteClose(); + break; + case PN_SESSION_REMOTE_OPEN: + doSessionRemoteOpen(pn_event_session(event)); + break; + case PN_SESSION_REMOTE_CLOSE: + doSessionRemoteClose(pn_event_session(event)); + break; + case PN_LINK_REMOTE_OPEN: + doLinkRemoteOpen(pn_event_link(event)); + break; + case PN_LINK_REMOTE_CLOSE: + doLinkRemoteClose(pn_event_link(event)); + break; + case PN_DELIVERY: + doDeliveryUpdated(pn_event_delivery(event)); + break; + default: + break; + } + pn_collector_pop(collector); + event = pn_collector_peek(collector); + } + +#else // !HAVE_PROTON_EVENTS + + const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; + const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; + if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { - QPID_LOG_CAT(debug, model, id << " connection opened"); - open(); - setContainerId(pn_connection_remote_container(connection)); + doConnectionRemoteOpen(); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { - QPID_LOG_CAT(debug, model, id << " session begun"); - pn_session_open(s); - boost::shared_ptr<Session> ssn(new Session(s, *this, out)); - sessions[s] = ssn; + doSessionRemoteOpen(s); } for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) { - pn_link_open(l); - - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " Link attached on unknown session!"); - } else { - try { - session->second->attach(l); - QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l)); - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } catch (const qpid::framing::UnauthorizedAccessException& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } catch (const std::exception& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } - } + doLinkRemoteOpen(l); } processDeliveries(); for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { - pn_link_close(l); - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); - } else { - session->second->detach(l); - QPID_LOG_CAT(debug, model, id << " link detached"); - } + doLinkRemoteClose(l); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { - pn_session_close(s); - Sessions::iterator i = sessions.find(s); - if (i != sessions.end()) { - i->second->close(); - sessions.erase(i); - QPID_LOG_CAT(debug, model, id << " session ended"); - } else { - QPID_LOG(error, id << " peer attempted to close unrecognised session"); - } + doSessionRemoteClose(s); } if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { - QPID_LOG_CAT(debug, model, id << " connection closed"); - pn_connection_close(connection); + doConnectionRemoteClose(); } +#endif // !HAVE_PROTON_EVENTS } namespace { std::string convert(pn_delivery_tag_t in) { +#ifdef NO_PROTON_DELIVERY_TAG_T + return std::string(in.start, in.size); +#else return std::string(in.bytes, in.size); +#endif } } void Connection::processDeliveries() { - //handle deliveries +#ifdef HAVE_PROTON_EVENTS + // with the event API, there's no way to selectively process only + // the delivery-related events. We have to process all events: + process(); +#else for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { - pn_link_t* link = pn_delivery_link(delivery); - try { - if (pn_link_is_receiver(link)) { - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - i->second->readable(link, delivery); - } else { - pn_delivery_update(delivery, PN_REJECTED); - } - } else { //i.e. SENDER - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); - i->second->writable(link, delivery); - } else { - QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); - } - } - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); - pn_condition_t* error = pn_link_condition(link); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(link); - } + doDeliveryUpdated(delivery); } +#endif } std::string Connection::getError() @@ -470,4 +457,132 @@ void Connection::closedByManagement() closeRequested = true; out.activateOutput(); } + +// the peer has issued an Open performative +void Connection::doConnectionRemoteOpen() +{ + // respond in kind if we haven't yet + if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + QPID_LOG_CAT(debug, model, id << " connection opened"); + open(); + setContainerId(pn_connection_remote_container(connection)); + } +} + +// the peer has issued a Close performative +void Connection::doConnectionRemoteClose() +{ + if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) { + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); + } +} + +// the peer has issued a Begin performative +void Connection::doSessionRemoteOpen(pn_session_t *session) +{ + if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + QPID_LOG_CAT(debug, model, id << " session begun"); + pn_session_open(session); + boost::shared_ptr<Session> ssn(new Session(session, *this, out)); + sessions[session] = ssn; + } +} + +// the peer has issued an End performative +void Connection::doSessionRemoteClose(pn_session_t *session) +{ + if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) { + pn_session_close(session); + Sessions::iterator i = sessions.find(session); + if (i != sessions.end()) { + i->second->close(); + sessions.erase(i); + QPID_LOG_CAT(debug, model, id << " session ended"); + } else { + QPID_LOG(error, id << " peer attempted to close unrecognised session"); + } + } +} + +// the peer has issued an Attach performative +void Connection::doLinkRemoteOpen(pn_link_t *link) +{ + if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + pn_link_open(link); + Sessions::iterator session = sessions.find(pn_link_session(link)); + if (session == sessions.end()) { + QPID_LOG(error, id << " Link attached on unknown session!"); + } else { + try { + session->second->attach(link); + QPID_LOG_CAT(debug, protocol, id << " link " << link << " attached on " << pn_link_session(link)); + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } catch (const std::exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } + } + } +} + +// the peer has issued a Detach performative +void Connection::doLinkRemoteClose(pn_link_t *link) +{ + if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) { + pn_link_close(link); + Sessions::iterator session = sessions.find(pn_link_session(link)); + if (session == sessions.end()) { + QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); + } else { + session->second->detach(link); + QPID_LOG_CAT(debug, model, id << " link detached"); + } + } +} + +// the status of the delivery has changed +void Connection::doDeliveryUpdated(pn_delivery_t *delivery) +{ + pn_link_t* link = pn_delivery_link(delivery); + try { + if (pn_link_is_receiver(link)) { + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + i->second->readable(link, delivery); + } else { + pn_delivery_update(delivery, PN_REJECTED); + } + } else { //i.e. SENDER + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); + i->second->writable(link, delivery); + } else { + QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); + } + } + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 17c5b0ecf0..ea4ce06163 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -31,6 +31,9 @@ struct pn_connection_t; struct pn_session_t; struct pn_transport_t; +struct pn_collector_t; +struct pn_link_t; +struct pn_delivery_t; namespace qpid { namespace sys { @@ -69,6 +72,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; pn_transport_t* transport; + pn_collector_t* collector; qpid::sys::OutputControl& out; const std::string id; bool haveOutput; @@ -86,6 +90,17 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void open(); void readPeerProperties(); void closedByManagement(); + + private: + // handle Proton engine events + void doConnectionRemoteOpen(); + void doConnectionRemoteClose(); + void doSessionRemoteOpen(pn_session_t *session); + void doSessionRemoteClose(pn_session_t *session); + void doLinkRemoteOpen(pn_link_t *link); + void doLinkRemoteClose(pn_link_t *link); + void doDeliveryUpdated(pn_delivery_t *delivery); + }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp index cc714c0730..c2d4782fc4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -140,6 +140,10 @@ class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCo boost::shared_ptr<Domain>, BrokerContext&, boost::shared_ptr<Relay>); qpid::sys::ConnectionCodec* create(const framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + qpid::framing::ProtocolVersion supportedVersion() const + { + return qpid::framing::ProtocolVersion(1, 0); + } bool connect(); void failed(int, std::string); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index ce4c73dead..d4f73fc511 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -57,7 +57,7 @@ uint32_t Incoming::getCredit() return credit;//TODO: proper flow control } -void Incoming::detached() +void Incoming::detached(bool /*closed*/) { } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index 1a7064337d..ccf999a256 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingLink virtual ~Incoming(); virtual bool doWork();//do anything that requires output virtual bool haveWork();//called when handling input to see whether any output work is needed - virtual void detached(); + virtual void detached(bool closed); virtual void readable(pn_delivery_t* delivery) = 0; void verify(const std::string& userid, const std::string& defaultRealm); void wakeup(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 54993d071e..0136d5a0ed 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -32,6 +32,7 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include "config.h" namespace qpid { namespace broker { @@ -156,7 +157,7 @@ bool OutgoingFromQueue::canDeliver() return deliveries[current].delivery == 0 && pn_link_credit(link); } -void OutgoingFromQueue::detached() +void OutgoingFromQueue::detached(bool closed) { QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); queue->cancel(shared_from_this()); @@ -164,12 +165,14 @@ void OutgoingFromQueue::detached() for (size_t i = 0 ; i < deliveries.capacity(); ++i) { if (deliveries[i].msg) queue->release(deliveries[i].cursor, true); } - if (exclusive) queue->releaseExclusiveOwnership(); - else if (isControllingUser) queue->releaseFromUse(true); + if (exclusive) { + queue->releaseExclusiveOwnership(closed); + } else if (isControllingUser) { + queue->releaseFromUse(true); + } cancelled = true; } - OutgoingFromQueue::~OutgoingFromQueue() { if (!cancelled && isControllingUser) queue->releaseFromUse(true); @@ -283,7 +286,11 @@ qpid::broker::OwnershipToken* OutgoingFromQueue::getSession() OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) { +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = tagData; +#else tag.bytes = tagData; +#endif tag.size = TAG_WIDTH; } void OutgoingFromQueue::Record::init(size_t i) @@ -304,7 +311,11 @@ void OutgoingFromQueue::Record::reset() size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t) { assert(t.size == TAG_WIDTH); +#ifdef NO_PROTON_DELIVERY_TAG_T + qpid::framing::Buffer buffer(const_cast<char*>(t.start)/*won't ever be written to*/, t.size); +#else qpid::framing::Buffer buffer(const_cast<char*>(t.bytes)/*won't ever be written to*/, t.size); +#endif return (size_t) buffer.getLong(); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 27d8205fc8..d3825d0894 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingLink /** * Signals that this link has been detached */ - virtual void detached() = 0; + virtual void detached(bool closed) = 0; /** * Called when a delivery is writable */ @@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public void write(const char* data, size_t size); void handle(pn_delivery_t* delivery); bool canDeliver(); - void detached(); + void detached(bool closed); //Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index 2ea381e2bc..621f25f04b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -73,6 +73,7 @@ class ProtocolImpl : public BrokerContext, public Protocol qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&); boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); + qpid::framing::ProtocolVersion supportedVersion() const; private: }; @@ -158,5 +159,10 @@ boost::shared_ptr<RecoverableMessage> ProtocolImpl::recover(qpid::framing::Buffe } } +qpid::framing::ProtocolVersion ProtocolImpl::supportedVersion() const +{ + return qpid::framing::ProtocolVersion(1,0); +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 83b3e64ee6..587a11466a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -23,6 +23,7 @@ #include "qpid/log/Statement.h" #include <algorithm> #include <string.h> +#include "config.h" namespace qpid { namespace broker { @@ -126,7 +127,13 @@ bool OutgoingFromRelay::doWork() { relay->check(); relay->setCredit(pn_link_credit(link)); - return relay->send(link); + bool worked = relay->send(link); + pn_delivery_t *d = pn_link_current(link); + if (d && pn_delivery_writable(d)) { + handle(d); + return true; + } + return worked; } /** * Called when a delivery is writable @@ -163,7 +170,7 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery) /** * Signals that this link has been detached */ -void OutgoingFromRelay::detached() +void OutgoingFromRelay::detached(bool /*closed*/) { relay->detached(this); } @@ -221,7 +228,7 @@ uint32_t IncomingToRelay::getCredit() return relay->getCredit(); } -void IncomingToRelay::detached() +void IncomingToRelay::detached(bool /*closed*/) { relay->detached(this); } @@ -238,7 +245,11 @@ void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d) //copy delivery tag pn_delivery_tag_t dt = pn_delivery_tag(d); tag.resize(dt.size); +#ifdef NO_PROTON_DELIVERY_TAG_T + ::memmove(&tag[0], dt.start, dt.size); +#else ::memmove(&tag[0], dt.bytes, dt.size); +#endif //set context pn_delivery_set_context(d, this); @@ -258,7 +269,11 @@ bool BufferedTransfer::settle() void BufferedTransfer::initOut(pn_link_t* link) { pn_delivery_tag_t dt; +#ifdef NO_PROTON_DELIVERY_TAG_T + dt.start = &tag[0]; +#else dt.bytes = &tag[0]; +#endif dt.size = tag.size(); out.handle = pn_delivery(link, dt); //set context diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h index ef700690fd..32f317bfe1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.h +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h @@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoing const std::string& target, const std::string& name, boost::shared_ptr<Relay>); bool doWork(); void handle(pn_delivery_t* delivery); - void detached(); + void detached(bool closed); void init(); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); @@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming bool settle(); bool doWork(); bool haveWork(); - void detached(); + void detached(bool closed); void readable(pn_delivery_t* delivery); uint32_t getCredit(); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 85da8c0c21..538883f29a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -48,6 +48,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" +#include "config.h" #include <boost/intrusive_ptr.hpp> #include <boost/format.hpp> #include <map> @@ -505,10 +506,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (!settings.autodelete) settings.autodelete = autodelete; altExchange = node.topic->getAlternateExchange(); } - if (!settings.autoDeleteDelay) { + if (settings.original.find("qpid.auto_delete_timeout") == settings.original.end()) { //only use delay from link if policy didn't specify one settings.autoDeleteDelay = pn_terminus_get_timeout(source); - settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; + if (settings.autoDeleteDelay) + settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; } if (settings.autoDeleteDelay) { settings.autodelete = true; @@ -577,7 +579,7 @@ void Session::detach(pn_link_t* link) if (pn_link_is_sender(link)) { OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); if (q && !q->isAutoDelete() && !q->isDeleted()) { connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); @@ -588,7 +590,7 @@ void Session::detach(pn_link_t* link) } else { IncomingLinks::iterator i = incoming.find(link); if (i != incoming.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); incoming.erase(i); QPID_LOG(debug, "Incoming link detached"); } @@ -615,7 +617,11 @@ void Session::accepted(pn_delivery_t* delivery, bool sync) void Session::readable(pn_link_t* link, pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); +#ifdef NO_PROTON_DELIVERY_TAG_T + QPID_LOG(debug, "received delivery: " << std::string(tag.start, tag.size)); +#else QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); +#endif incomingMessageReceived(); IncomingLinks::iterator target = incoming.find(link); if (target == incoming.end()) { @@ -653,7 +659,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(s->first); - s->second->detached(); + s->second->detached(true); outgoing.erase(s++); output = true; } @@ -678,7 +684,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(i->first); - i->second->detached(); + i->second->detached(true); incoming.erase(i++); output = true; } @@ -690,10 +696,10 @@ bool Session::dispatch() void Session::close() { for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->second->detached(); + i->second->detached(false); } for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { - i->second->detached(); + i->second->detached(false); } outgoing.clear(); incoming.clear(); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index bbb310b0f4..0225ee74cb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -55,6 +55,7 @@ using qpid::messaging::AssertionFailed; using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; +using qpid::framing::FieldValue; using qpid::framing::QueueQueryResult; using qpid::framing::ReplyTo; using qpid::framing::Uuid; @@ -140,6 +141,11 @@ const std::string PREFIX_AMQ("amq."); const std::string PREFIX_QPID("qpid."); const Verifier verifier; + +bool areEquivalent(const FieldValue& a, const FieldValue& b) +{ + return ((a == b) || (a.convertsTo<int64_t>() && b.convertsTo<int64_t>() && a.get<int64_t>() == b.get<int64_t>())); +} } struct Binding @@ -534,19 +540,19 @@ Subscription::Subscription(const Address& address, const std::string& type) reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)), actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(true)), + autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || reliable))), exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) { - const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; - if (timeout) { + + if ((Opt(address)/LINK).hasKey(TIMEOUT)) { + const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); - } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) { - //if durable but not explicitly reliable, then set a non-zero - //default for the autodelete timeout (previously this would - //have defaulted to autodelete immediately anyway, so the risk - //of the change causing problems is mitigated) - queueOptions.setInt("qpid.auto_delete_delay", 15*60); + } else if (durable && !reliable && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) { + //if durable but not reliable, and auto-delete not + //explicitly set, then set a non-zero default for the + //autodelete timeout + queueOptions.setInt("qpid.auto_delete_timeout", 2*60); } (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -609,7 +615,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, - arg::autoDelete=autoDeleteQueue && (!(durable || reliable)), arg::durable=durable, + arg::autoDelete=autoDeleteQueue, arg::durable=durable, arg::alternateExchange=alternateExchange, arg::arguments=queueOptions); //'default' binding: @@ -806,7 +812,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (!areEquivalent(*i->second, *v)) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } @@ -906,7 +912,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (!areEquivalent(*i->second, *v)) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp index 72fcd8a9e2..6510842c58 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -33,7 +33,18 @@ namespace framing { namespace { //each range contains 2 numbers, 4 bytes each -uint16_t RANGE_SIZE = 2 * 4; +uint16_t RANGE_SIZE = 2 * 4; +int32_t MAX_RANGE = 2147483647;//2^31-1 + +int32_t gap(const SequenceNumber& a, const SequenceNumber& b) +{ + return a < b ? b - a : a - b; +} + +bool is_max_range(const SequenceNumber& a, const SequenceNumber& b) +{ + return gap(a, b) == MAX_RANGE; +} } void SequenceSet::encode(Buffer& buffer) const @@ -54,7 +65,21 @@ void SequenceSet::decode(Buffer& buffer) throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size)); for (uint16_t i = 0; i < count; i++) { - add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); + SequenceNumber a(buffer.getLong()); + SequenceNumber b(buffer.getLong()); + if (b < a) + throw IllegalArgumentException(QPID_MSG("Invalid range in sequence set: " << a << " -> " << b)); + if (is_max_range(a, b)) { + //RangeSet holds 'half-closed' ranges, where the end is + //one past the 'highest' value in the range. So if the + //range is already the maximum expressable with a 32bit + //sequence number, we can't represent it as a + //'half-closed' range, so we represent it as two ranges. + add(a, b-1); + add(b); + } else { + add(a, b); + } } } diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0e87346ac1..870e4723b2 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -482,6 +482,7 @@ shared_ptr<PrimaryTxObserver> Primary::makeTxObserver( { shared_ptr<PrimaryTxObserver> observer = PrimaryTxObserver::create(*this, haBroker, txBuffer); + sys::Mutex::ScopedLock l(lock); txMap[observer->getTxQueue()->getName()] = observer; return observer; } diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 2d5a389615..4023ba9629 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -25,7 +25,7 @@ Current/pending: ------ ------- ---------------------- 5359 - Linearstore: Implement new management schema and wire into store 5360 - Linearstore: Evaluate and rework logging to produce a consistent log output - 5361 1145359 Linearstore: No tests for linearstore functionality currently exist +* 5361 1145359 Linearstore: No tests for linearstore functionality currently exist svn r.1564893 2014-02-05: Added tx-test-soak.sh svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore @@ -37,15 +37,22 @@ Current/pending: ** Basic performance tests 5464 - [linearstore] Incompletely created journal files accumulate in EFP - 1088944 [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue> - 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation +* - 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP - 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue> - - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP -* 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs - svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks - svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new - 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal - svn r.1643053 2014-11-18: Proposed fix + - 1067482 [LinearStore] Provide a way to preallocate empty pages in EFP +* 6303 1180660 [linearstore] Roll back auto-upgrade of store directory structure +* 5362 1145363 Linearstore: No store tools exist for examining the journals + svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. + svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze + svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze + svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze + svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze + svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze + * Store analysis and status + * Recovery/reading of message content + * Empty file pool status and management + @@ -115,16 +122,6 @@ NO-JIRA - Added missing Apache copyright/license text 5651 - [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec * Bug introduced by r.1578899. - 5362 1145363 Linearstore: No store tools exist for examining the journals - svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. - svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze - svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze - svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze - svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze - svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze - * Store analysis and status - * Recovery/reading of message content - * Empty file pool status and management 5661 - [linearstore] Set default cmake build to exclude linearstore svn r.1584379 2014-04-03 Proposed solution. * Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build. @@ -147,8 +144,16 @@ NO-JIRA - Added missing Apache copyright/license text svn r.1631360 2014-10-13 Proposed solution 6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP svn r.1632504 2014-10-17 Proposed solution by pmoravec - 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute + 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal + svn r.1643053 2014-11-18: Proposed fix + 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute svn r.1641689 2014-11-25 Proposed solution + 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs + svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks + svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new + svn r.1649081 2015-01-02: WIP: Specify new queue using qpid-config --durable together with --efp-partition-num and/or --efp-pool-file-size. Needs testing. + - 1148807 [linearstore] Restarting broker with empty journal raises confusing warning + Fixed by svn r.1649081 of bug 5671 / 1160367 above Ordered checkin list: @@ -187,14 +192,17 @@ no. svn r Q-JIRA RHBZ Date Alt Committer 28. 1596509 5767 1098118 2014-05-21 0.22-mrg (pmoravec) 29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update) 30. 1599243 5767 1098118 2014-06-02 0.22-mrg -30. 1599243 5767 1098118 2014-06-02 -31. 1614665 5924 1124906 2014-07-30 -32. 1620426 6043 1089652 2014-08-25 -33. 1631360 6147 1152012 2014-10-13 (pmoravec) -34. 1632504 6157 1150397 2014-10-17 (pmoravec) -35. 1636598 5671 2014-11-04 -36. 1637985 5671 2014-11-10 -37. 1641689 6248 1167911 2014-11-25 +31. 1599243 5767 1098118 2014-06-02 +32. 1614665 5924 1124906 2014-07-30 +33. 1620426 6043 1089652 2014-08-25 +34. 1631360 6147 1152012 2014-10-13 (pmoravec) +35. 1632504 6157 1150397 2014-10-17 (pmoravec) +36. 1636598 5671 1160367 2014-11-04 +37. 1637985 5671 1160367 2014-11-10 +38. 1643053 6230 1165200 2014-11-18 +39. 1641689 6248 1167911 2014-11-25 +40. 1649081 5671 1160367 2015-01-02 +41. 1649082 NO-JIRA - 2015-01-02 See above sections for details on these checkins. diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 77d5703636..70eac27f48 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement () mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr ( new qmf::org::apache::qpid::linearstore::Store(agent, this, broker)); - mgmtObject->set_location(storeDir); + mgmtObject->set_storeDir(storeDir); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES); @@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, if (queue_.getName().size() == 0) { - QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); + QLS_LOG(error, "Cannot create store for empty (null) queue name - queue create ignored."); return; } @@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePool* MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { qpid::framing::FieldTable::ValuePtr value; qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; - value = args_.get("qpid.efp_partition"); + value = args_.get("qpid.efp_partition_num"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); + localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition_num"); } qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; - value = args_.get("qpid.efp_file_size"); + value = args_.get("qpid.efp_pool_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), "qpid.efp_pool_file_size"); } return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib); } @@ -1488,21 +1488,21 @@ std::string MessageStoreImpl::getStoreTopLevelDir() { std::string MessageStoreImpl::getJrnlBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ; + dir << storeDir << "/" << storeTopLevelDir << "/jrnl2/" ; return dir.str(); } std::string MessageStoreImpl::getBdbBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/dat/" ; + dir << storeDir << "/" << storeTopLevelDir << "/dat2/" ; return dir.str(); } std::string MessageStoreImpl::getTplBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ; + dir << storeDir << "/" << storeTopLevelDir << "/tpl2/" ; return dir.str(); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp index 18f4d3afc3..08db3f75bd 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp @@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, EmptyFilePool::~EmptyFilePool() {} void EmptyFilePool::initialize() { -//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG - std::vector<std::string> dirList; + if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { // Create EFP dir if it does not yet exist + if (errno != EEXIST) { + std::ostringstream oss; + oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize"); + } + } // Process empty files in main dir + std::vector<std::string> dirList; jdir::read_dir(efpDirectory_, dirList, false, true, false, false); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { size_t dotPos = i->rfind("."); @@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getIdentity() const { std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { std::string emptyFileName = popEmptyFile(); - std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); + std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { // Try again with new UUID for file name newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName(); - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { //std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG - pushEmptyFile(emptyFileName); + pushEmptyFile(emptyFileName); // Return empty file to pool std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); @@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { if (createSymLink(newFileName, symlinkName)) { std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); } return symlinkName; } @@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN } // --- protected functions --- +void EmptyFilePool::checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName) { + if (!ofs.good()) { + if (ofs.is_open()) { + ofs.close(); + } + std::ostringstream oss; + oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << (ofs.fail()?"T":"F") << " badbit=" + << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" << operation << ": " << errorMessage; + throw jexception(jerrno, oss.str(), className, fnName); + } +} std::string EmptyFilePool::createEmptyFile() { std::string efpfn = getEfpFileName(); - if (!overwriteFileContents(efpfn)) { - // TODO: handle failure to prepare new file here - } + overwriteFileContents(efpfn); return efpfn; } @@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) { } } -bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { +void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { ::file_hdr_t fh; ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_); std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | std::ofstream::binary); - if (ofs.good()) { - ofs.write((char*)&fh, sizeof(::file_hdr_t)); - uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); - while (rem--) - ofs.put('\0'); - ofs.close(); - return true; -//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG - } else { - std::ostringstream oss; - oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " failed to be open"; - throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", "overwriteFileContents"); + checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", "Failed to create file", "EmptyFilePool", "overwriteFileContents"); + ofs.write((char*)&fh, sizeof(::file_hdr_t)); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed to write header", "EmptyFilePool", "overwriteFileContents"); + uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); + while (rem--) { + ofs.put('\0'); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", "Failed to put \0", "EmptyFilePool", "overwriteFileContents"); } - return false; + ofs.close(); +//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG } std::string EmptyFilePool::popEmptyFile() { @@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) { void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, returnedFileName)) { + if (!moveFile(emptyFileName, returnedFileName)) { ::unlink(emptyFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG } @@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { overwriteFileContents(returnedFileName); } std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(returnedFileName, sanitizedEmptyFileName)) { + if (!moveFile(returnedFileName, sanitizedEmptyFileName)) { ::unlink(returnedFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG } else { @@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { return true; } -// static -int EmptyFilePool::moveFile(const std::string& from, - const std::string& to) { - if (::rename(from.c_str(), to.c_str())) { - if (errno == EEXIST) return errno; // File name exists - std::ostringstream oss; - oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); - } - return 0; -} - //static int EmptyFilePool::createSymLink(const std::string& fqFileName, const std::string& fqLinkName) { @@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const std::string& fqFileName, if (errno == EEXIST) return errno; // File name exists std::ostringstream oss; oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); } return 0; } @@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) { if (len < 0) { std::ostringstream oss; oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); } ::unlink(fqLinkName.c_str()); return std::string(buff, len); @@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std::string& fqName) { } +// static +bool EmptyFilePool::moveFile(const std::string& from, + const std::string& to) { + if (::rename(from.c_str(), to.c_str())) { + if (errno == EEXIST) { + return false; // File name exists + } + std::ostringstream oss; + oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + return true; +} + }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h index 1a1264fa26..dc567ff917 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h @@ -87,23 +87,30 @@ public: const efpPartitionNumber_t partitionNumber); protected: + void checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName); std::string createEmptyFile(); std::string getEfpFileName(); void initializeSubDirectory(const std::string& fqDirName); - bool overwriteFileContents(const std::string& fqFileName); + void overwriteFileContents(const std::string& fqFileName); std::string popEmptyFile(); void pushEmptyFile(const std::string fqFileName); void returnEmptyFile(const std::string& emptyFileName); void resetEmptyFileHeader(const std::string& fqFileName); bool validateEmptyFile(const std::string& emptyFileName) const; - static int moveFile(const std::string& fromFqPath, - const std::string& toFqPath); static int createSymLink(const std::string& fqFileName, const std::string& fqLinkName); static std::string deleteSymlink(const std::string& fqLinkName); static bool isFile(const std::string& fqName); static bool isSymlink(const std::string& fqName); + static bool moveFile(const std::string& fromFqPath, + const std::string& toFqPath); }; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp index 28e1b0b56e..a02679736e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp @@ -74,6 +74,7 @@ void EmptyFilePoolManager::findEfpPartitions() { if (!foundPartition) { std::ostringstream oss1; oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_) + << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_); jdir::create_dir(oss1.str()); insertPartition(defaultPartitionNumber_, oss1.str()); @@ -165,9 +166,10 @@ EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIde EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpDataSize_kib) { EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? partitionNumber : defaultPartitionNumber_); - if (efppp != 0) - return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_); - return 0; + if (efppp == 0) { + return 0; + } + return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_, true); } void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp index a31855e0d8..12d2db74b8 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp @@ -32,6 +32,9 @@ namespace qpid { namespace linearstore { namespace journal { +// static +const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition + EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum, const std::string& partitionDir, const bool overwriteBeforeReturnFlag, @@ -57,72 +60,31 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() { void EmptyFilePoolPartition::findEmptyFilePools() { //std::cout << "*** EmptyFilePoolPartition::findEmptyFilePools(): Reading " << partitionDir_ << std::endl; // DEBUG - std::vector<std::string> dirList; - bool upgradeDirStructureFlag = false; - std::string oldPartitionDir; - jdir::read_dir(partitionDir_, dirList, true, false, false, false); -//std::cout << "*** dirList.size()=" << dirList.size() << "; dirList.front()=" << dirList.front() << std::endl; // DEBUG - if (dirList.size() == 1 && dirList.front().compare("efp") == 0) { - upgradeDirStructureFlag = true; - oldPartitionDir = partitionDir_ + "/efp"; -//std::cout << "*** oldPartitionDir=" << oldPartitionDir << std::endl; // DEBUG - dirList.clear(); - jdir::read_dir(oldPartitionDir, dirList, true, false, false, false); - } + std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_); + if (jdir::is_dir(efpDir)) { + std::vector<std::string> dirList; + jdir::read_dir(efpDir, dirList, true, false, false, true); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { - std::string fqFileName(partitionDir_ + "/" + *i); - if (upgradeDirStructureFlag) { - std::string fqOldFileName(partitionDir_ + "/efp/" + *i); - if (::rename(fqOldFileName.c_str(), fqFileName.c_str())) { - // File move failed - std::ostringstream oss; - oss << "File \'" << fqOldFileName << "\' could not be renamed to \'" << fqFileName << "\' (" << FORMAT_SYSERR(errno) << "); file deleted"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - if (::unlink(fqOldFileName.c_str())) { - std::ostringstream oss; - oss << "File \'" << fqOldFileName << "\' could not be deleted (" << FORMAT_SYSERR(errno) << "\'; file orphaned"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - } - } - EmptyFilePool* efpp = 0; - try { - efpp = new EmptyFilePool(fqFileName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); - { - slock l(efpMapMutex_); - efpMap_[efpp->dataSize_kib()] = efpp; - } - } - catch (const std::exception& e) { - if (efpp != 0) { - delete efpp; - efpp = 0; - } - std::ostringstream oss; - oss << "EmptyFilePool create failed: " << e.what(); - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - if (efpp != 0) { - efpp->initialize(); - } - } - if (upgradeDirStructureFlag) { - std::string oldEfpDir(partitionDir_ + "/efp"); - if (::rmdir(oldEfpDir.c_str())) { - // Unable to delete old "efp" dir - std::ostringstream oss; - oss << "Unable to delete old EFP directory \'" << oldEfpDir << "\' (" << FORMAT_SYSERR(errno) << "\'; directory orphaned"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } + createEmptyFilePool(*i); } + } else { + std::ostringstream oss; + oss << "Partition \"" << partitionDir_ << "\" does not contain top level EFP dir \"" << s_efpTopLevelDir_ << "\""; + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } } -EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { - slock l(efpMapMutex_); - efpMapItr_t i = efpMap_.find(efpDataSize_kib); - if (i == efpMap_.end()) - return 0; - return i->second; +EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) { + { + slock l(efpMapMutex_); + efpMapItr_t i = efpMap_.find(efpDataSize_kib); + if (i != efpMap_.end()) + return i->second; + } + if (createIfNonExistent) { + return createEmptyFilePool(efpDataSize_kib); + } + return 0; } void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) { @@ -183,7 +145,7 @@ std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNu //static efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) { if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) { - long pn = ::strtol(name.c_str() + 1, 0, 0); + long pn = ::strtol(name.c_str() + 1, 0, 10); if (pn == 0 && errno) { return 0; } else { @@ -195,12 +157,42 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::strin // --- protected functions --- +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { + std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePoolPartition::s_efpTopLevelDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib)); + return createEmptyFilePool(fqEfpDirectoryName); +} + +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) { + EmptyFilePool* efpp = 0; + try { + efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); + { + slock l(efpMapMutex_); + efpMap_[efpp->dataSize_kib()] = efpp; + } + } + catch (const std::exception& e) { + if (efpp != 0) { + delete efpp; + efpp = 0; + } + std::ostringstream oss; + oss << "EmptyFilePool create failed: " << e.what(); + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } + if (efpp != 0) { + efpp->initialize(); + } + return efpp; +} + void EmptyFilePoolPartition::validatePartitionDir() { + std::ostringstream ss; if (!jdir::is_dir(partitionDir_)) { - std::ostringstream ss; ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not a directory"; throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir"); } + // TODO: other validity checks here } diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h index c653c6be6a..570e2b073f 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h @@ -37,6 +37,8 @@ class JournalLog; class EmptyFilePoolPartition { +public: + static const std::string s_efpTopLevelDir_; protected: typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t; typedef efpMap_t::iterator efpMapItr_t; @@ -59,7 +61,7 @@ public: virtual ~EmptyFilePoolPartition(); void findEmptyFilePools(); - EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent); void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList); void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const; std::string getPartitionDirectory() const; @@ -70,6 +72,8 @@ public: static efpPartitionNumber_t getPartitionNumber(const std::string& name); protected: + EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName); void validatePartitionDir(); }; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 3f39913422..73a16f01b7 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -43,6 +43,7 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include <sstream> #include <string> +#include <unistd.h> #include <vector> namespace qpid { @@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr analyzeJournalFileHeaders(efpIdentity); if (journalEmptyFlag_) { - *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + if (uninitFileList_.empty()) { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + } else { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); + } } else { *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); if (! *emptyFilePoolPtrPtr) { @@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { if (journalEmptyFlag_) { if (uninitFileList_.size() > 0) { + // TODO: Handle case if uninitFileList_.size() > 1, but this should not happen in normal operation. Here we assume only one item in the list. std::string uninitFile = uninitFileList_.back(); uninitFileList_.pop_back(); lfcPtr->restoreEmptyFile(uninitFile); @@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName); - if (!hdrOk || headerQueueName.empty()) { + bool hdrEmpty = ::is_file_hdr_reset(&fileHeader); + if (!hdrOk) { std::ostringstream oss; - oss << "Journal file " << (*i) << " is uninitialized or corrupted"; + oss << "Journal file " << (*i) << " is corrupted or invalid"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + } else if (hdrEmpty) { + // Read symlink, find efp directory name which is efp size in KiB + // TODO: place this bit into a common function as it is also used in EmptyFilePool.cpp::deleteSymlink() + char buff[1024]; + ssize_t len = ::readlink((*i).c_str(), buff, 1024); + if (len < 0) { + std::ostringstream oss; + oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "RecoveryManager", "analyzeJournalFileHeaders"); + } + // Find second and third '/' from back of string, which contains the EFP directory name + *(::strrchr(buff, '/')) = '\0'; + *(::strrchr(buff, '/')) = '\0'; + int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1); uninitFileList_.push_back(*i); + efpIdentity.pn_ = fileHeader._efp_partition; + efpIdentity.ds_ = efpDataSize_kib; } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; @@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { } } +//std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { @@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } ifs.close(); ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t)); - if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) return false; + if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) { + return false; + } queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len); return true; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp index 9d59039220..ce88e7809c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp @@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108; const uint32_t jerrno::JERR__RECNFOUND = 0x0109; const uint32_t jerrno::JERR__NOTIMPL = 0x010a; const uint32_t jerrno::JERR__NULL = 0x010b; +const uint32_t jerrno::JERR__SYMLINK = 0x010c; // class jcntl const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; @@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02; const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03; const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04; const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05; -const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06; -const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d07; -const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d08; -const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d09; +const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d06; +const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d07; +const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d08; +const uint32_t jerrno::JERR_EFP_FWRITE = 0x0d09; +const uint32_t jerrno::JERR_EFP_MKDIR = 0x0d0a; // Negative returns for some functions const int32_t jerrno::AIO_TIMEOUT = -1; @@ -140,6 +142,7 @@ jerrno::__init() _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found."; _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented"; _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer"; + _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed"; // class jcntl _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal."; @@ -210,10 +213,11 @@ jerrno::__init() _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory"; _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size"; _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty"; - _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed"; _err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed"; _err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type incorrect for operation"; _err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for write"; + _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed"; + _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed"; //_err_map[] = ""; diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h index 77b18b17e8..6e817682ca 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h @@ -60,6 +60,7 @@ namespace journal { static const uint32_t JERR__RECNFOUND; ///< Record not found static const uint32_t JERR__NOTIMPL; ///< Not implemented static const uint32_t JERR__NULL; ///< Operation on null pointer + static const uint32_t JERR__SYMLINK; ///< Symbolic Link operation failed // class jcntl static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal @@ -130,10 +131,11 @@ namespace journal { static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size static const uint32_t JERR_EFP_EMPTY; ///< EFP empty - static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed static const uint32_t JERR_EFP_LSTAT; ///< lstat operation failed static const uint32_t JERR_EFP_BADFILETYPE; ///< Bad file type static const uint32_t JERR_EFP_FOPEN; ///< Unable to fopen file for write + static const uint32_t JERR_EFP_FWRITE; ///< Write failed + static const uint32_t JERR_EFP_MKDIR; ///< Directory creation failed // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml index a55883a255..ebd388593e 100644 --- a/qpid/cpp/src/qpid/linearstore/management-schema.xml +++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml @@ -21,38 +21,24 @@ <class name="Store"> <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/> - <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/> - <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/--> - <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/--> + <property name="storeDir" type="sstr" access="RO" desc="Logical directory on disk"/> <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/> <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/> <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/> <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/> - <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/--> - <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/--> - <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/--> <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/> <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/> <statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/> <statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/> - <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Deprecated"/> </class> <class name="Journal"> <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/> - <property name="name" type="sstr" access="RC" index="y"/> + <property name="queueName" type="sstr" access="RC" index="y"/> <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/> - <property name="baseFileName" type="sstr" access="RO" desc="Deprecated"/> <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/> <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Deprecated"/> - <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/> - <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Deprecated"/> - <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/--> - <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/--> - <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/--> - <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/--> - <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/--> <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/> <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/> @@ -64,36 +50,5 @@ <statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/> <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/> -<!-- - The following are not yet "wired up" in JournalImpl.cpp ---> - <statistic name="freeFileCount" type="hilo32" unit="file" desc="Deprecated"/> - <statistic name="availableFileCount" type="hilo32" unit="file" desc="Deprecated"/> - <statistic name="writeWaitFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="writeBusyFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="readRecordCount" type="count64" unit="record" desc="Deprecated"/> - <statistic name="readBusyFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Deprecated"/> - <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Deprecated"/> - - <!--method name="expand" desc="Increase number of files allocated for this journal"> - <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/> - </method--> </class> - - <eventArguments> - <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/--> - <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/> - <arg name="jrnlId" type="sstr" desc="Journal Id"/> - <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/> - <arg name="numFiles" type="uint16" desc="Number of journal files"/> - <arg name="numTxn" type="uint32" desc="Number of recovered transactions"/> - <arg name="numTxnDeq" type="uint32" desc="Number of recovered transactional dequeues"/> - <arg name="numTxnEnq" type="uint32" desc="Number of recovered transactional enqueues"/> - <arg name="what" type="sstr" desc="Description of event"/> - </eventArguments> - <event name="enqThresholdExceeded" sev="warn" args="jrnlId, what"/> - <event name="created" sev="notice" args="jrnlId, fileSize, numFiles"/> - <event name="full" sev="error" args="jrnlId, what"/> - <event name="recovered" sev="notice" args="jrnlId, fileSize, numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/> </schema> diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 763deb33c6..7f19ca7ec0 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -242,7 +242,7 @@ bool replace(Variant::Map& map, const std::string& original, const std::string& } } -const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes +const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes const uint32_t DEFAULT_TIMEOUT(0); } @@ -267,7 +267,7 @@ AddressHelper::AddressHelper(const Address& address) : bind(link, RELIABILITY, reliability); durableNode = test(node, DURABLE); durableLink = test(link, DURABLE); - timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); + timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); std::string mode; if (bind(address, MODE, mode)) { if (mode == BROWSE) { @@ -571,7 +571,8 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const bool AddressHelper::isUnreliable() const { - return reliability == AT_MOST_ONCE || reliability == UNRELIABLE; + return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || + (reliability.empty() && browse); // A browser defaults to unreliable. } const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 66aee1ae22..3ee58cad8d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -44,7 +44,6 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; - bool getBrowse() const { return browse; } const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9e3f95742b..fedab4286f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); encoded->init(impl); impl.setEncoded(encoded); - impl.setInternalId(ssn->record(current, lnk->getBrowse())); + impl.setInternalId(ssn->record(current)); pn_link_advance(lnk->receiver); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 454106149d..5e0707056f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -36,12 +36,10 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co address(a), helper(address), receiver(pn_receiver(session, name.c_str())), - capacity(0), used(0) -{} - + capacity(0), used(0) {} ReceiverContext::~ReceiverContext() { - //pn_link_free(receiver); + pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 8ded487bf3..2b4e8e1986 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -60,8 +60,6 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); - bool getBrowse() const { return helper.getBrowse(); } - private: friend class ConnectionContext; const std::string name; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1a254c1846..2a48b2241a 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -30,6 +30,7 @@ #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/log/Statement.h" +#include "config.h" extern "C" { #include <proton/engine.h> } @@ -49,7 +50,7 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const SenderContext::~SenderContext() { - //pn_link_free(sender); + pn_link_free(sender); } void SenderContext::close() @@ -510,7 +511,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) { pn_delivery_tag_t tag; tag.size = sizeof(id); +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = reinterpret_cast<const char*>(&id); +#else tag.bytes = reinterpret_cast<const char*>(&id); +#endif token = pn_delivery(sender, tag); pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index f2b7b24b4c..824b958af3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -110,10 +110,11 @@ uint32_t SessionContext::getUnsettledAcks() return 0;//TODO } -qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse) +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { qpid::framing::SequenceNumber id = next++; - if (!browse) unacked[id] = delivery; + if (!pn_delivery_settled(delivery)) + unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index b347c327c5..8c2bb040a6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -75,7 +75,7 @@ class SessionContext qpid::framing::SequenceNumber next; std::string name; - qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse); + qpid::framing::SequenceNumber record(pn_delivery_t*); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index bb59530883..7be625a1a3 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -150,7 +150,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (!codec) { //TODO: may still want to revise this... //send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + write(framing::ProtocolInitiation(factory->supportedVersion())); readError = true; aio->queueWriteClose(); } else { diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h index 969a3877e3..8b5b69cdb4 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h @@ -60,6 +60,8 @@ class ConnectionCodec : public Codec { virtual ConnectionCodec* create( OutputControl&, const std::string& id, const SecuritySettings& ) = 0; + + virtual framing::ProtocolVersion supportedVersion() const = 0; }; }; diff --git a/qpid/cpp/src/tests/assertions.py b/qpid/cpp/src/tests/assertions.py index f1db21b753..930afd124d 100644 --- a/qpid/cpp/src/tests/assertions.py +++ b/qpid/cpp/src/tests/assertions.py @@ -177,3 +177,18 @@ class AssertionTests (VersionTest): assert False, "Expected assertion to fail on unspecified option" except AssertionFailed: None except MessagingError: None + + def test_queue_autodelete_timeout(self): + name = str(uuid4()) + # create subscription queue with 0-10 to be sure of name + ssn_0_10 = self.create_connection("amqp0-10", True).session() + ssn_0_10.receiver("amq.direct; {link:{name:%s,timeout:30}}" % name) + self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name) + ssn_0_10_other = self.create_connection("amqp0-10", True).session() + ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name) + try: + self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name) + ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name) + assert False, "Expected assertion to fail for auto_delete_timeout" + except AssertionFailed: None + except MessagingError: None diff --git a/qpid/cpp/src/tests/txshift.cpp b/qpid/cpp/src/tests/txshift.cpp index bf85bee986..6ec28c7233 100644 --- a/qpid/cpp/src/tests/txshift.cpp +++ b/qpid/cpp/src/tests/txshift.cpp @@ -40,7 +40,7 @@ namespace tests { struct Args : public qpid::TestOptions { std::string workQueue; - size_t workers; + uint workers; Args() : workQueue("txshift-control"), workers(1) { @@ -178,7 +178,7 @@ int main(int argc, char** argv) worker.run(); } else { boost::ptr_vector<Worker> workers; - for (size_t i = 0; i < opts.workers; i++) { + for (uint i = 0; i < opts.workers; i++) { workers.push_back(new Worker(connection, opts.workQueue)); } std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1)); |
