summaryrefslogtreecommitdiff
path: root/src/mongo/executor/remote_command_response.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2019-05-13 18:24:36 -0400
committerJason Carey <jcarey@argv.me>2019-06-06 09:00:28 -0400
commitefa1ae064b9206f82136a8d14cbb86d47e8754b1 (patch)
treec4666fa197f837b5a0feaa8f980299a8eed7146a /src/mongo/executor/remote_command_response.cpp
parentb1ff28c63836aa13112cf3499574160a5950c6ec (diff)
downloadmongo-efa1ae064b9206f82136a8d14cbb86d47e8754b1.tar.gz
SERVER-41133 Add TE::scheduleRemoteCommandOnAny
Add support for a mode for the task executor where rather than targetting a single host, we target any of a set of hosts. This should behave identically to scheduleRemoteCommand, except that we concurrently get() connections from the connection pool for each host, preferring the first which is available
Diffstat (limited to 'src/mongo/executor/remote_command_response.cpp')
-rw-r--r--src/mongo/executor/remote_command_response.cpp92
1 files changed, 70 insertions, 22 deletions
diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp
index cfe23390805..8baafdb3d67 100644
--- a/src/mongo/executor/remote_command_response.cpp
+++ b/src/mongo/executor/remote_command_response.cpp
@@ -38,48 +38,37 @@
namespace mongo {
namespace executor {
-RemoteCommandResponse::RemoteCommandResponse(ErrorCodes::Error code, std::string reason)
+RemoteCommandResponseBase::RemoteCommandResponseBase(ErrorCodes::Error code, std::string reason)
: status(code, reason){};
-RemoteCommandResponse::RemoteCommandResponse(ErrorCodes::Error code,
- std::string reason,
- Milliseconds millis)
+RemoteCommandResponseBase::RemoteCommandResponseBase(ErrorCodes::Error code,
+ std::string reason,
+ Milliseconds millis)
: elapsedMillis(millis), status(code, reason) {}
-RemoteCommandResponse::RemoteCommandResponse(Status s) : status(std::move(s)) {
+RemoteCommandResponseBase::RemoteCommandResponseBase(Status s) : status(std::move(s)) {
invariant(!isOK());
};
-RemoteCommandResponse::RemoteCommandResponse(Status s, Milliseconds millis)
+RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds millis)
: elapsedMillis(millis), status(std::move(s)) {
invariant(!isOK());
};
-RemoteCommandResponse::RemoteCommandResponse(BSONObj dataObj, Milliseconds millis)
+RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis)
: data(std::move(dataObj)), elapsedMillis(millis) {
// The buffer backing the default empty BSONObj has static duration so it is effectively
// owned.
invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
};
-RemoteCommandResponse::RemoteCommandResponse(Message messageArg,
- BSONObj dataObj,
- Milliseconds millis)
- : message(std::make_shared<const Message>(std::move(messageArg))),
- data(std::move(dataObj)),
- elapsedMillis(millis) {
- if (!data.isOwned()) {
- data.shareOwnershipWith(message->sharedBuffer());
- }
-}
-
// TODO(amidvidy): we currently discard output docs when we use this constructor. We should
// have RCR hold those too, but we need more machinery before that is possible.
-RemoteCommandResponse::RemoteCommandResponse(const rpc::ReplyInterface& rpcReply,
- Milliseconds millis)
- : RemoteCommandResponse(rpcReply.getCommandReply(), std::move(millis)) {}
+RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
+ Milliseconds millis)
+ : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {}
-bool RemoteCommandResponse::isOK() const {
+bool RemoteCommandResponseBase::isOK() const {
return status.isOK();
}
@@ -104,5 +93,64 @@ std::ostream& operator<<(std::ostream& os, const RemoteCommandResponse& response
return os << response.toString();
}
+RemoteCommandResponse::RemoteCommandResponse(const RemoteCommandOnAnyResponse& other)
+ : RemoteCommandResponseBase(other) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(boost::optional<HostAndPort> hp,
+ ErrorCodes::Error code,
+ std::string reason)
+ : RemoteCommandResponseBase(code, std::move(reason)), target(std::move(hp)) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(boost::optional<HostAndPort> hp,
+ ErrorCodes::Error code,
+ std::string reason,
+ Milliseconds millis)
+ : RemoteCommandResponseBase(code, std::move(reason), millis), target(std::move(hp)) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(boost::optional<HostAndPort> hp, Status s)
+ : RemoteCommandResponseBase(std::move(s)), target(std::move(hp)) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(boost::optional<HostAndPort> hp,
+ Status s,
+ Milliseconds millis)
+ : RemoteCommandResponseBase(std::move(s), millis), target(std::move(hp)) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(HostAndPort hp,
+ BSONObj dataObj,
+ Milliseconds millis)
+ : RemoteCommandResponseBase(std::move(dataObj), millis), target(std::move(hp)) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(HostAndPort hp,
+ const rpc::ReplyInterface& rpcReply,
+ Milliseconds millis)
+ : RemoteCommandResponseBase(rpcReply, millis), target(std::move(hp)) {}
+
+RemoteCommandOnAnyResponse::RemoteCommandOnAnyResponse(boost::optional<HostAndPort> hp,
+ const RemoteCommandResponse& other)
+ : RemoteCommandResponseBase(other), target(std::move(hp)) {}
+
+bool RemoteCommandOnAnyResponse::operator==(const RemoteCommandOnAnyResponse& rhs) const {
+ if (this == &rhs) {
+ return true;
+ }
+ SimpleBSONObjComparator bsonComparator;
+ return bsonComparator.evaluate(data == rhs.data) && elapsedMillis == rhs.elapsedMillis &&
+ target == rhs.target;
+}
+
+bool RemoteCommandOnAnyResponse::operator!=(const RemoteCommandOnAnyResponse& rhs) const {
+ return !(*this == rhs);
+}
+
+std::string RemoteCommandOnAnyResponse::toString() const {
+ return str::stream() << "RemoteOnAnyResponse -- "
+ << " cmd:" << data.toString() << " target: "
+ << (!target ? StringData("[none]") : StringData(target->toString()));
+}
+
+std::ostream& operator<<(std::ostream& os, const RemoteCommandOnAnyResponse& response) {
+ return os << response.toString();
+}
+
} // namespace executor
} // namespace mongo