summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-05 19:53:44 +0000
committerAlan Conway <aconway@apache.org>2008-09-05 19:53:44 +0000
commit98ec4b4e9226b7d9221dfd5a8eeddd408e3e1caf (patch)
tree9a89658107dd1ecc21ee9cd9a0e979d77d80e7bb /cpp/src/qpid
parent34048060a1c00291e0d7a56725001deb3100c6e2 (diff)
downloadqpid-python-98ec4b4e9226b7d9221dfd5a8eeddd408e3e1caf.tar.gz
Fixed cluster membership notification.
Cluster events with RefCountedBuffers for queueing. PollableQueue clears bacth immediately. Improved perfdist: clients hit multiple brokers in a cluster. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@692521 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/RefCountedBuffer.cpp45
-rw-r--r--cpp/src/qpid/RefCountedBuffer.h68
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp51
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp30
-rw-r--r--cpp/src/qpid/cluster/Cpg.h10
-rw-r--r--cpp/src/qpid/cluster/Event.cpp52
-rw-r--r--cpp/src/qpid/cluster/Event.h65
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h10
9 files changed, 276 insertions, 56 deletions
diff --git a/cpp/src/qpid/RefCountedBuffer.cpp b/cpp/src/qpid/RefCountedBuffer.cpp
new file mode 100644
index 0000000000..3a52b94412
--- /dev/null
+++ b/cpp/src/qpid/RefCountedBuffer.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "RefCountedBuffer.h"
+
+namespace qpid {
+
+RefCountedBuffer::RefCountedBuffer() : count(0) {}
+
+void RefCountedBuffer::destroy() const {
+ this->~RefCountedBuffer();
+ ::delete[] reinterpret_cast<const char*>(this);
+}
+
+char* RefCountedBuffer::addr() const {
+ return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
+}
+
+RefCountedBuffer::intrusive_ptr RefCountedBuffer::create(size_t n) {
+ char* store=::new char[n+sizeof(RefCountedBuffer)];
+ new(store) RefCountedBuffer;
+ return reinterpret_cast<RefCountedBuffer*>(store);
+}
+
+} // namespace qpid
+
+
diff --git a/cpp/src/qpid/RefCountedBuffer.h b/cpp/src/qpid/RefCountedBuffer.h
new file mode 100644
index 0000000000..af46cbb92a
--- /dev/null
+++ b/cpp/src/qpid/RefCountedBuffer.h
@@ -0,0 +1,68 @@
+#ifndef QPID_REFCOUNTEDBUFFER_H
+#define QPID_REFCOUNTEDBUFFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/utility.hpp>
+#include <boost/detail/atomic_count.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+/**
+ * Reference-counted byte buffer.
+ * No alignment guarantees.
+ */
+class RefCountedBuffer : boost::noncopyable {
+ mutable boost::detail::atomic_count count;
+ RefCountedBuffer();
+ void destroy() const;
+ char* addr() const;
+
+public:
+
+ typedef boost::intrusive_ptr<RefCountedBuffer> intrusive_ptr;
+
+ /** Create a reference counted buffer of size n */
+ static intrusive_ptr create(size_t n);
+
+ /** Get a pointer to the start of the buffer. */
+ char* get() { return addr(); }
+ const char* get() const { return addr(); }
+ char& operator[](size_t i) { return get()[i]; }
+ const char& operator[](size_t i) const { return get()[i]; }
+
+ void addRef() const { ++count; }
+ void release() const { if (--count==0) destroy(); }
+ long refCount() { return count; }
+};
+
+} // namespace qpid
+
+// intrusive_ptr support.
+namespace boost {
+inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); }
+inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); }
+}
+
+
+#endif /*!QPID_REFCOUNTEDBUFFER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f93203acbf..4d54a837ca 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -38,6 +38,7 @@
#include <algorithm>
#include <iterator>
#include <map>
+#include <ostream>
namespace qpid {
namespace cluster {
@@ -67,11 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
)
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Joining cluster: " << name << " as " << self);
+ QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
cpg.join(name);
- mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
- ConnectionId(self,0));
-
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
}
@@ -94,7 +92,7 @@ void Cluster::leave() {
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
- QPID_LOG(debug, "Leaving cluster " << name.str());
+ QPID_LOG(notice, "Leaving cluster " << name.str());
cpg.leave(name);
// broker= is set to 0 when the final config-change is delivered.
while(broker) {
@@ -158,7 +156,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id)
if (i == connections.end()) { // New shadow connection.
assert(id.getMember() != self);
std::ostringstream mgmtId;
- mgmtId << name << ":" << id;
+ mgmtId << name.str() << ":" << id;
ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
i = connections.insert(value).first;
}
@@ -205,22 +203,50 @@ void Cluster::deliver(
}
}
+struct AddrList {
+ const cpg_address* addrs;
+ int count;
+ AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+};
+
+ostream& operator<<(ostream& o, const AddrList& a) {
+ for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
+ const char* reasonString;
+ switch (p->reason) {
+ case CPG_REASON_JOIN: reasonString = " joined "; break;
+ case CPG_REASON_LEAVE: reasonString = " left ";break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down ";break;
+ case CPG_REASON_NODEUP: reasonString = " node-up ";break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down ";break;
+ default: reasonString = " ";
+ }
+ qpid::cluster::MemberId member(*p);
+ o << member << reasonString;
+ }
+ return o;
+}
+
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
+ cpg_address *joined, int nJoined)
{
- QPID_LOG(debug, "Cluster change: "
- << std::make_pair(current, nCurrent)
- << std::make_pair(left, nLeft));
+ QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
+ << AddrList(joined, nJoined) << AddrList(left, nLeft));
+
+ if (nJoined) // Notfiy new members of my URL.
+ mcastFrame(
+ AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+ ConnectionId(self,0));
+
Mutex::ScopedLock l(lock);
for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
// Add new members when their URL notice arraives.
- if (std::find(left, left+nLeft, self) != left+nLeft)
+ if (find(left, left+nLeft, self) != left+nLeft)
broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}
@@ -236,7 +262,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) {
broker->shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const std::string& url) {
+void Cluster::urlNotice(const MemberId& m, const string& url) {
+ QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 4963400e10..630de97093 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,7 +19,6 @@
*
*/
-#include "qpid/cluster/types.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/PollableQueue.h"
#include "qpid/cluster/NoOpConnectionOutputHandler.h"
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 754b4abd58..96a5b3da43 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -184,33 +184,3 @@ ostream& operator<<(ostream& o, const ConnectionId& c) {
}
}} // namespace qpid::cluster
-
-
-// In proper namespace for ADL.
-
-std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
- const char* reasonString;
- switch (a.reason) {
- case CPG_REASON_JOIN: reasonString = " joined"; break;
- case CPG_REASON_LEAVE: reasonString = " left";break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
- case CPG_REASON_NODEUP: reasonString = " node-up";break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
- default: reasonString = "";
- }
- return o << qpid::cluster::MemberId(a.nodeid, a.pid) << reasonString;
-}
-
-std::ostream& operator<<(std::ostream& o, const cpg_name& name) {
- return o << std::string(name.value, name.length);
-}
-
-namespace std {
-ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
- for (cpg_address* p = a.first; p < a.first+a.second; ++p)
- o << *p << " ";
- return o;
-}
-}
-
-
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index fdc451fbbc..5ffd42e12a 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -158,8 +158,6 @@ class Cpg : public sys::IOHandle {
bool isShutdown;
};
-std::ostream& operator <<(std::ostream& out, const MemberId& id);
-
inline bool operator==(const cpg_name& a, const cpg_name& b) {
return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
}
@@ -167,12 +165,4 @@ inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b);
}} // namespace qpid::cluster
-// In proper namespaces for ADL
-std::ostream& operator <<(std::ostream& out, const cpg_name& name);
-std::ostream& operator<<(std::ostream& o, const cpg_address& a);
-namespace std {
-std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses);
-}
-
-
#endif /*!CPG_H*/
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
new file mode 100644
index 0000000000..ff558842e4
--- /dev/null
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "types.h"
+#include "Event.h"
+#include "Cpg.h"
+#include "qpid/framing/Buffer.h"
+
+namespace qpid {
+namespace cluster {
+using framing::Buffer;
+
+const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
+
+Event::Event(EventType t, const ConnectionId c, const size_t s)
+ : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
+
+Event::Event(const MemberId& m, const char* d, size_t s)
+ : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size))
+{
+ memcpy(data->get(), d, s);
+}
+
+void Event::mcast(const Cpg::Name& name, Cpg& cpg) {
+ char header[OVERHEAD];
+ Buffer b;
+ b.putOctet(type);
+ b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr()));
+ iovec iov[] = { { header, b.getPosition() }, { data.get(), size } };
+ cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+}
+
+
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
new file mode 100644
index 0000000000..3e4a19f7f3
--- /dev/null
+++ b/cpp/src/qpid/cluster/Event.h
@@ -0,0 +1,65 @@
+#ifndef QPID_CLUSTER_EVENT_H
+#define QPID_CLUSTER_EVENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Cpg.h"
+#include "qpid/RefCountedBuffer.h"
+
+namespace qpid {
+namespace cluster {
+
+// TODO aconway 2008-09-03: more efficient solution for shared
+// byte-stream data.
+//
+
+/**
+ * Events are sent to/received from the cluster.
+ * Refcounted so they can be stored on queues.
+ */
+struct Event {
+ public:
+ /** Create an event with for mcasting, with size bytes of space. */
+ Event(EventType t, const ConnectionId c, size_t size);
+
+ /** Create an event from delivered data. */
+ Event(const MemberId& m, const char* data, size_t size);
+
+ void mcast(const Cpg::Name& name, Cpg& cpg);
+
+ EventType getType() const { return type; }
+ ConnectionId getConnection() const { return connection; }
+ size_t getSize() const { return size; }
+ char* getData() { return data->get(); }
+
+ private:
+ static const size_t OVERHEAD;
+ EventType type;
+ ConnectionId connection;
+ size_t size;
+ RefCountedBuffer::intrusive_ptr data;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EVENT_H*/
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
index 29891da344..74da2df750 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/cluster/PollableQueue.h
@@ -89,9 +89,13 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
batch.clear();
batch.swap(queue);
condition.clear();
- ScopedUnlock u(lock);
- callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push.
- h.rewatch();
+ {
+ // Process outside the lock to allow concurrent push.
+ ScopedUnlock u(lock);
+ callback(batch.begin(), batch.end());
+ h.rewatch();
+ }
+ batch.clear();
}
}} // namespace qpid::cluster