diff options
Diffstat (limited to 'cpp/lib/broker/SessionHandlerImpl.h')
| -rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.h | 268 | 
1 files changed, 268 insertions, 0 deletions
| diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h new file mode 100644 index 0000000000..4b89dbeaa1 --- /dev/null +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -0,0 +1,268 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _SessionHandlerImpl_ +#define _SessionHandlerImpl_ + +#include <map> +#include <sstream> +#include <vector> +#include <exception> +#include <AMQFrame.h> +#include <AMQP_ClientProxy.h> +#include <AMQP_ServerOperations.h> +#include <AutoDelete.h> +#include <ExchangeRegistry.h> +#include <BrokerChannel.h> +#include <ConnectionToken.h> +#include <DirectExchange.h> +#include <OutputHandler.h> +#include <ProtocolInitiation.h> +#include <QueueRegistry.h> +#include <sys/SessionContext.h> +#include <sys/SessionHandler.h> +#include <sys/TimeoutHandler.h> +#include <TopicExchange.h> + +namespace qpid { +namespace broker { + +struct ChannelException : public std::exception { +    u_int16_t code; +    string text; +    ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} +    ~ChannelException() throw() {} +    const char* what() const throw() { return text.c_str(); } +}; + +struct ConnectionException : public std::exception { +    u_int16_t code; +    string text; +    ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} +    ~ConnectionException() throw() {} +    const char* what() const throw() { return text.c_str(); } +}; + +class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,  +                           public virtual qpid::framing::AMQP_ServerOperations,  +                           public virtual ConnectionToken +{ +    typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; +    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + +    qpid::sys::SessionContext* context; +    qpid::framing::AMQP_ClientProxy client; +    QueueRegistry* queues; +    ExchangeRegistry* const exchanges; +    AutoDelete* const cleaner; +    const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + +    std::auto_ptr<BasicHandler> basicHandler; +    std::auto_ptr<ChannelHandler> channelHandler; +    std::auto_ptr<ConnectionHandler> connectionHandler; +    std::auto_ptr<ExchangeHandler> exchangeHandler; +    std::auto_ptr<QueueHandler> queueHandler; +    std::auto_ptr<TxHandler> txHandler; + +    std::map<u_int16_t, Channel*> channels; +    std::vector<Queue::shared_ptr> exclusiveQueues; + +    u_int32_t framemax; +    u_int16_t heartbeat; + +    void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); +    void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); +    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + +    Channel* getChannel(u_int16_t channel); +    /** +     * Get named queue, never returns 0. +     * @return: named queue or default queue for channel if name="" +     * @exception: ChannelException if no queue of that name is found. +     * @exception: ConnectionException if no queue specified and channel has not declared one. +     */ +    Queue::shared_ptr getQueue(const string& name, u_int16_t channel); + +    Exchange::shared_ptr findExchange(const string& name); +     +  public: +    SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,  +                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); +    virtual void received(qpid::framing::AMQFrame* frame); +    virtual void initiated(qpid::framing::ProtocolInitiation* header); +    virtual void idleOut(); +    virtual void idleIn(); +    virtual void closed(); +    virtual ~SessionHandlerImpl(); + +    class ConnectionHandlerImpl : public virtual ConnectionHandler{ +        SessionHandlerImpl* parent; +      public: +        inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + +        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 +        virtual void startOk(u_int16_t channel, const qpid::framing::FieldTable& clientProperties, const string& mechanism,  +                             const string& response, const string& locale);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void secureOk(u_int16_t channel, const string& response);  +                 +        virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void open(u_int16_t channel, const string& virtualHost, const string& capabilities, bool insist);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText, u_int16_t classId,  +                           u_int16_t methodId);  +  +        virtual void closeOk(u_int16_t channel);  +                 +        virtual ~ConnectionHandlerImpl(){} +    }; +     +    class ChannelHandlerImpl : public virtual ChannelHandler{ +        SessionHandlerImpl* parent; +      public: +        inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} +         +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void open(u_int16_t channel, const string& outOfBand);  +         +        virtual void flow(u_int16_t channel, bool active);  +                 +        virtual void flowOk(u_int16_t channel, bool active);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void close(u_int16_t channel, u_int16_t replyCode, const string& replyText,  +                           u_int16_t classId, u_int16_t methodId);  +                 +        virtual void closeOk(u_int16_t channel);  +                 +        virtual ~ChannelHandlerImpl(){} +    }; +     +    class ExchangeHandlerImpl : public virtual ExchangeHandler{ +        SessionHandlerImpl* parent; +      public: +        inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} +         +        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 +        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& type,  +                             bool passive, bool durable, bool autoDelete, bool internal, bool nowait,  +                             const qpid::framing::FieldTable& arguments);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& exchange, bool ifUnused, bool nowait);  +                 +        virtual ~ExchangeHandlerImpl(){} +    }; + +     +    class QueueHandlerImpl : public virtual QueueHandler{ +        SessionHandlerImpl* parent; +      public: +        inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} +         +        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 +        virtual void declare(u_int16_t channel, u_int16_t ticket, const string& queue,  +                             bool passive, bool durable, bool exclusive,  +                             bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments);  +                 +        // Change to match new code generator function signature (adding const to string& and FieldTable&) - kpvdr 2006-11-20 +        virtual void bind(u_int16_t channel, u_int16_t ticket, const string& queue,  +                          const string& exchange, const string& routingKey, bool nowait,  +                          const qpid::framing::FieldTable& arguments);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void purge(u_int16_t channel, u_int16_t ticket, const string& queue,  +                           bool nowait);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void delete_(u_int16_t channel, u_int16_t ticket, const string& queue, bool ifUnused, bool ifEmpty,  +                             bool nowait);  + +        virtual ~QueueHandlerImpl(){} +    }; + +    class BasicHandlerImpl : public virtual BasicHandler{ +        SessionHandlerImpl* parent; +      public: +        inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} +         +        virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);  +                     +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void consume(u_int16_t channel, u_int16_t ticket, const string& queue, const string& consumerTag,  +                             bool noLocal, bool noAck, bool exclusive, bool nowait);  +         +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void cancel(u_int16_t channel, const string& consumerTag, bool nowait);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void publish(u_int16_t channel, u_int16_t ticket, const string& exchange, const string& routingKey,  +                             bool mandatory, bool immediate);  +                 +        // Change to match new code generator function signature (adding const to string&) - kpvdr 2006-11-20 +        virtual void get(u_int16_t channel, u_int16_t ticket, const string& queue, bool noAck);  +                 +        virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);  +                 +        virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);  +                 +        virtual void recover(u_int16_t channel, bool requeue);  +                 +        virtual ~BasicHandlerImpl(){} +    }; + +    class TxHandlerImpl : public virtual TxHandler{ +        SessionHandlerImpl* parent; +    public: +        TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} +        virtual ~TxHandlerImpl() {} +        virtual void select(u_int16_t channel); +        virtual void commit(u_int16_t channel); +        virtual void rollback(u_int16_t channel); +    }; + + +    inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); } +    inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); } +    inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); } +    inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); } +    inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); } +    inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }        +  +    inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }        +    inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }        +    inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }        +    inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }        +    inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }  +     +    // Temporary add-in to resolve version conflicts: AMQP v8.0 still defines class Test; +    // however v0.9 will not - kpvdr 2006-11-17       +    inline virtual TestHandler* getTestHandler(){ throw ConnectionException(540, "Test class not implemented"); }        +}; + +} +} + + +#endif | 
