diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Completion.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Execution.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Future.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQMethodBody.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AccumulatedAck.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AccumulatedAck.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ModelMethod.h | 3 | 
11 files changed, 72 insertions, 26 deletions
| diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 048c73c4b0..ae5810684d 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -85,7 +85,7 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran          throw ConnectionException(530, "Received odd number of elements in ranged mark");      } else {          for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { -            state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); +            state.ackRange(*i, *(++i));          }      }  } diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index 000bba2138..68cff3f11a 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -43,8 +43,17 @@ public:          future.sync(*session);      } +    void wait() +    { +        future.wait(*session); +    } +      bool isComplete() { -        return future.isComplete(); +        return future.isComplete(*session); +    } + +    bool isCompleteUpTo() { +        return future.isCompleteUpTo(*session);      }  }; diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index 809dcc7592..9caac45790 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -35,6 +35,8 @@ public:      virtual void sendFlushRequest() = 0;      virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;      virtual Demux& getDemux() = 0; +    virtual bool isComplete(const framing::SequenceNumber& id) = 0; +    virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;  };  }} diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 95cdc7032a..4e0ee05da2 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -63,7 +63,7 @@ void ExecutionHandler::handle(AMQFrame& frame)      AMQBody* body = frame.getBody();      if (!invoke(body, this)) {          if (!arriving) { -            arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); +            arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter));          }          arriving->append(frame);          if (arriving->isComplete()) { @@ -77,16 +77,12 @@ void ExecutionHandler::handle(AMQFrame& frame)  void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)  { -    SequenceNumber mark(cumulative); -    if (outgoing.lwm < mark) { -        outgoing.lwm = mark; -        completion.completed(outgoing.lwm); -    }      if (range.size() % 2) { //must be even number                  throw ConnectionException(530, "Received odd number of elements in ranged mark");      } else { -        //TODO: need to manage (record and accumulate) ranges such -        //that we can implictly move the mark when appropriate +        SequenceNumber mark(cumulative);         +        outgoingCompletionStatus.update(mark, range); +        completion.completed(outgoingCompletionStatus.mark);          //TODO: signal listeners of early notification?               } @@ -115,7 +111,7 @@ void ExecutionHandler::sync()  void ExecutionHandler::flushTo(const framing::SequenceNumber& point)  { -    if (point > outgoing.lwm) { +    if (point > outgoingCompletionStatus.mark) {          sendFlushRequest();      }          } @@ -128,7 +124,7 @@ void ExecutionHandler::sendFlushRequest()  void ExecutionHandler::syncTo(const framing::SequenceNumber& point)  { -    if (point > outgoing.lwm) { +    if (point > outgoingCompletionStatus.mark) {          sendSyncRequest();      }          } @@ -142,11 +138,11 @@ void ExecutionHandler::sendSyncRequest()  void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)  { -    if (id > completionStatus.mark) { +    if (id > incomingCompletionStatus.mark) {          if (cumulative) { -            completionStatus.update(completionStatus.mark, id); +            incomingCompletionStatus.update(incomingCompletionStatus.mark, id);          } else { -            completionStatus.update(id, id);             +            incomingCompletionStatus.update(id, id);                      }      }      if (send) { @@ -158,8 +154,8 @@ void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool  void ExecutionHandler::sendCompletion()  {      SequenceNumberSet range; -    completionStatus.collectRanges(range); -    AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range)); +    incomingCompletionStatus.collectRanges(range); +    AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range));      out(frame);      } @@ -170,7 +166,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker:  SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)  { -    SequenceNumber id = ++outgoing.hwm; +    SequenceNumber id = ++outgoingCounter;      if(l) {          completion.listenForResult(id, l);      } @@ -228,3 +224,13 @@ void ExecutionHandler::sendContent(const MethodContent& content)          out(header);         }  } + +bool ExecutionHandler::isComplete(const SequenceNumber& id) +{ +    return outgoingCompletionStatus.covers(id); +} + +bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) +{ +    return outgoingCompletionStatus.mark >= id; +} diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 5f9cdff9d2..a3a3cde390 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -41,15 +41,16 @@ class ExecutionHandler :      public ChainableFrameHandler,      public Execution  { -    framing::Window incoming; -    framing::Window outgoing; +    framing::SequenceNumber incomingCounter; +    framing::AccumulatedAck incomingCompletionStatus; +    framing::SequenceNumber outgoingCounter; +    framing::AccumulatedAck outgoingCompletionStatus;      framing::FrameSet::shared_ptr arriving;      Correlator correlation;      CompletionTracker completion;      Demux demux;      framing::ProtocolVersion version;      uint64_t maxFrameSize; -    framing::AccumulatedAck completionStatus;      void complete(uint32_t mark, const framing::SequenceNumberSet& range);          void flush(); @@ -77,6 +78,9 @@ public:      void syncTo(const framing::SequenceNumber& point);      void flushTo(const framing::SequenceNumber& point); +    bool isComplete(const framing::SequenceNumber& id); +    bool isCompleteUpTo(const framing::SequenceNumber& id); +      void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }      Correlator& getCorrelator() { return correlation; }      CompletionTracker& getCompletionTracker() { return completion; } diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h index c2f3b426da..667a19e942 100644 --- a/cpp/src/qpid/client/Future.h +++ b/cpp/src/qpid/client/Future.h @@ -48,9 +48,16 @@ public:      void sync(SessionCore& session)      { -        if (!complete) { +        if (!isComplete(session)) { +            session.getExecution().syncTo(command); +            wait(session); +        } +    } + +    void wait(SessionCore& session) +    { +        if (!isComplete(session)) {              FutureCompletion callback; -            session.getExecution().flushTo(command);              session.getExecution().getCompletionTracker().listenForCompletion(                  command,                                                                       boost::bind(&FutureCompletion::completed, &callback) @@ -83,8 +90,12 @@ public:          }      } -    bool isComplete() { -        return complete; +    bool isComplete(SessionCore& session) { +        return complete || session.getExecution().isComplete(command); +    } + +    bool isCompleteUpTo(SessionCore& session) { +        return complete || session.getExecution().isCompleteUpTo(command);      }      void setCommandId(const framing::SequenceNumber& id) { command = id; } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 8bad6ec374..f093e12594 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -101,6 +101,8 @@ Future SessionCore::send(const AMQBody& command)  {       checkClosed(); +    command.getMethod()->setSync(sync); +      Future f;      //any result/response listeners must be set before the command is sent      if (command.getMethod()->resultExpected()) { diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index e195142221..09a5ea4f00 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -64,6 +64,7 @@ class AMQMethodBody : public AMQBody {      virtual uint8_t type() const { return METHOD_BODY; }      virtual bool isSync() const { return false; /*only ModelMethods can have the sync flag set*/ } +    virtual void setSync(bool) const { /*only ModelMethods can have the sync flag set*/ }      AMQMethodBody* getMethod() { return this; }      const AMQMethodBody* getMethod() const { return this; } diff --git a/cpp/src/qpid/framing/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp index 9daae5494c..219a68b96c 100644 --- a/cpp/src/qpid/framing/AccumulatedAck.cpp +++ b/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -100,6 +100,15 @@ void AccumulatedAck::collectRanges(SequenceNumberSet& set) const      }  } +void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range) +{ +    update(mark, cumulative); +    for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { +        update(*i, *(++i)); +    } +} + +  bool Range::contains(SequenceNumber i) const   {       return i >= start && i <= end;  diff --git a/cpp/src/qpid/framing/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h index f75842968f..1f66197e2a 100644 --- a/cpp/src/qpid/framing/AccumulatedAck.h +++ b/cpp/src/qpid/framing/AccumulatedAck.h @@ -64,6 +64,7 @@ namespace qpid {              void clear();              bool covers(SequenceNumber tag) const;              void collectRanges(SequenceNumberSet& set) const; +            void update(const SequenceNumber cumulative, const SequenceNumberSet& range);          };          std::ostream& operator<<(std::ostream&, const Range&);          std::ostream& operator<<(std::ostream&, const AccumulatedAck&); diff --git a/cpp/src/qpid/framing/ModelMethod.h b/cpp/src/qpid/framing/ModelMethod.h index 8e2fafb019..f3c0fa5d65 100644 --- a/cpp/src/qpid/framing/ModelMethod.h +++ b/cpp/src/qpid/framing/ModelMethod.h @@ -30,13 +30,14 @@ namespace framing {  class ModelMethod : public AMQMethodBody   { -    ExecutionHeader header; +    mutable ExecutionHeader header;  public:          virtual ~ModelMethod() {}      virtual void encode(Buffer& buffer) const { header.encode(buffer); }      virtual void decode(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); }      virtual uint32_t size() const { return header.size(); }       virtual bool isSync() const { return header.getSync(); } +    virtual void setSync(bool on) const { header.setSync(on); }      ExecutionHeader& getHeader() { return header; }       const ExecutionHeader& getHeader()  const { return header; }   }; | 
