/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include #include #include #include "OptionParser.h" using namespace qpid::messaging; using namespace qpid::types; typedef std::vector string_vector; struct Options : OptionParser { std::string url; std::string address; int timeout; int count; std::string id; std::string replyto; string_vector properties; string_vector entries; std::string content; std::string connectionOptions; Options() : OptionParser("Usage: spout [OPTIONS] ADDRESS", "Send messages to the specified address"), url("127.0.0.1"), timeout(0), count(1) { add("broker,b", url, "url of broker to connect to"); add("timeout,t", timeout, "exit after the specified time"); add("count,c", count, "stop after count messages have been sent, zero disables"); add("id,i", id, "use the supplied id instead of generating one"); add("reply-to", replyto, "specify reply-to address"); add("property,P", properties, "specify message property"); add("map,M", entries, "specify entry for map content"); add("content", content, "specify textual content"); add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}"); } static bool nameval(const std::string& in, std::string& name, std::string& value) { std::string::size_type i = in.find("="); if (i == std::string::npos) { name = in; return false; } else { name = in.substr(0, i); if (i+1 < in.size()) { value = in.substr(i+1); return true; } else { return false; } } } static void setProperty(Message& message, const std::string& property) { std::string name; std::string value; if (nameval(property, name, value)) { message.getProperties()[name] = value; } else { message.getProperties()[name] = Variant(); } } void setProperties(Message& message) const { for (string_vector::const_iterator i = properties.begin(); i != properties.end(); ++i) { setProperty(message, *i); } } void setEntries(Variant::Map& content) const { for (string_vector::const_iterator i = entries.begin(); i != entries.end(); ++i) { std::string name; std::string value; if (nameval(*i, name, value)) { content[name] = value; } else { content[name] = Variant(); } } } bool checkAddress() { if (getArguments().empty()) { error("Address is required"); return false; } else { address = getArguments()[0]; return true; } } }; int main(int argc, char** argv) { Options options; if (options.parse(argc, argv) && options.checkAddress()) { Connection connection(options.url, options.connectionOptions); try { connection.open(); Session session = connection.createSession(); Sender sender = session.createSender(options.address); Message message; options.setProperties(message); if (options.entries.size()) { Variant::Map content; options.setEntries(content); encode(content, message); } else if (options.content.size()) { message.setContent(options.content); message.setContentType("text/plain"); } std::time_t start = std::time(0); for (int count = 0; (count < options.count || options.count == 0) && (options.timeout == 0 || std::difftime(std::time(0), start) < options.timeout); count++) { if (!options.replyto.empty()) message.setReplyTo(Address(options.replyto)); std::string id = options.id.empty() ? Uuid(true).str() : options.id; std::stringstream spoutid; spoutid << id << ":" << count; message.getProperties()["spout-id"] = spoutid.str(); sender.send(message); } session.sync(); connection.close(); return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; connection.close(); } } return 1; }