diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 159 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 116 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 102 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 87 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Dispatchable.h | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameHandler.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/APRAcceptor.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/PosixAcceptor.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 16 |
11 files changed, 567 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp new file mode 100644 index 0000000000..30073c4551 --- /dev/null +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -0,0 +1,159 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Cluster.h" +#include "Cpg.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <iterator> + +namespace qpid { +namespace cluster { +using namespace qpid::framing; +using namespace qpid::sys; +using namespace std; + +ostream& operator <<(ostream& out, const Cluster& cluster) { + return out << cluster.name.str() << "(" << cluster.self << ")"; +} + +void Cluster::notify() { + // TODO aconway 2007-06-25: Use proxy here. + AMQFrame frame(version, 0, + make_shared_ptr(new ClusterNotifyBody(version, url))); + handle(frame); +} + +Cluster::Cluster( + const std::string& name_, const std::string& url_, FrameHandler& next_, + ProtocolVersion ver) + : name(name_), url(url_), version(ver), + cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + next(next_) +{ + self=Id(cpg->getLocalNoideId(), getpid()); + QPID_LOG(trace, *this << " Joining cluster."); + cpg->join(name); + notify(); + dispatcher=Thread(*this); +} + +Cluster::~Cluster() { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } +} + +void Cluster::handle(AMQFrame& frame) { + QPID_LOG(trace, *this << " SEND: " << frame); + Buffer buf(frame.size()); + frame.encode(buf); + buf.flip(); + iovec iov = { buf.start(), frame.size() }; + cpg->mcast(name, &iov, 1); +} + +size_t Cluster::size() const { + Mutex::ScopedLock l(lock); + return members.size(); +} + +Cluster::MemberList Cluster::getMembers() const { + Mutex::ScopedLock l(lock); + MemberList result(members.size()); + std::transform(members.begin(), members.end(), result.begin(), + boost::bind(&MemberMap::value_type::second, _1)); + return result; +} + +void Cluster::cpgDeliver( + cpg_handle_t /*handle*/, + struct cpg_name* /* group */, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + Id from(nodeid, pid); + Buffer buf(static_cast<char*>(msg), msg_len); + AMQFrame frame; + frame.decode(buf); + QPID_LOG(trace, *this << " RECV: " << frame); + // TODO aconway 2007-06-20: use visitor pattern. + ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + if (notifyIn) { + Mutex::ScopedLock l(lock); + members[from].reset(new Member(notifyIn->getUrl())); + lock.notifyAll(); + } + else + next.handle(frame); +} + +void Cluster::cpgConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address *ccMembers, int nMembers, + struct cpg_address *left, int nLeft, + struct cpg_address *joined, int nJoined +) +{ + QPID_LOG( + trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(ccMembers, nMembers)); + + { + Mutex::ScopedLock l(lock); + // Erase members that left. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(left[i])); + lock.notifyAll(); + } + + // If there are new members (other than myself) then notify. + for (int i=0; i< nJoined; ++i) { + if (Id(joined[i]) != self) { + notify(); + break; + } + } + + // Note: New members are be added to my map when cpgDeliver + // gets a cluster.notify frame. +} + +void Cluster::run() { + cpg->dispatchBlocking(); +} + +}} // namespace qpid::cluster + + + diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h new file mode 100644 index 0000000000..1cbbb249f2 --- /dev/null +++ b/cpp/src/qpid/cluster/Cluster.h @@ -0,0 +1,116 @@ +#ifndef QPID_CLUSTER_CLUSTER_H +#define QPID_CLUSTER_CLUSTER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/cluster/Cpg.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Runnable.h" +#include "qpid/shared_ptr.h" +#include "qpid/framing/ProtocolVersion.h" +#include <boost/scoped_ptr.hpp> +#include <map> +#include <vector> + +namespace qpid { +namespace cluster { + +/** + * Represents a cluster. Creating an instance joins current process + * to the cluster. + */ +class Cluster : public framing::FrameHandler, private sys::Runnable { + public: + /** Details of a cluster member */ + struct Member { + Member(const std::string& url_) : url(url_) {} + std::string url; + }; + + typedef std::vector<shared_ptr<const Member> > MemberList; + + /** + * Join a cluster. + * @param name of the cluster. + * @param url of this broker, sent to the cluster. + * @param next handler receives the frame when it has been + * acknowledged by the cluster. + */ + Cluster(const std::string& name, + const std::string& url, + framing::FrameHandler& next, + framing::ProtocolVersion); + + ~Cluster(); + + /** Multicast a frame to the cluster. */ + void handle(framing::AMQFrame&); + + /** Get the current cluster membership. */ + MemberList getMembers() const; + + /** Number of members in the cluster. */ + size_t size() const; + + private: + typedef Cpg::Id Id; + typedef std::map<Id, shared_ptr<Member> > MemberMap; + + void run(); + void notify(); + + void cpgDeliver( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + void cpgConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ); + + Id self; + Cpg::Name name; + std::string url; + framing::ProtocolVersion version; + boost::scoped_ptr<Cpg> cpg; + framing::FrameHandler& next; + MemberMap members; + sys::Thread dispatcher; + + protected: + // Allow access from ClusterTest subclass. + mutable sys::Monitor lock; + + friend std::ostream& operator <<(std::ostream&, const Cluster&); +}; + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CLUSTER_H*/ diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 858d25f37c..a979ce1eeb 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -17,12 +17,86 @@ */ #include "Cpg.h" +#include "qpid/sys/Mutex.h" +#include <vector> +#include <limits> +#include <iterator> namespace qpid { namespace cluster { using namespace std; +// Global vector of Cpg pointers by handle. +// TODO aconway 2007-06-12: Replace this with cpg_get/set_context, +// coming in in RHEL 5.1. +class Cpg::Handles +{ + public: + void put(cpg_handle_t handle, Cpg* object) { + sys::Mutex::ScopedLock l(lock); + assert(object); + uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. + if (index >= handles.size()) + handles.resize(index+1, 0); + handles[index] = object; + } + + Cpg* get(cpg_handle_t handle) { + sys::Mutex::ScopedLock l(lock); + uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. + assert(index < handles.size()); + assert(handles[index]); + return handles[index]; + } + + private: + sys::Mutex lock; + vector<Cpg*> handles; +}; + +Cpg::Handles Cpg::handles; + +// Global callback functions call per-object callbacks via handles vector. +void Cpg::globalDeliver ( + cpg_handle_t handle, + struct cpg_name *group, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len); +} + +void Cpg::globalConfigChange( + cpg_handle_t handle, + struct cpg_name *group, + struct cpg_address *members, int nMembers, + struct cpg_address *left, int nLeft, + struct cpg_address *joined, int nJoined +) +{ + handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); +} + +Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c) +{ + cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; + check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); + handles.put(handle, this); +} + +Cpg::~Cpg() { + try { + check(cpg_finalize(handle), "Error in shutdown of CPG"); + } + catch (...) { + handles.put(handle, 0); + throw; + } +} + string Cpg::errorStr(cpg_error_t err, const std::string& msg) { switch (err) { case CPG_OK: return msg+": ok"; @@ -56,6 +130,34 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } +uint32_t Cpg::getLocalNoideId() const { + unsigned int nodeid; + check(cpg_local_get(handle, &nodeid), "Cannot get local node ID"); + assert(nodeid <= std::numeric_limits<uint32_t>::max()); + return nodeid; +} + +ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { + ostream_iterator<Cpg::Id> i(o, " "); + std::copy(a.first, a.first+a.second, i); + return o; +} + +static int popbyte(uint32_t& n) { + uint8_t b=n&0xff; + n>>=8; + return b; +} + +ostream& operator <<(ostream& out, const Cpg::Id& id) { + uint32_t node=id.nodeId(); + out << popbyte(node); + for (int i = 0; i < 3; i++) + out << "." << popbyte(node); + return out << ":" << id.pid(); +} + + }} // namespace qpid::cpg diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index 6e61fa8a6e..6b157301a7 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -19,7 +19,9 @@ * */ -#include <stdexcept> +#include "qpid/Exception.h" +#include "qpid/cluster/Dispatchable.h" +#include <boost/function.hpp> #include <cassert> #ifdef CLUSTER extern "C" { @@ -34,11 +36,10 @@ namespace cluster { * Manages a single CPG handle, initialized in ctor, finialzed in destructor. * On error all functions throw Cpg::Exception */ -class Cpg { +class Cpg : public Dispatchable { public: - // FIXME aconway 2007-06-01: qpid::Exception - struct Exception : public std::runtime_error { - Exception(const std::string& msg) : runtime_error(msg) {} + struct Exception : public ::qpid::Exception { + Exception(const std::string& msg) : ::qpid::Exception(msg) {} }; struct Name : public cpg_name { @@ -54,26 +55,45 @@ class Cpg { std::string str() const { return std::string(value, length); } }; - static inline std::string str(const cpg_name& n) { + struct Id { + uint64_t id; + Id() : id(0) {} + Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; } + Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {} + + operator uint64_t() const { return id; } + uint32_t nodeId() const { return id >> 32; } + pid_t pid() const { return id & 0xFFFF; } + }; + + static std::string str(const cpg_name& n) { return std::string(n.value, n.length); } - // TODO aconway 2007-06-01: when cpg handle supports a context pointer - // use callback objects (boost::function) instead of free functions. - // + typedef boost::function<void ( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/)> DeliverFn; + + typedef boost::function<void ( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + )> ConfigChangeFn; + /** Open a CPG handle. *@param deliver - free function called when a message is delivered. *@param reconfig - free function called when CPG configuration changes. */ - Cpg(cpg_deliver_fn_t deliver, cpg_confchg_fn_t reconfig) { - cpg_callbacks_t callbacks = { deliver, reconfig }; - check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - } + Cpg(DeliverFn deliver, ConfigChangeFn reconfig); - /** Disconnect */ - ~Cpg() { - check(cpg_finalize(handle), "Cannot finalize CPG"); - } + /** Disconnect from CPG. */ + ~Cpg(); /** Dispatch CPG events. *@param type one of @@ -85,6 +105,10 @@ class Cpg { check(cpg_dispatch(handle,type), "Error in CPG dispatch"); } + void dispatchOne() { dispatch(CPG_DISPATCH_ONE); } + void dispatchAll() { dispatch(CPG_DISPATCH_ALL); } + void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); } + void join(const Name& group) { check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); }; @@ -99,7 +123,14 @@ class Cpg { cantMcastMsg(group)); } + cpg_handle_t getHandle() const { return handle; } + + uint32_t getLocalNoideId() const; + private: + class Handles; + friend class Handles; + static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); @@ -110,9 +141,31 @@ class Cpg { if (result != CPG_OK) throw Exception(errorStr(result, msg)); } + + static void globalDeliver( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + static void globalConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ); + + static Handles handles; cpg_handle_t handle; + DeliverFn deliver; + ConfigChangeFn configChange; }; +std::ostream& operator <<(std::ostream& out, const Cpg::Id& id); +std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses); }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Dispatchable.h b/cpp/src/qpid/cluster/Dispatchable.h new file mode 100644 index 0000000000..e7f0df4218 --- /dev/null +++ b/cpp/src/qpid/cluster/Dispatchable.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLUSTER_DISPATCHABLE_H +#define QPID_CLUSTER_DISPATCHABLE_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace cluster { + +/** + * Interface for classes that have some "events" that need dispatching + * in a thread. + */ +class Dispatchable +{ + public: + virtual ~Dispatchable() {} + + /** Dispatch one event in current thread. */ + virtual void dispatchOne() = 0; + /** Dispatch all available events, don't block. */ + virtual void dispatchAll() = 0; + /** Blocking loop to dispatch cluster events */ + virtual void dispatchBlocking() = 0; + + /** Wait for at least one event, then dispatch all available events. + * Don't block. Useful for tests. + */ + virtual void dispatchSome() { dispatchOne(); dispatchAll(); } + +}; + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_DISPATCHABLE_H*/ diff --git a/cpp/src/qpid/framing/FrameHandler.h b/cpp/src/qpid/framing/FrameHandler.h new file mode 100644 index 0000000000..817c569119 --- /dev/null +++ b/cpp/src/qpid/framing/FrameHandler.h @@ -0,0 +1,39 @@ +#ifndef QPID_FRAMING_FRAMEHANDLER_H +#define QPID_FRAMING_FRAMEHANDLER_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace framing { +class AMQFrame; + +class FrameHandler : private boost::noncopyable { + public: + virtual ~FrameHandler() {} + virtual void handle(AMQFrame& frame) = 0; +}; + +}} + + +#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/ diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 5e624a956e..8d6bca8f29 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -36,6 +36,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; + virtual std::string getHost() const = 0; virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp index 0f0853b35d..8662e602c2 100644 --- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp +++ b/cpp/src/qpid/sys/apr/APRAcceptor.cpp @@ -35,6 +35,7 @@ class APRAcceptor : public Acceptor public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); virtual uint16_t getPort() const; + virtual std::string getHost() const; virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); virtual void shutdown(); @@ -72,6 +73,12 @@ APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); } +std::string APRAcceptor::getHost() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->hostname; +} + uint16_t APRAcceptor::getPort() const { apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index 1a5fceb56e..cbda216cfc 100644 --- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -52,6 +52,7 @@ class EventChannelAcceptor : public Acceptor { ); uint16_t getPort() const; + std::string getHost() const; void run(ConnectionInputHandlerFactory* factory); @@ -100,6 +101,10 @@ uint16_t EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } +uint16_t EventChannelAcceptor::getPort() const { + return port; // Immutable no need for lock. +} + void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp b/cpp/src/qpid/sys/posix/PosixAcceptor.cpp index af200d393d..0575380a14 100644 --- a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp +++ b/cpp/src/qpid/sys/posix/PosixAcceptor.cpp @@ -32,6 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } class PosixAcceptor : public Acceptor { public: virtual uint16_t getPort() const { fail(); return 0; } + virtual std::string getPort() const { fail(); return std::string(); } virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } virtual void shutdown() { fail(); } }; diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 39651fa821..50cbfa7c4d 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -112,7 +112,21 @@ int Socket::listen(int port, int backlog) return ntohs(name.sin_port); } - +std::string getHost() const { + // TODO aconway 2007-06-11: Won't work for ip6 + struct sockaddr_in name; + socklen_t namelen = sizeof(name); + if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + uint32_t addr = name.sin_host.s_addr; + ostringstream os; + os << uint8_t(addr >> 24) << '.' + << uint8_t(addr >> 16) << '.' + << uint8_t(addr >> 8) << '.' + << uint8_t(addr); + return os.str(); +} + int Socket::fd() { return socket; |
