summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/examples/messaging/client.cpp11
-rw-r--r--qpid/cpp/examples/messaging/server.cpp30
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/amqp.cmake6
-rw-r--r--qpid/cpp/src/config.h.cmake3
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp41
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/LossyLvq.cpp30
-rw-r--r--qpid/cpp/src/qpid/broker/LossyLvq.h41
-rw-r--r--qpid/cpp/src/qpid/broker/LossyQueue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Lvq.h2
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Protocol.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/Protocol.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFactory.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp293
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h15
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp22
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp30
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceSet.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp1
-rw-r--r--qpid/cpp/src/qpid/linearstore/ISSUES62
-rw-r--r--qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp18
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp99
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h13
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp8
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp120
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h6
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp34
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp14
-rw-r--r--qpid/cpp/src/qpid/linearstore/journal/jerrno.h4
-rw-r--r--qpid/cpp/src/qpid/linearstore/management-schema.xml49
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp6
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionCodec.h2
-rw-r--r--qpid/cpp/src/tests/assertions.py15
-rw-r--r--qpid/cpp/src/tests/txshift.cpp4
57 files changed, 782 insertions, 388 deletions
diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp
index 983f0a8878..44720ef3ce 100644
--- a/qpid/cpp/examples/messaging/client.cpp
+++ b/qpid/cpp/examples/messaging/client.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,7 @@ using std::string;
int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
std::string connectionOptions = argc > 2 ? argv[2] : "";
-
+
Connection connection(url, connectionOptions);
try {
connection.open();
@@ -62,10 +62,11 @@ int main(int argc, char** argv) {
Message request;
request.setReplyTo(responseQueue);
for (int i=0; i<4; i++) {
- request.setContent(s[i]);
+ request.setContentObject(s[i]);
sender.send(request);
Message response = receiver.fetch();
- std::cout << request.getContent() << " -> " << response.getContent() << std::endl;
+ std::cout << request.getContentObject() << " -> " << response.getContentObject() << std::endl;
+ session.acknowledge(response);
}
connection.close();
return 0;
diff --git a/qpid/cpp/examples/messaging/server.cpp b/qpid/cpp/examples/messaging/server.cpp
index aa271d91f9..ba9fa9e063 100644
--- a/qpid/cpp/examples/messaging/server.cpp
+++ b/qpid/cpp/examples/messaging/server.cpp
@@ -52,15 +52,31 @@ int main(int argc, char** argv) {
const Address& address = request.getReplyTo();
if (address) {
Sender sender = session.createSender(address);
- std::string s = request.getContent();
- std::transform(s.begin(), s.end(), s.begin(), toupper);
- Message response(s);
+ Message response;
+
+ qpid::types::Variant requestObj = request.getContentObject();
+ if (requestObj.getType() == qpid::types::VAR_STRING) {
+ // Received a string.
+ // Server returns request string in upper case with same encoding.
+ std::string s = requestObj;
+ std::transform(s.begin(), s.end(), s.begin(), toupper);
+ qpid::types::Variant responseObj(s);
+ responseObj.setEncoding( requestObj.getEncoding() );
+ response.setContentObject( responseObj );
+ } else {
+ // Received something other than a string.
+ // Server echos received object as a utf8 string.
+ qpid::types::Variant responseObj( requestObj.asString() );
+ responseObj.setEncoding( "utf8" );
+ response.setContentObject( requestObj );
+ }
sender.send(response);
- std::cout << "Processed request: "
- << request.getContent()
- << " -> "
- << response.getContent() << std::endl;
+ std::cout << "Processed request: "
+ << request.getContentObject()
+ << " -> "
+ << response.getContentObject() << std::endl;
session.acknowledge();
+ sender.close();
} else {
std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl;
session.reject(request);
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index fe7a809cee..3e5165dfb0 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1077,6 +1077,7 @@ set (qpidbroker_SOURCES
qpid/broker/IngressCompletion.cpp
qpid/broker/Link.cpp
qpid/broker/LinkRegistry.cpp
+ qpid/broker/LossyLvq.cpp
qpid/broker/LossyQueue.cpp
qpid/broker/Lvq.cpp
qpid/broker/Message.cpp
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index 3be9f520e0..b2ff10bd68 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -22,7 +22,7 @@
find_package(Proton 0.5)
set (amqp_default ${amqp_force})
-set (maximum_version 0.7)
+set (maximum_version 0.9)
if (Proton_FOUND)
if (Proton_VERSION GREATER ${maximum_version})
message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version and might not be compatible, ${maximum_version} is highest tested; build may not work")
@@ -35,7 +35,11 @@ if (Proton_FOUND)
endif (NOT Proton_VERSION EQUAL 0.5)
if (Proton_VERSION GREATER 0.7)
set (USE_PROTON_TRANSPORT_CONDITION 1)
+ set (HAVE_PROTON_EVENTS 1)
endif (Proton_VERSION GREATER 0.7)
+ if (Proton_VERSION GREATER 0.8)
+ set (NO_PROTON_DELIVERY_TAG_T 1)
+ endif (Proton_VERSION GREATER 0.8)
else ()
message(STATUS "Qpid proton not found, amqp 1.0 support not enabled")
endif ()
diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake
index f8139262d5..777fc1b893 100644
--- a/qpid/cpp/src/config.h.cmake
+++ b/qpid/cpp/src/config.h.cmake
@@ -58,5 +58,6 @@
#cmakedefine HAVE_LOG_FTP
#cmakedefine HAVE_PROTON_TRACER
#cmakedefine USE_PROTON_TRANSPORT_CONDITION
-
+#cmakedefine HAVE_PROTON_EVENTS
+#cmakedefine NO_PROTON_DELIVERY_TAG_T
#endif /* QPID_CONFIG_H */
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index 87085b6d77..866f98071b 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -54,7 +54,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
}
framing::AMQFrame frame;
- while(frame.decode(in)) {
+ while(!pushClosed && frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
connection->received(frame);
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 43f39c2919..bd0dcbfc85 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -276,6 +276,7 @@ void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
}
void SessionHandler::gap(const SequenceSet& /*commands*/) {
+ checkAttached();
throw NotImplementedException("session.gap not supported");
}
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index 1ab69f32d3..cb5d58977b 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -111,13 +111,14 @@ class AsyncCompletion : public virtual RefCounted
qpid::sys::Mutex::ScopedLock l(callbackLock);
if (active) {
if (callback.get()) {
+ boost::intrusive_ptr<Callback> save = callback;
+ callback = boost::intrusive_ptr<Callback>(); // Nobody else can run callback.
inCallback = true;
{
qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
- callback->completed(sync);
+ save->completed(sync);
}
inCallback = false;
- callback = boost::intrusive_ptr<Callback>();
callbackLock.notifyAll();
}
active = false;
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 2afdc5a61d..8972040be5 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -93,14 +93,14 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
} else if (isOpen()) {
handler->connection.getChannel(frame.getChannel()).in(frame);
} else {
- handler->proxy.close(
+ handler->connection.close(
connection::CLOSE_CODE_FRAMING_ERROR,
"Connection not yet open, invalid frame received.");
}
}catch(ConnectionException& e){
- handler->proxy.close(e.code, e.what());
+ handler->connection.close(e.code, e.what());
}catch(std::exception& e){
- handler->proxy.close(541/*internal error*/, e.what());
+ handler->connection.close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what());
}
}
@@ -234,21 +234,25 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
const framing::Array& /*capabilities*/, bool /*insist*/)
{
+ if (connection.getUserId().empty()) {
+ throw ConnectionForcedException("Not authenticated!");
+ }
+
if (connection.isFederationLink()) {
AclModule* acl = connection.getBroker().getAcl();
if (acl && acl->userAclRules()) {
if (!acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
- proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
- QPID_MSG("ACL denied " << connection.getUserId()
- << " creating a federation link"));
+ connection.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+ QPID_MSG("ACL denied " << connection.getUserId()
+ << " creating a federation link"));
return;
}
} else {
if (connection.getBroker().isAuthenticating()) {
- proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
- QPID_MSG("User " << connection.getUserId()
- << " federation connection denied. Systems with authentication "
- "enabled must specify ACL create link rules."));
+ connection.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,
+ QPID_MSG("User " << connection.getUserId()
+ << " federation connection denied. Systems with authentication "
+ "enabled must specify ACL create link rules."));
return;
}
}
@@ -302,6 +306,11 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
const framing::Array& supportedMechanisms,
const framing::Array& /*locales*/)
{
+ if (serverMode) {
+ throw ConnectionForcedException("Invalid protocol sequence.");
+ }
+
+
string requestedMechanism = connection.getAuthMechanism();
std::string username = connection.getUsername();
@@ -388,6 +397,10 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
void ConnectionHandler::Handler::secure(const string& challenge )
{
+ if (serverMode) {
+ throw ConnectionForcedException("Invalid protocol sequence.");
+ }
+
if (sasl.get()) {
string response = sasl->step(challenge);
proxy.secureOk(response);
@@ -402,6 +415,10 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax,
uint16_t /*heartbeatMin*/,
uint16_t heartbeatMax)
{
+ if (serverMode) {
+ throw ConnectionForcedException("Invalid protocol sequence.");
+ }
+
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
connection.setFrameMax(maxFrameSize);
@@ -420,6 +437,10 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax,
void ConnectionHandler::Handler::openOk(const framing::Array& knownHosts)
{
+ if (serverMode) {
+ throw ConnectionForcedException("Invalid protocol sequence.");
+ }
+
for (Array::ValueVector::const_iterator i = knownHosts.begin(); i != knownHosts.end(); ++i) {
Url url((*i)->get<std::string>());
connection.getKnownHosts().push_back(url);
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index 7af2fe3cb4..30155fb903 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -100,13 +100,14 @@ class ConnectionHandler : public framing::FrameHandler
std::auto_ptr<Handler> handler;
bool handle(const qpid::framing::AMQMethodBody& method);
+ void close(framing::connection::CloseCode code, const std::string& text);
public:
ConnectionHandler(amqp_0_10::Connection& connection, bool isClient );
- void close(framing::connection::CloseCode code, const std::string& text);
void heartbeat();
void handle(framing::AMQFrame& frame);
void setSecureConnection(SecureConnection* secured);
bool isOpen() { return handler->isOpen; }
+ friend class amqp_0_10::Connection;
};
diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.cpp b/qpid/cpp/src/qpid/broker/LossyLvq.cpp
new file mode 100644
index 0000000000..f59ecc1925
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/LossyLvq.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LossyLvq.h"
+#include "MessageMap.h"
+
+namespace qpid {
+namespace broker {
+
+LossyLvq::LossyLvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+ : Queue(n, s, ms, p, b), Lvq(n, m, s, ms, p, b), LossyQueue(n, s, ms, p, b) {}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.h b/qpid/cpp/src/qpid/broker/LossyLvq.h
new file mode 100644
index 0000000000..e0a266ab77
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/LossyLvq.h
@@ -0,0 +1,41 @@
+#ifndef QPID_BROKER_LOSSYLVQ_H
+#define QPID_BROKER_LOSSYLVQ_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Lvq.h"
+#include "qpid/broker/LossyQueue.h"
+
+namespace qpid {
+namespace broker {
+class MessageMap;
+
+/**
+ * Combination of LossyQueue and Lvq behaviours.
+ */
+class LossyLvq : public Lvq, public LossyQueue
+{
+ public:
+ LossyLvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_LOSSYLVQ_H*/
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.h b/qpid/cpp/src/qpid/broker/LossyQueue.h
index 3e62151d6f..705865f449 100644
--- a/qpid/cpp/src/qpid/broker/LossyQueue.h
+++ b/qpid/cpp/src/qpid/broker/LossyQueue.h
@@ -29,7 +29,7 @@ namespace broker {
/**
* Drops messages to prevent a breach of any configured maximum depth.
*/
-class LossyQueue : public Queue
+class LossyQueue : public virtual Queue
{
public:
LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
diff --git a/qpid/cpp/src/qpid/broker/Lvq.h b/qpid/cpp/src/qpid/broker/Lvq.h
index 335270a073..26ba2b4914 100644
--- a/qpid/cpp/src/qpid/broker/Lvq.h
+++ b/qpid/cpp/src/qpid/broker/Lvq.h
@@ -32,7 +32,7 @@ class MessageMap;
* conjunction with the MessageMap class. This requires an existing
* message to be 'replaced' by a newer message with the same key.
*/
-class Lvq : public Queue
+class Lvq : public virtual Queue
{
public:
Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
index 7cb99514d5..f5e9332052 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -45,6 +45,9 @@ void MessageBuilder::handle(AMQFrame& frame)
switch(state) {
case METHOD:
checkType(METHOD_BODY, type);
+ if (!frame.getMethod()->isA<qpid::framing::MessageTransferBody>())
+ throw NotImplementedException(QPID_MSG("Unexpected method: " << *(frame.getMethod())));
+
exchange = frame.castBody<qpid::framing::MessageTransferBody>()->getDestination();
state = HEADER;
break;
diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp
index 2ef8c66445..e9e7892499 100644
--- a/qpid/cpp/src/qpid/broker/Protocol.cpp
+++ b/qpid/cpp/src/qpid/broker/Protocol.cpp
@@ -35,8 +35,10 @@ ProtocolRegistry::ProtocolRegistry(const std::set<std::string>& e, Broker* b) :
qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s)
{
- if (v == qpid::framing::ProtocolVersion(0, 10) && isEnabled(AMQP_0_10)) {
- return create_0_10(o, id, s, false);
+ if (v == qpid::framing::ProtocolVersion(0, 10)) {
+ if (isEnabled(AMQP_0_10)) {
+ return create_0_10(o, id, s, false);
+ }
}
qpid::sys::ConnectionCodec* codec = 0;
for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) {
@@ -51,7 +53,22 @@ qpid::sys::ConnectionCodec* ProtocolRegistry::create(qpid::sys::OutputControl& o
return create_0_10(o, id, s, true);
}
-bool ProtocolRegistry::isEnabled(const std::string& name)
+qpid::framing::ProtocolVersion ProtocolRegistry::supportedVersion() const
+{
+ if (isEnabled(AMQP_0_10)) {
+ return qpid::framing::ProtocolVersion(0,10);
+ } else {
+ for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) {
+ if (isEnabled(i->first)) {
+ return i->second->supportedVersion();
+ }
+ }
+ }
+ QPID_LOG(error, "No enabled protocols!");
+ return qpid::framing::ProtocolVersion(0,0);
+}
+
+bool ProtocolRegistry::isEnabled(const std::string& name) const
{
return enabled.empty()/*if nothing is explicitly enabled, assume everything is*/ || enabled.find(name) != enabled.end();
}
diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h
index 59a631848e..a7dbc98fff 100644
--- a/qpid/cpp/src/qpid/broker/Protocol.h
+++ b/qpid/cpp/src/qpid/broker/Protocol.h
@@ -61,6 +61,7 @@ class Protocol
virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0;
virtual boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&) = 0;
virtual boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&) = 0;
+ virtual qpid::framing::ProtocolVersion supportedVersion() const = 0;
private:
};
@@ -70,6 +71,7 @@ class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Fac
public:
QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ QPID_BROKER_EXTERN qpid::framing::ProtocolVersion supportedVersion() const;
QPID_BROKER_EXTERN boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&);
QPID_BROKER_EXTERN boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
QPID_BROKER_EXTERN Message decode(qpid::framing::Buffer&);
@@ -86,7 +88,7 @@ class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Fac
Broker* broker;
qpid::sys::ConnectionCodec* create_0_10(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&, bool);
- bool isEnabled(const std::string&);
+ bool isEnabled(const std::string&) const;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d897029c85..b1f7d0524b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1258,6 +1258,10 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
if (has_userId)
result.first->setOwningUser(_userId);
+ if (result.first->getSettings().autoDeleteDelay) {
+ result.first->scheduleAutoDelete();
+ }
+
return result.first;
}
@@ -1296,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::TimerTask
}
};
-void Queue::scheduleAutoDelete()
+void Queue::scheduleAutoDelete(bool immediate)
{
if (canAutoDelete()) {
- if (settings.autoDeleteDelay) {
+ if (!immediate && settings.autoDeleteDelay) {
AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
broker->getTimer().add(autoDeleteTask);
@@ -1339,7 +1343,7 @@ bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
return o == owner;
}
-void Queue::releaseExclusiveOwnership()
+void Queue::releaseExclusiveOwnership(bool immediateExpiry)
{
bool unused;
{
@@ -1351,7 +1355,7 @@ void Queue::releaseExclusiveOwnership()
unused = !users.isUsed();
}
if (unused && settings.autodelete) {
- scheduleAutoDelete();
+ scheduleAutoDelete(immediateExpiry);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 65a91b8729..efca9b9d40 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -379,7 +379,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const std::string& getName() const { return name; }
QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
- QPID_BROKER_EXTERN void releaseExclusiveOwnership();
+ QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false);
QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
@@ -389,7 +389,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
inline bool isAutoDelete() const { return settings.autodelete; }
inline bool isBrowseOnly() const { return settings.isBrowseOnly; }
QPID_BROKER_EXTERN bool canAutoDelete() const;
- QPID_BROKER_EXTERN void scheduleAutoDelete();
+ QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false);
QPID_BROKER_EXTERN bool isDeleted() const;
const QueueBindings& getBindings() const { return bindings; }
diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
index 8104fff740..16cdea3b0a 100644
--- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/LossyLvq.h"
#include "qpid/broker/LossyQueue.h"
#include "qpid/broker/Lvq.h"
#include "qpid/broker/Messages.h"
@@ -51,10 +52,17 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
boost::shared_ptr<QueueFlowLimit> flow_ptr(QueueFlowLimit::createLimit(name, settings));
//1. determine Queue type (i.e. whether we are subclassing Queue)
- // -> if 'ring' policy is in use then subclass
boost::shared_ptr<Queue> queue;
if (settings.dropMessagesAtLimit) {
- queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+ // -> if 'ring' policy is in use then subclass
+ if (settings.lvqKey.size()) {
+ //combination of ring and lvq:
+ std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey));
+ queue = boost::shared_ptr<Queue>(new LossyLvq(name, map, settings, settings.durable ? store : 0, parent, broker));
+ } else {
+ //simple ring:
+ queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker));
+ }
} else if (settings.selfDestructAtLimit) {
queue = boost::shared_ptr<Queue>(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker));
} else if (settings.lvqKey.size()) {
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 04bbe8b944..3a93e2aac5 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -37,6 +37,9 @@
extern "C" {
#include <proton/engine.h>
#include <proton/error.h>
+#ifdef HAVE_PROTON_EVENTS
+#include <proton/event.h>
+#endif
}
namespace qpid {
@@ -117,8 +120,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
: BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
connection(pn_connection()),
transport(pn_transport()),
+ collector(0),
out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
{
+#ifdef HAVE_PROTON_EVENTS
+ collector = pn_collector();
+ pn_connection_collect(connection, collector);
+#endif
+
if (pn_transport_bind(transport, connection)) {
//error
QPID_LOG(error, "Failed to bind transport to connection: " << getError());
@@ -157,6 +166,9 @@ Connection::~Connection()
getBroker().getConnectionObservers().closed(*this);
pn_transport_free(transport);
pn_connection_free(connection);
+#ifdef HAVE_PROTON_EVENTS
+ pn_collector_free(collector);
+#endif
}
pn_transport_t* Connection::getTransport()
@@ -222,10 +234,15 @@ size_t Connection::encode(char* buffer, size_t size)
void Connection::doOutput(size_t capacity)
{
- for (ssize_t n = pn_transport_pending(transport); n > 0 && n < (ssize_t) capacity; n = pn_transport_pending(transport)) {
- if (dispatch()) processDeliveries();
- else break;
- }
+ ssize_t n = 0;
+ do {
+ if (dispatch()) {
+ processDeliveries();
+ ssize_t next = pn_transport_pending(transport);
+ if (n == next) break;
+ n = next;
+ } else break;
+ } while (n > 0 && n < (ssize_t) capacity);
}
bool Connection::dispatch()
@@ -327,122 +344,92 @@ framing::ProtocolVersion Connection::getVersion() const
{
return qpid::framing::ProtocolVersion(1,0);
}
-namespace {
-pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
-pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
-}
void Connection::process()
{
QPID_LOG(trace, id << " process()");
+#ifdef HAVE_PROTON_EVENTS
+ pn_event_t *event = pn_collector_peek(collector);
+ while (event) {
+ switch (pn_event_type(event)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ doConnectionRemoteOpen();
+ break;
+ case PN_CONNECTION_REMOTE_CLOSE:
+ doConnectionRemoteClose();
+ break;
+ case PN_SESSION_REMOTE_OPEN:
+ doSessionRemoteOpen(pn_event_session(event));
+ break;
+ case PN_SESSION_REMOTE_CLOSE:
+ doSessionRemoteClose(pn_event_session(event));
+ break;
+ case PN_LINK_REMOTE_OPEN:
+ doLinkRemoteOpen(pn_event_link(event));
+ break;
+ case PN_LINK_REMOTE_CLOSE:
+ doLinkRemoteClose(pn_event_link(event));
+ break;
+ case PN_DELIVERY:
+ doDeliveryUpdated(pn_event_delivery(event));
+ break;
+ default:
+ break;
+ }
+ pn_collector_pop(collector);
+ event = pn_collector_peek(collector);
+ }
+
+#else // !HAVE_PROTON_EVENTS
+
+ const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
+ const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
- QPID_LOG_CAT(debug, model, id << " connection opened");
- open();
- setContainerId(pn_connection_remote_container(connection));
+ doConnectionRemoteOpen();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
- QPID_LOG_CAT(debug, model, id << " session begun");
- pn_session_open(s);
- boost::shared_ptr<Session> ssn(new Session(s, *this, out));
- sessions[s] = ssn;
+ doSessionRemoteOpen(s);
}
for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
- pn_link_open(l);
-
- Sessions::iterator session = sessions.find(pn_link_session(l));
- if (session == sessions.end()) {
- QPID_LOG(error, id << " Link attached on unknown session!");
- } else {
- try {
- session->second->attach(l);
- QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
- } catch (const Exception& e) {
- QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- pn_condition_t* error = pn_link_condition(l);
- pn_condition_set_name(error, e.symbol());
- pn_condition_set_description(error, e.what());
- pn_link_close(l);
- } catch (const qpid::framing::UnauthorizedAccessException& e) {
- QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- pn_condition_t* error = pn_link_condition(l);
- pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
- pn_condition_set_description(error, e.what());
- pn_link_close(l);
- } catch (const std::exception& e) {
- QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- pn_condition_t* error = pn_link_condition(l);
- pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
- pn_condition_set_description(error, e.what());
- pn_link_close(l);
- }
- }
+ doLinkRemoteOpen(l);
}
processDeliveries();
for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) {
- pn_link_close(l);
- Sessions::iterator session = sessions.find(pn_link_session(l));
- if (session == sessions.end()) {
- QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
- } else {
- session->second->detach(l);
- QPID_LOG_CAT(debug, model, id << " link detached");
- }
+ doLinkRemoteClose(l);
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) {
- pn_session_close(s);
- Sessions::iterator i = sessions.find(s);
- if (i != sessions.end()) {
- i->second->close();
- sessions.erase(i);
- QPID_LOG_CAT(debug, model, id << " session ended");
- } else {
- QPID_LOG(error, id << " peer attempted to close unrecognised session");
- }
+ doSessionRemoteClose(s);
}
if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
- QPID_LOG_CAT(debug, model, id << " connection closed");
- pn_connection_close(connection);
+ doConnectionRemoteClose();
}
+#endif // !HAVE_PROTON_EVENTS
}
namespace {
std::string convert(pn_delivery_tag_t in)
{
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ return std::string(in.start, in.size);
+#else
return std::string(in.bytes, in.size);
+#endif
}
}
void Connection::processDeliveries()
{
- //handle deliveries
+#ifdef HAVE_PROTON_EVENTS
+ // with the event API, there's no way to selectively process only
+ // the delivery-related events. We have to process all events:
+ process();
+#else
for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
- pn_link_t* link = pn_delivery_link(delivery);
- try {
- if (pn_link_is_receiver(link)) {
- Sessions::iterator i = sessions.find(pn_link_session(link));
- if (i != sessions.end()) {
- i->second->readable(link, delivery);
- } else {
- pn_delivery_update(delivery, PN_REJECTED);
- }
- } else { //i.e. SENDER
- Sessions::iterator i = sessions.find(pn_link_session(link));
- if (i != sessions.end()) {
- QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
- i->second->writable(link, delivery);
- } else {
- QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
- }
- }
- } catch (const Exception& e) {
- QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what());
- pn_condition_t* error = pn_link_condition(link);
- pn_condition_set_name(error, e.symbol());
- pn_condition_set_description(error, e.what());
- pn_link_close(link);
- }
+ doDeliveryUpdated(delivery);
}
+#endif
}
std::string Connection::getError()
@@ -470,4 +457,132 @@ void Connection::closedByManagement()
closeRequested = true;
out.activateOutput();
}
+
+// the peer has issued an Open performative
+void Connection::doConnectionRemoteOpen()
+{
+ // respond in kind if we haven't yet
+ if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+ QPID_LOG_CAT(debug, model, id << " connection opened");
+ open();
+ setContainerId(pn_connection_remote_container(connection));
+ }
+}
+
+// the peer has issued a Close performative
+void Connection::doConnectionRemoteClose()
+{
+ if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) {
+ QPID_LOG_CAT(debug, model, id << " connection closed");
+ pn_connection_close(connection);
+ }
+}
+
+// the peer has issued a Begin performative
+void Connection::doSessionRemoteOpen(pn_session_t *session)
+{
+ if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+ QPID_LOG_CAT(debug, model, id << " session begun");
+ pn_session_open(session);
+ boost::shared_ptr<Session> ssn(new Session(session, *this, out));
+ sessions[session] = ssn;
+ }
+}
+
+// the peer has issued an End performative
+void Connection::doSessionRemoteClose(pn_session_t *session)
+{
+ if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) {
+ pn_session_close(session);
+ Sessions::iterator i = sessions.find(session);
+ if (i != sessions.end()) {
+ i->second->close();
+ sessions.erase(i);
+ QPID_LOG_CAT(debug, model, id << " session ended");
+ } else {
+ QPID_LOG(error, id << " peer attempted to close unrecognised session");
+ }
+ }
+}
+
+// the peer has issued an Attach performative
+void Connection::doLinkRemoteOpen(pn_link_t *link)
+{
+ if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+ pn_link_open(link);
+ Sessions::iterator session = sessions.find(pn_link_session(link));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " Link attached on unknown session!");
+ } else {
+ try {
+ session->second->attach(link);
+ QPID_LOG_CAT(debug, protocol, id << " link " << link << " attached on " << pn_link_session(link));
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ } catch (const std::exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ }
+ }
+ }
+}
+
+// the peer has issued a Detach performative
+void Connection::doLinkRemoteClose(pn_link_t *link)
+{
+ if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) {
+ pn_link_close(link);
+ Sessions::iterator session = sessions.find(pn_link_session(link));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
+ } else {
+ session->second->detach(link);
+ QPID_LOG_CAT(debug, model, id << " link detached");
+ }
+ }
+}
+
+// the status of the delivery has changed
+void Connection::doDeliveryUpdated(pn_delivery_t *delivery)
+{
+ pn_link_t* link = pn_delivery_link(delivery);
+ try {
+ if (pn_link_is_receiver(link)) {
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ i->second->readable(link, delivery);
+ } else {
+ pn_delivery_update(delivery, PN_REJECTED);
+ }
+ } else { //i.e. SENDER
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
+ i->second->writable(link, delivery);
+ } else {
+ QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
+ }
+ }
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ }
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 17c5b0ecf0..ea4ce06163 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -31,6 +31,9 @@
struct pn_connection_t;
struct pn_session_t;
struct pn_transport_t;
+struct pn_collector_t;
+struct pn_link_t;
+struct pn_delivery_t;
namespace qpid {
namespace sys {
@@ -69,6 +72,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
pn_transport_t* transport;
+ pn_collector_t* collector;
qpid::sys::OutputControl& out;
const std::string id;
bool haveOutput;
@@ -86,6 +90,17 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void open();
void readPeerProperties();
void closedByManagement();
+
+ private:
+ // handle Proton engine events
+ void doConnectionRemoteOpen();
+ void doConnectionRemoteClose();
+ void doSessionRemoteOpen(pn_session_t *session);
+ void doSessionRemoteClose(pn_session_t *session);
+ void doLinkRemoteOpen(pn_link_t *link);
+ void doLinkRemoteClose(pn_link_t *link);
+ void doDeliveryUpdated(pn_delivery_t *delivery);
+
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
index cc714c0730..c2d4782fc4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
@@ -140,6 +140,10 @@ class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCo
boost::shared_ptr<Domain>, BrokerContext&, boost::shared_ptr<Relay>);
qpid::sys::ConnectionCodec* create(const framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ qpid::framing::ProtocolVersion supportedVersion() const
+ {
+ return qpid::framing::ProtocolVersion(1, 0);
+ }
bool connect();
void failed(int, std::string);
private:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index ce4c73dead..d4f73fc511 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -57,7 +57,7 @@ uint32_t Incoming::getCredit()
return credit;//TODO: proper flow control
}
-void Incoming::detached()
+void Incoming::detached(bool /*closed*/)
{
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
index 1a7064337d..ccf999a256 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingLink
virtual ~Incoming();
virtual bool doWork();//do anything that requires output
virtual bool haveWork();//called when handling input to see whether any output work is needed
- virtual void detached();
+ virtual void detached(bool closed);
virtual void readable(pn_delivery_t* delivery) = 0;
void verify(const std::string& userid, const std::string& defaultRealm);
void wakeup();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 54993d071e..0136d5a0ed 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -32,6 +32,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include "config.h"
namespace qpid {
namespace broker {
@@ -156,7 +157,7 @@ bool OutgoingFromQueue::canDeliver()
return deliveries[current].delivery == 0 && pn_link_credit(link);
}
-void OutgoingFromQueue::detached()
+void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
queue->cancel(shared_from_this());
@@ -164,12 +165,14 @@ void OutgoingFromQueue::detached()
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
}
- if (exclusive) queue->releaseExclusiveOwnership();
- else if (isControllingUser) queue->releaseFromUse(true);
+ if (exclusive) {
+ queue->releaseExclusiveOwnership(closed);
+ } else if (isControllingUser) {
+ queue->releaseFromUse(true);
+ }
cancelled = true;
}
-
OutgoingFromQueue::~OutgoingFromQueue()
{
if (!cancelled && isControllingUser) queue->releaseFromUse(true);
@@ -283,7 +286,11 @@ qpid::broker::OwnershipToken* OutgoingFromQueue::getSession()
OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0)
{
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ tag.start = tagData;
+#else
tag.bytes = tagData;
+#endif
tag.size = TAG_WIDTH;
}
void OutgoingFromQueue::Record::init(size_t i)
@@ -304,7 +311,11 @@ void OutgoingFromQueue::Record::reset()
size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t)
{
assert(t.size == TAG_WIDTH);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ qpid::framing::Buffer buffer(const_cast<char*>(t.start)/*won't ever be written to*/, t.size);
+#else
qpid::framing::Buffer buffer(const_cast<char*>(t.bytes)/*won't ever be written to*/, t.size);
+#endif
return (size_t) buffer.getLong();
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 27d8205fc8..d3825d0894 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingLink
/**
* Signals that this link has been detached
*/
- virtual void detached() = 0;
+ virtual void detached(bool closed) = 0;
/**
* Called when a delivery is writable
*/
@@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
void write(const char* data, size_t size);
void handle(pn_delivery_t* delivery);
bool canDeliver();
- void detached();
+ void detached(bool closed);
//Consumer interface:
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
index 2ea381e2bc..621f25f04b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -73,6 +73,7 @@ class ProtocolImpl : public BrokerContext, public Protocol
qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&);
boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+ qpid::framing::ProtocolVersion supportedVersion() const;
private:
};
@@ -158,5 +159,10 @@ boost::shared_ptr<RecoverableMessage> ProtocolImpl::recover(qpid::framing::Buffe
}
}
+qpid::framing::ProtocolVersion ProtocolImpl::supportedVersion() const
+{
+ return qpid::framing::ProtocolVersion(1,0);
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
index 83b3e64ee6..587a11466a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -23,6 +23,7 @@
#include "qpid/log/Statement.h"
#include <algorithm>
#include <string.h>
+#include "config.h"
namespace qpid {
namespace broker {
@@ -126,7 +127,13 @@ bool OutgoingFromRelay::doWork()
{
relay->check();
relay->setCredit(pn_link_credit(link));
- return relay->send(link);
+ bool worked = relay->send(link);
+ pn_delivery_t *d = pn_link_current(link);
+ if (d && pn_delivery_writable(d)) {
+ handle(d);
+ return true;
+ }
+ return worked;
}
/**
* Called when a delivery is writable
@@ -163,7 +170,7 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery)
/**
* Signals that this link has been detached
*/
-void OutgoingFromRelay::detached()
+void OutgoingFromRelay::detached(bool /*closed*/)
{
relay->detached(this);
}
@@ -221,7 +228,7 @@ uint32_t IncomingToRelay::getCredit()
return relay->getCredit();
}
-void IncomingToRelay::detached()
+void IncomingToRelay::detached(bool /*closed*/)
{
relay->detached(this);
}
@@ -238,7 +245,11 @@ void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d)
//copy delivery tag
pn_delivery_tag_t dt = pn_delivery_tag(d);
tag.resize(dt.size);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ ::memmove(&tag[0], dt.start, dt.size);
+#else
::memmove(&tag[0], dt.bytes, dt.size);
+#endif
//set context
pn_delivery_set_context(d, this);
@@ -258,7 +269,11 @@ bool BufferedTransfer::settle()
void BufferedTransfer::initOut(pn_link_t* link)
{
pn_delivery_tag_t dt;
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ dt.start = &tag[0];
+#else
dt.bytes = &tag[0];
+#endif
dt.size = tag.size();
out.handle = pn_delivery(link, dt);
//set context
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h
index ef700690fd..32f317bfe1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h
@@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoing
const std::string& target, const std::string& name, boost::shared_ptr<Relay>);
bool doWork();
void handle(pn_delivery_t* delivery);
- void detached();
+ void detached(bool closed);
void init();
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
@@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming
bool settle();
bool doWork();
bool haveWork();
- void detached();
+ void detached(bool closed);
void readable(pn_delivery_t* delivery);
uint32_t getCredit();
private:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 85da8c0c21..538883f29a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -48,6 +48,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "config.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/format.hpp>
#include <map>
@@ -505,10 +506,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (!settings.autodelete) settings.autodelete = autodelete;
altExchange = node.topic->getAlternateExchange();
}
- if (!settings.autoDeleteDelay) {
+ if (settings.original.find("qpid.auto_delete_timeout") == settings.original.end()) {
//only use delay from link if policy didn't specify one
settings.autoDeleteDelay = pn_terminus_get_timeout(source);
- settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
+ if (settings.autoDeleteDelay)
+ settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
}
if (settings.autoDeleteDelay) {
settings.autodelete = true;
@@ -577,7 +579,7 @@ void Session::detach(pn_link_t* link)
if (pn_link_is_sender(link)) {
OutgoingLinks::iterator i = outgoing.find(link);
if (i != outgoing.end()) {
- i->second->detached();
+ i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/);
boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
if (q && !q->isAutoDelete() && !q->isDeleted()) {
connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
@@ -588,7 +590,7 @@ void Session::detach(pn_link_t* link)
} else {
IncomingLinks::iterator i = incoming.find(link);
if (i != incoming.end()) {
- i->second->detached();
+ i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/);
incoming.erase(i);
QPID_LOG(debug, "Incoming link detached");
}
@@ -615,7 +617,11 @@ void Session::accepted(pn_delivery_t* delivery, bool sync)
void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
{
pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ QPID_LOG(debug, "received delivery: " << std::string(tag.start, tag.size));
+#else
QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size));
+#endif
incomingMessageReceived();
IncomingLinks::iterator target = incoming.find(link);
if (target == incoming.end()) {
@@ -653,7 +659,7 @@ bool Session::dispatch()
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(s->first);
- s->second->detached();
+ s->second->detached(true);
outgoing.erase(s++);
output = true;
}
@@ -678,7 +684,7 @@ bool Session::dispatch()
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(i->first);
- i->second->detached();
+ i->second->detached(true);
incoming.erase(i++);
output = true;
}
@@ -690,10 +696,10 @@ bool Session::dispatch()
void Session::close()
{
for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->second->detached();
+ i->second->detached(false);
}
for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
- i->second->detached();
+ i->second->detached(false);
}
outgoing.clear();
incoming.clear();
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index bbb310b0f4..0225ee74cb 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -55,6 +55,7 @@ using qpid::messaging::AssertionFailed;
using qpid::framing::ExchangeBoundResult;
using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
+using qpid::framing::FieldValue;
using qpid::framing::QueueQueryResult;
using qpid::framing::ReplyTo;
using qpid::framing::Uuid;
@@ -140,6 +141,11 @@ const std::string PREFIX_AMQ("amq.");
const std::string PREFIX_QPID("qpid.");
const Verifier verifier;
+
+bool areEquivalent(const FieldValue& a, const FieldValue& b)
+{
+ return ((a == b) || (a.convertsTo<int64_t>() && b.convertsTo<int64_t>() && a.get<int64_t>() == b.get<int64_t>()));
+}
}
struct Binding
@@ -534,19 +540,19 @@ Subscription::Subscription(const Address& address, const std::string& type)
reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
- autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(true)),
+ autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || reliable))),
exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
- const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
- if (timeout) {
+
+ if ((Opt(address)/LINK).hasKey(TIMEOUT)) {
+ const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
- } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
- //if durable but not explicitly reliable, then set a non-zero
- //default for the autodelete timeout (previously this would
- //have defaulted to autodelete immediately anyway, so the risk
- //of the change causing problems is mitigated)
- queueOptions.setInt("qpid.auto_delete_delay", 15*60);
+ } else if (durable && !reliable && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) {
+ //if durable but not reliable, and auto-delete not
+ //explicitly set, then set a non-zero default for the
+ //autodelete timeout
+ queueOptions.setInt("qpid.auto_delete_timeout", 2*60);
}
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -609,7 +615,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
//create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
- arg::autoDelete=autoDeleteQueue && (!(durable || reliable)), arg::durable=durable,
+ arg::autoDelete=autoDeleteQueue, arg::durable=durable,
arg::alternateExchange=alternateExchange,
arg::arguments=queueOptions);
//'default' binding:
@@ -806,7 +812,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (*i->second != *v) {
+ } else if (!areEquivalent(*i->second, *v)) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
@@ -906,7 +912,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
- } else if (*i->second != *v) {
+ } else if (!areEquivalent(*i->second, *v)) {
throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
index 72fcd8a9e2..6510842c58 100644
--- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
@@ -33,7 +33,18 @@ namespace framing {
namespace {
//each range contains 2 numbers, 4 bytes each
-uint16_t RANGE_SIZE = 2 * 4;
+uint16_t RANGE_SIZE = 2 * 4;
+int32_t MAX_RANGE = 2147483647;//2^31-1
+
+int32_t gap(const SequenceNumber& a, const SequenceNumber& b)
+{
+ return a < b ? b - a : a - b;
+}
+
+bool is_max_range(const SequenceNumber& a, const SequenceNumber& b)
+{
+ return gap(a, b) == MAX_RANGE;
+}
}
void SequenceSet::encode(Buffer& buffer) const
@@ -54,7 +65,21 @@ void SequenceSet::decode(Buffer& buffer)
throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size));
for (uint16_t i = 0; i < count; i++) {
- add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong()));
+ SequenceNumber a(buffer.getLong());
+ SequenceNumber b(buffer.getLong());
+ if (b < a)
+ throw IllegalArgumentException(QPID_MSG("Invalid range in sequence set: " << a << " -> " << b));
+ if (is_max_range(a, b)) {
+ //RangeSet holds 'half-closed' ranges, where the end is
+ //one past the 'highest' value in the range. So if the
+ //range is already the maximum expressable with a 32bit
+ //sequence number, we can't represent it as a
+ //'half-closed' range, so we represent it as two ranges.
+ add(a, b-1);
+ add(b);
+ } else {
+ add(a, b);
+ }
}
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 0e87346ac1..870e4723b2 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -482,6 +482,7 @@ shared_ptr<PrimaryTxObserver> Primary::makeTxObserver(
{
shared_ptr<PrimaryTxObserver> observer =
PrimaryTxObserver::create(*this, haBroker, txBuffer);
+ sys::Mutex::ScopedLock l(lock);
txMap[observer->getTxQueue()->getName()] = observer;
return observer;
}
diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES
index 2d5a389615..4023ba9629 100644
--- a/qpid/cpp/src/qpid/linearstore/ISSUES
+++ b/qpid/cpp/src/qpid/linearstore/ISSUES
@@ -25,7 +25,7 @@ Current/pending:
------ ------- ----------------------
5359 - Linearstore: Implement new management schema and wire into store
5360 - Linearstore: Evaluate and rework logging to produce a consistent log output
- 5361 1145359 Linearstore: No tests for linearstore functionality currently exist
+* 5361 1145359 Linearstore: No tests for linearstore functionality currently exist
svn r.1564893 2014-02-05: Added tx-test-soak.sh
svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh
svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore
@@ -37,15 +37,22 @@ Current/pending:
** Basic performance tests
5464 - [linearstore] Incompletely created journal files accumulate in EFP
- 1088944 [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue>
- 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation
+* - 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation
- 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP
- 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue>
- - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP
-* 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs
- svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks
- svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new
- 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal
- svn r.1643053 2014-11-18: Proposed fix
+ - 1067482 [LinearStore] Provide a way to preallocate empty pages in EFP
+* 6303 1180660 [linearstore] Roll back auto-upgrade of store directory structure
+* 5362 1145363 Linearstore: No store tools exist for examining the journals
+ svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+ svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+ svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
+ svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
+ * Store analysis and status
+ * Recovery/reading of message content
+ * Empty file pool status and management
+
@@ -115,16 +122,6 @@ NO-JIRA - Added missing Apache copyright/license text
5651 - [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue
svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec
* Bug introduced by r.1578899.
- 5362 1145363 Linearstore: No store tools exist for examining the journals
- svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
- svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
- svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
- svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
- * Store analysis and status
- * Recovery/reading of message content
- * Empty file pool status and management
5661 - [linearstore] Set default cmake build to exclude linearstore
svn r.1584379 2014-04-03 Proposed solution.
* Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build.
@@ -147,8 +144,16 @@ NO-JIRA - Added missing Apache copyright/license text
svn r.1631360 2014-10-13 Proposed solution
6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP
svn r.1632504 2014-10-17 Proposed solution by pmoravec
- 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute
+ 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal
+ svn r.1643053 2014-11-18: Proposed fix
+ 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute
svn r.1641689 2014-11-25 Proposed solution
+ 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs
+ svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks
+ svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new
+ svn r.1649081 2015-01-02: WIP: Specify new queue using qpid-config --durable together with --efp-partition-num and/or --efp-pool-file-size. Needs testing.
+ - 1148807 [linearstore] Restarting broker with empty journal raises confusing warning
+ Fixed by svn r.1649081 of bug 5671 / 1160367 above
Ordered checkin list:
@@ -187,14 +192,17 @@ no. svn r Q-JIRA RHBZ Date Alt Committer
28. 1596509 5767 1098118 2014-05-21 0.22-mrg (pmoravec)
29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update)
30. 1599243 5767 1098118 2014-06-02 0.22-mrg
-30. 1599243 5767 1098118 2014-06-02
-31. 1614665 5924 1124906 2014-07-30
-32. 1620426 6043 1089652 2014-08-25
-33. 1631360 6147 1152012 2014-10-13 (pmoravec)
-34. 1632504 6157 1150397 2014-10-17 (pmoravec)
-35. 1636598 5671 2014-11-04
-36. 1637985 5671 2014-11-10
-37. 1641689 6248 1167911 2014-11-25
+31. 1599243 5767 1098118 2014-06-02
+32. 1614665 5924 1124906 2014-07-30
+33. 1620426 6043 1089652 2014-08-25
+34. 1631360 6147 1152012 2014-10-13 (pmoravec)
+35. 1632504 6157 1150397 2014-10-17 (pmoravec)
+36. 1636598 5671 1160367 2014-11-04
+37. 1637985 5671 1160367 2014-11-10
+38. 1643053 6230 1165200 2014-11-18
+39. 1641689 6248 1167911 2014-11-25
+40. 1649081 5671 1160367 2015-01-02
+41. 1649082 NO-JIRA - 2015-01-02
See above sections for details on these checkins.
diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 77d5703636..70eac27f48 100644
--- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement ()
mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr (
new qmf::org::apache::qpid::linearstore::Store(agent, this, broker));
- mgmtObject->set_location(storeDir);
+ mgmtObject->set_storeDir(storeDir);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES);
@@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
if (queue_.getName().size() == 0)
{
- QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
+ QLS_LOG(error, "Cannot create store for empty (null) queue name - queue create ignored.");
return;
}
@@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePool*
MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
qpid::framing::FieldTable::ValuePtr value;
qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
- value = args_.get("qpid.efp_partition");
+ value = args_.get("qpid.efp_partition_num");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
- localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
+ localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition_num");
}
qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
- value = args_.get("qpid.efp_file_size");
+ value = args_.get("qpid.efp_pool_file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
- localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
+ localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), "qpid.efp_pool_file_size");
}
return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib);
}
@@ -1488,21 +1488,21 @@ std::string MessageStoreImpl::getStoreTopLevelDir() {
std::string MessageStoreImpl::getJrnlBaseDir()
{
std::ostringstream dir;
- dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ;
+ dir << storeDir << "/" << storeTopLevelDir << "/jrnl2/" ;
return dir.str();
}
std::string MessageStoreImpl::getBdbBaseDir()
{
std::ostringstream dir;
- dir << storeDir << "/" << storeTopLevelDir << "/dat/" ;
+ dir << storeDir << "/" << storeTopLevelDir << "/dat2/" ;
return dir.str();
}
std::string MessageStoreImpl::getTplBaseDir()
{
std::ostringstream dir;
- dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ;
+ dir << storeDir << "/" << storeTopLevelDir << "/tpl2/" ;
return dir.str();
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
index 18f4d3afc3..08db3f75bd 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
@@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
EmptyFilePool::~EmptyFilePool() {}
void EmptyFilePool::initialize() {
-//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG
- std::vector<std::string> dirList;
+ if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { // Create EFP dir if it does not yet exist
+ if (errno != EEXIST) {
+ std::ostringstream oss;
+ oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize");
+ }
+ }
// Process empty files in main dir
+ std::vector<std::string> dirList;
jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
size_t dotPos = i->rfind(".");
@@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getIdentity() const {
std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
std::string emptyFileName = popEmptyFile();
- std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/'));
+ std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveFile(emptyFileName, newFileName)) {
+ if (!moveFile(emptyFileName, newFileName)) {
// Try again with new UUID for file name
newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
- if (moveFile(emptyFileName, newFileName)) {
+ if (!moveFile(emptyFileName, newFileName)) {
//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
- pushEmptyFile(emptyFileName);
+ pushEmptyFile(emptyFileName); // Return empty file to pool
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
@@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
if (createSymLink(newFileName, symlinkName)) {
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
}
return symlinkName;
}
@@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN
}
// --- protected functions ---
+void EmptyFilePool::checkIosState(std::ofstream& ofs,
+ const uint32_t jerrno,
+ const std::string& fqFileName,
+ const std::string& operation,
+ const std::string& errorMessage,
+ const std::string& className,
+ const std::string& fnName) {
+ if (!ofs.good()) {
+ if (ofs.is_open()) {
+ ofs.close();
+ }
+ std::ostringstream oss;
+ oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << (ofs.fail()?"T":"F") << " badbit="
+ << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" << operation << ": " << errorMessage;
+ throw jexception(jerrno, oss.str(), className, fnName);
+ }
+}
std::string EmptyFilePool::createEmptyFile() {
std::string efpfn = getEfpFileName();
- if (!overwriteFileContents(efpfn)) {
- // TODO: handle failure to prepare new file here
- }
+ overwriteFileContents(efpfn);
return efpfn;
}
@@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
}
}
-bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
+void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
::file_hdr_t fh;
::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | std::ofstream::binary);
- if (ofs.good()) {
- ofs.write((char*)&fh, sizeof(::file_hdr_t));
- uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
- while (rem--)
- ofs.put('\0');
- ofs.close();
- return true;
-//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
- } else {
- std::ostringstream oss;
- oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " failed to be open";
- throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", "overwriteFileContents");
+ checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", "Failed to create file", "EmptyFilePool", "overwriteFileContents");
+ ofs.write((char*)&fh, sizeof(::file_hdr_t));
+ checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed to write header", "EmptyFilePool", "overwriteFileContents");
+ uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
+ while (rem--) {
+ ofs.put('\0');
+ checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", "Failed to put \0", "EmptyFilePool", "overwriteFileContents");
}
- return false;
+ ofs.close();
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
}
std::string EmptyFilePool::popEmptyFile() {
@@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveFile(emptyFileName, returnedFileName)) {
+ if (!moveFile(emptyFileName, returnedFileName)) {
::unlink(emptyFileName.c_str());
//std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG
}
@@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
overwriteFileContents(returnedFileName);
}
std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/'
- if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+ if (!moveFile(returnedFileName, sanitizedEmptyFileName)) {
::unlink(returnedFileName.c_str());
//std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
} else {
@@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
return true;
}
-// static
-int EmptyFilePool::moveFile(const std::string& from,
- const std::string& to) {
- if (::rename(from.c_str(), to.c_str())) {
- if (errno == EEXIST) return errno; // File name exists
- std::ostringstream oss;
- oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
- }
- return 0;
-}
-
//static
int EmptyFilePool::createSymLink(const std::string& fqFileName,
const std::string& fqLinkName) {
@@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const std::string& fqFileName,
if (errno == EEXIST) return errno; // File name exists
std::ostringstream oss;
oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
}
return 0;
}
@@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
if (len < 0) {
std::ostringstream oss;
oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
}
::unlink(fqLinkName.c_str());
return std::string(buff, len);
@@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std::string& fqName) {
}
+// static
+bool EmptyFilePool::moveFile(const std::string& from,
+ const std::string& to) {
+ if (::rename(from.c_str(), to.c_str())) {
+ if (errno == EEXIST) {
+ return false; // File name exists
+ }
+ std::ostringstream oss;
+ oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
+ }
+ return true;
+}
+
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
index 1a1264fa26..dc567ff917 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
@@ -87,23 +87,30 @@ public:
const efpPartitionNumber_t partitionNumber);
protected:
+ void checkIosState(std::ofstream& ofs,
+ const uint32_t jerrno,
+ const std::string& fqFileName,
+ const std::string& operation,
+ const std::string& errorMessage,
+ const std::string& className,
+ const std::string& fnName);
std::string createEmptyFile();
std::string getEfpFileName();
void initializeSubDirectory(const std::string& fqDirName);
- bool overwriteFileContents(const std::string& fqFileName);
+ void overwriteFileContents(const std::string& fqFileName);
std::string popEmptyFile();
void pushEmptyFile(const std::string fqFileName);
void returnEmptyFile(const std::string& emptyFileName);
void resetEmptyFileHeader(const std::string& fqFileName);
bool validateEmptyFile(const std::string& emptyFileName) const;
- static int moveFile(const std::string& fromFqPath,
- const std::string& toFqPath);
static int createSymLink(const std::string& fqFileName,
const std::string& fqLinkName);
static std::string deleteSymlink(const std::string& fqLinkName);
static bool isFile(const std::string& fqName);
static bool isSymlink(const std::string& fqName);
+ static bool moveFile(const std::string& fromFqPath,
+ const std::string& toFqPath);
};
}}}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
index 28e1b0b56e..a02679736e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
@@ -74,6 +74,7 @@ void EmptyFilePoolManager::findEfpPartitions() {
if (!foundPartition) {
std::ostringstream oss1;
oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+ << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_
<< "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
jdir::create_dir(oss1.str());
insertPartition(defaultPartitionNumber_, oss1.str());
@@ -165,9 +166,10 @@ EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIde
EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber,
const efpDataSize_kib_t efpDataSize_kib) {
EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? partitionNumber : defaultPartitionNumber_);
- if (efppp != 0)
- return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_);
- return 0;
+ if (efppp == 0) {
+ return 0;
+ }
+ return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_, true);
}
void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
index a31855e0d8..12d2db74b8 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
@@ -32,6 +32,9 @@ namespace qpid {
namespace linearstore {
namespace journal {
+// static
+const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
+
EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
const std::string& partitionDir,
const bool overwriteBeforeReturnFlag,
@@ -57,72 +60,31 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() {
void
EmptyFilePoolPartition::findEmptyFilePools() {
//std::cout << "*** EmptyFilePoolPartition::findEmptyFilePools(): Reading " << partitionDir_ << std::endl; // DEBUG
- std::vector<std::string> dirList;
- bool upgradeDirStructureFlag = false;
- std::string oldPartitionDir;
- jdir::read_dir(partitionDir_, dirList, true, false, false, false);
-//std::cout << "*** dirList.size()=" << dirList.size() << "; dirList.front()=" << dirList.front() << std::endl; // DEBUG
- if (dirList.size() == 1 && dirList.front().compare("efp") == 0) {
- upgradeDirStructureFlag = true;
- oldPartitionDir = partitionDir_ + "/efp";
-//std::cout << "*** oldPartitionDir=" << oldPartitionDir << std::endl; // DEBUG
- dirList.clear();
- jdir::read_dir(oldPartitionDir, dirList, true, false, false, false);
- }
+ std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
+ if (jdir::is_dir(efpDir)) {
+ std::vector<std::string> dirList;
+ jdir::read_dir(efpDir, dirList, true, false, false, true);
for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
- std::string fqFileName(partitionDir_ + "/" + *i);
- if (upgradeDirStructureFlag) {
- std::string fqOldFileName(partitionDir_ + "/efp/" + *i);
- if (::rename(fqOldFileName.c_str(), fqFileName.c_str())) {
- // File move failed
- std::ostringstream oss;
- oss << "File \'" << fqOldFileName << "\' could not be renamed to \'" << fqFileName << "\' (" << FORMAT_SYSERR(errno) << "); file deleted";
- journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
- if (::unlink(fqOldFileName.c_str())) {
- std::ostringstream oss;
- oss << "File \'" << fqOldFileName << "\' could not be deleted (" << FORMAT_SYSERR(errno) << "\'; file orphaned";
- journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
- }
- }
- }
- EmptyFilePool* efpp = 0;
- try {
- efpp = new EmptyFilePool(fqFileName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
- {
- slock l(efpMapMutex_);
- efpMap_[efpp->dataSize_kib()] = efpp;
- }
- }
- catch (const std::exception& e) {
- if (efpp != 0) {
- delete efpp;
- efpp = 0;
- }
- std::ostringstream oss;
- oss << "EmptyFilePool create failed: " << e.what();
- journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
- }
- if (efpp != 0) {
- efpp->initialize();
- }
- }
- if (upgradeDirStructureFlag) {
- std::string oldEfpDir(partitionDir_ + "/efp");
- if (::rmdir(oldEfpDir.c_str())) {
- // Unable to delete old "efp" dir
- std::ostringstream oss;
- oss << "Unable to delete old EFP directory \'" << oldEfpDir << "\' (" << FORMAT_SYSERR(errno) << "\'; directory orphaned";
- journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
- }
+ createEmptyFilePool(*i);
}
+ } else {
+ std::ostringstream oss;
+ oss << "Partition \"" << partitionDir_ << "\" does not contain top level EFP dir \"" << s_efpTopLevelDir_ << "\"";
+ journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+ }
}
-EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
- slock l(efpMapMutex_);
- efpMapItr_t i = efpMap_.find(efpDataSize_kib);
- if (i == efpMap_.end())
- return 0;
- return i->second;
+EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) {
+ {
+ slock l(efpMapMutex_);
+ efpMapItr_t i = efpMap_.find(efpDataSize_kib);
+ if (i != efpMap_.end())
+ return i->second;
+ }
+ if (createIfNonExistent) {
+ return createEmptyFilePool(efpDataSize_kib);
+ }
+ return 0;
}
void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
@@ -183,7 +145,7 @@ std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNu
//static
efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) {
if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) {
- long pn = ::strtol(name.c_str() + 1, 0, 0);
+ long pn = ::strtol(name.c_str() + 1, 0, 10);
if (pn == 0 && errno) {
return 0;
} else {
@@ -195,12 +157,42 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::strin
// --- protected functions ---
+EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+ std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePoolPartition::s_efpTopLevelDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib));
+ return createEmptyFilePool(fqEfpDirectoryName);
+}
+
+EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) {
+ EmptyFilePool* efpp = 0;
+ try {
+ efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+ {
+ slock l(efpMapMutex_);
+ efpMap_[efpp->dataSize_kib()] = efpp;
+ }
+ }
+ catch (const std::exception& e) {
+ if (efpp != 0) {
+ delete efpp;
+ efpp = 0;
+ }
+ std::ostringstream oss;
+ oss << "EmptyFilePool create failed: " << e.what();
+ journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+ }
+ if (efpp != 0) {
+ efpp->initialize();
+ }
+ return efpp;
+}
+
void EmptyFilePoolPartition::validatePartitionDir() {
+ std::ostringstream ss;
if (!jdir::is_dir(partitionDir_)) {
- std::ostringstream ss;
ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not a directory";
throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir");
}
+
// TODO: other validity checks here
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
index c653c6be6a..570e2b073f 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
@@ -37,6 +37,8 @@ class JournalLog;
class EmptyFilePoolPartition
{
+public:
+ static const std::string s_efpTopLevelDir_;
protected:
typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
typedef efpMap_t::iterator efpMapItr_t;
@@ -59,7 +61,7 @@ public:
virtual ~EmptyFilePoolPartition();
void findEmptyFilePools();
- EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
+ EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent);
void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
std::string getPartitionDirectory() const;
@@ -70,6 +72,8 @@ public:
static efpPartitionNumber_t getPartitionNumber(const std::string& name);
protected:
+ EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
+ EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName);
void validatePartitionDir();
};
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 3f39913422..73a16f01b7 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -43,6 +43,7 @@
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include <sstream>
#include <string>
+#include <unistd.h>
#include <vector>
namespace qpid {
@@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTr
analyzeJournalFileHeaders(efpIdentity);
if (journalEmptyFlag_) {
- *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP
+ if (uninitFileList_.empty()) {
+ *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP
+ } else {
+ *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+ }
} else {
*emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
if (! *emptyFilePoolPtrPtr) {
@@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
LinearFileController* lfcPtr) {
if (journalEmptyFlag_) {
if (uninitFileList_.size() > 0) {
+ // TODO: Handle case if uninitFileList_.size() > 1, but this should not happen in normal operation. Here we assume only one item in the list.
std::string uninitFile = uninitFileList_.back();
uninitFileList_.pop_back();
lfcPtr->restoreEmptyFile(uninitFile);
@@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName);
- if (!hdrOk || headerQueueName.empty()) {
+ bool hdrEmpty = ::is_file_hdr_reset(&fileHeader);
+ if (!hdrOk) {
std::ostringstream oss;
- oss << "Journal file " << (*i) << " is uninitialized or corrupted";
+ oss << "Journal file " << (*i) << " is corrupted or invalid";
journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+ } else if (hdrEmpty) {
+ // Read symlink, find efp directory name which is efp size in KiB
+ // TODO: place this bit into a common function as it is also used in EmptyFilePool.cpp::deleteSymlink()
+ char buff[1024];
+ ssize_t len = ::readlink((*i).c_str(), buff, 1024);
+ if (len < 0) {
+ std::ostringstream oss;
+ oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR__SYMLINK, oss.str(), "RecoveryManager", "analyzeJournalFileHeaders");
+ }
+ // Find second and third '/' from back of string, which contains the EFP directory name
+ *(::strrchr(buff, '/')) = '\0';
+ *(::strrchr(buff, '/')) = '\0';
+ int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1);
uninitFileList_.push_back(*i);
+ efpIdentity.pn_ = fileHeader._efp_partition;
+ efpIdentity.ds_ = efpDataSize_kib;
} else if (headerQueueName.compare(queueName_) != 0) {
std::ostringstream oss;
oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
@@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
}
}
+//std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG
if (fileNumberMap_.empty()) {
journalEmptyFlag_ = true;
} else {
@@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName,
}
ifs.close();
::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
- if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) return false;
+ if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) {
+ return false;
+ }
queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len);
return true;
}
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
index 9d59039220..ce88e7809c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
@@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
const uint32_t jerrno::JERR__RECNFOUND = 0x0109;
const uint32_t jerrno::JERR__NOTIMPL = 0x010a;
const uint32_t jerrno::JERR__NULL = 0x010b;
+const uint32_t jerrno::JERR__SYMLINK = 0x010c;
// class jcntl
const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
@@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02;
const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03;
const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04;
const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05;
-const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06;
-const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d07;
-const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d08;
-const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d09;
+const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d06;
+const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d07;
+const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d08;
+const uint32_t jerrno::JERR_EFP_FWRITE = 0x0d09;
+const uint32_t jerrno::JERR_EFP_MKDIR = 0x0d0a;
// Negative returns for some functions
const int32_t jerrno::AIO_TIMEOUT = -1;
@@ -140,6 +142,7 @@ jerrno::__init()
_err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
_err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented";
_err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer";
+ _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed";
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";
@@ -210,10 +213,11 @@ jerrno::__init()
_err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
_err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
_err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
- _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed";
_err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed";
_err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type incorrect for operation";
_err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for write";
+ _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed";
+ _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed";
//_err_map[] = "";
diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
index 77b18b17e8..6e817682ca 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
@@ -60,6 +60,7 @@ namespace journal {
static const uint32_t JERR__RECNFOUND; ///< Record not found
static const uint32_t JERR__NOTIMPL; ///< Not implemented
static const uint32_t JERR__NULL; ///< Operation on null pointer
+ static const uint32_t JERR__SYMLINK; ///< Symbolic Link operation failed
// class jcntl
static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
@@ -130,10 +131,11 @@ namespace journal {
static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size
static const uint32_t JERR_EFP_EMPTY; ///< EFP empty
- static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed
static const uint32_t JERR_EFP_LSTAT; ///< lstat operation failed
static const uint32_t JERR_EFP_BADFILETYPE; ///< Bad file type
static const uint32_t JERR_EFP_FOPEN; ///< Unable to fopen file for write
+ static const uint32_t JERR_EFP_FWRITE; ///< Write failed
+ static const uint32_t JERR_EFP_MKDIR; ///< Directory creation failed
// Negative returns for some functions
static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return
diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml
index a55883a255..ebd388593e 100644
--- a/qpid/cpp/src/qpid/linearstore/management-schema.xml
+++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml
@@ -21,38 +21,24 @@
<class name="Store">
<property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/>
- <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
- <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/-->
- <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/-->
+ <property name="storeDir" type="sstr" access="RO" desc="Logical directory on disk"/>
<property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/>
<property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/>
<property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/>
<property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
- <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/-->
- <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/-->
- <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/-->
<statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/>
<statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
<statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/>
<statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/>
- <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Deprecated"/>
</class>
<class name="Journal">
<property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
- <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="queueName" type="sstr" access="RC" index="y"/>
<property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
- <property name="baseFileName" type="sstr" access="RO" desc="Deprecated"/>
<property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/>
<property name="writePages" type="uint32" access="RO" unit="wpage" desc="Deprecated"/>
- <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/>
- <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Deprecated"/>
- <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/-->
- <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/-->
- <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/-->
- <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/-->
- <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/-->
<statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/>
<statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
@@ -64,36 +50,5 @@
<statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/>
<statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
-<!--
- The following are not yet "wired up" in JournalImpl.cpp
--->
- <statistic name="freeFileCount" type="hilo32" unit="file" desc="Deprecated"/>
- <statistic name="availableFileCount" type="hilo32" unit="file" desc="Deprecated"/>
- <statistic name="writeWaitFailures" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="writeBusyFailures" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="readRecordCount" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="readBusyFailures" type="count64" unit="record" desc="Deprecated"/>
- <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Deprecated"/>
- <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Deprecated"/>
-
- <!--method name="expand" desc="Increase number of files allocated for this journal">
- <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/>
- </method-->
</class>
-
- <eventArguments>
- <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/-->
- <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/>
- <arg name="jrnlId" type="sstr" desc="Journal Id"/>
- <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/>
- <arg name="numFiles" type="uint16" desc="Number of journal files"/>
- <arg name="numTxn" type="uint32" desc="Number of recovered transactions"/>
- <arg name="numTxnDeq" type="uint32" desc="Number of recovered transactional dequeues"/>
- <arg name="numTxnEnq" type="uint32" desc="Number of recovered transactional enqueues"/>
- <arg name="what" type="sstr" desc="Description of event"/>
- </eventArguments>
- <event name="enqThresholdExceeded" sev="warn" args="jrnlId, what"/>
- <event name="created" sev="notice" args="jrnlId, fileSize, numFiles"/>
- <event name="full" sev="error" args="jrnlId, what"/>
- <event name="recovered" sev="notice" args="jrnlId, fileSize, numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/>
</schema>
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 763deb33c6..7f19ca7ec0 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -242,7 +242,7 @@ bool replace(Variant::Map& map, const std::string& original, const std::string&
}
}
-const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes
const uint32_t DEFAULT_TIMEOUT(0);
}
@@ -267,7 +267,7 @@ AddressHelper::AddressHelper(const Address& address) :
bind(link, RELIABILITY, reliability);
durableNode = test(node, DURABLE);
durableLink = test(link, DURABLE);
- timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
+ timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
std::string mode;
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
@@ -571,7 +571,8 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
bool AddressHelper::isUnreliable() const
{
- return reliability == AT_MOST_ONCE || reliability == UNRELIABLE;
+ return reliability == AT_MOST_ONCE || reliability == UNRELIABLE ||
+ (reliability.empty() && browse); // A browser defaults to unreliable.
}
const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index 66aee1ae22..3ee58cad8d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -44,7 +44,6 @@ class AddressHelper
const qpid::types::Variant::Map& getNodeProperties() const;
bool getLinkSource(std::string& out) const;
bool getLinkTarget(std::string& out) const;
- bool getBrowse() const { return browse; }
const qpid::types::Variant::Map& getLinkProperties() const;
static std::string getLinkName(const Address& address);
private:
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 9e3f95742b..fedab4286f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: ");
encoded->init(impl);
impl.setEncoded(encoded);
- impl.setInternalId(ssn->record(current, lnk->getBrowse()));
+ impl.setInternalId(ssn->record(current));
pn_link_advance(lnk->receiver);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 454106149d..5e0707056f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -36,12 +36,10 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
address(a),
helper(address),
receiver(pn_receiver(session, name.c_str())),
- capacity(0), used(0)
-{}
-
+ capacity(0), used(0) {}
ReceiverContext::~ReceiverContext()
{
- //pn_link_free(receiver);
+ pn_link_free(receiver);
}
void ReceiverContext::setCapacity(uint32_t c)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 8ded487bf3..2b4e8e1986 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -60,8 +60,6 @@ class ReceiverContext
void verify();
Address getAddress() const;
bool hasCurrent();
- bool getBrowse() const { return helper.getBrowse(); }
-
private:
friend class ConnectionContext;
const std::string name;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 1a254c1846..2a48b2241a 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -30,6 +30,7 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/log/Statement.h"
+#include "config.h"
extern "C" {
#include <proton/engine.h>
}
@@ -49,7 +50,7 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const
SenderContext::~SenderContext()
{
- //pn_link_free(sender);
+ pn_link_free(sender);
}
void SenderContext::close()
@@ -510,7 +511,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
{
pn_delivery_tag_t tag;
tag.size = sizeof(id);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+ tag.start = reinterpret_cast<const char*>(&id);
+#else
tag.bytes = reinterpret_cast<const char*>(&id);
+#endif
token = pn_delivery(sender, tag);
pn_link_send(sender, encoded.getData(), encoded.getSize());
if (unreliable) {
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index f2b7b24b4c..824b958af3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -110,10 +110,11 @@ uint32_t SessionContext::getUnsettledAcks()
return 0;//TODO
}
-qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse)
+qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
{
qpid::framing::SequenceNumber id = next++;
- if (!browse) unacked[id] = delivery;
+ if (!pn_delivery_settled(delivery))
+ unacked[id] = delivery;
QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
return id;
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index b347c327c5..8c2bb040a6 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -75,7 +75,7 @@ class SessionContext
qpid::framing::SequenceNumber next;
std::string name;
- qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse);
+ qpid::framing::SequenceNumber record(pn_delivery_t*);
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
index bb59530883..7be625a1a3 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -150,7 +150,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (!codec) {
//TODO: may still want to revise this...
//send valid version header & close connection.
- write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+ write(framing::ProtocolInitiation(factory->supportedVersion()));
readError = true;
aio->queueWriteClose();
} else {
diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
index 969a3877e3..8b5b69cdb4 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h
@@ -60,6 +60,8 @@ class ConnectionCodec : public Codec {
virtual ConnectionCodec* create(
OutputControl&, const std::string& id, const SecuritySettings&
) = 0;
+
+ virtual framing::ProtocolVersion supportedVersion() const = 0;
};
};
diff --git a/qpid/cpp/src/tests/assertions.py b/qpid/cpp/src/tests/assertions.py
index f1db21b753..930afd124d 100644
--- a/qpid/cpp/src/tests/assertions.py
+++ b/qpid/cpp/src/tests/assertions.py
@@ -177,3 +177,18 @@ class AssertionTests (VersionTest):
assert False, "Expected assertion to fail on unspecified option"
except AssertionFailed: None
except MessagingError: None
+
+ def test_queue_autodelete_timeout(self):
+ name = str(uuid4())
+ # create subscription queue with 0-10 to be sure of name
+ ssn_0_10 = self.create_connection("amqp0-10", True).session()
+ ssn_0_10.receiver("amq.direct; {link:{name:%s,timeout:30}}" % name)
+ self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name)
+ ssn_0_10_other = self.create_connection("amqp0-10", True).session()
+ ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name)
+ try:
+ self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name)
+ ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name)
+ assert False, "Expected assertion to fail for auto_delete_timeout"
+ except AssertionFailed: None
+ except MessagingError: None
diff --git a/qpid/cpp/src/tests/txshift.cpp b/qpid/cpp/src/tests/txshift.cpp
index bf85bee986..6ec28c7233 100644
--- a/qpid/cpp/src/tests/txshift.cpp
+++ b/qpid/cpp/src/tests/txshift.cpp
@@ -40,7 +40,7 @@ namespace tests {
struct Args : public qpid::TestOptions
{
std::string workQueue;
- size_t workers;
+ uint workers;
Args() : workQueue("txshift-control"), workers(1)
{
@@ -178,7 +178,7 @@ int main(int argc, char** argv)
worker.run();
} else {
boost::ptr_vector<Worker> workers;
- for (size_t i = 0; i < opts.workers; i++) {
+ for (uint i = 0; i < opts.workers; i++) {
workers.push_back(new Worker(connection, opts.workQueue));
}
std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1));