diff options
Diffstat (limited to 'cpp')
45 files changed, 5507 insertions, 2 deletions
diff --git a/cpp/include/qmf/Agent.h b/cpp/include/qmf/Agent.h new file mode 100644 index 0000000000..8ddea0fae9 --- /dev/null +++ b/cpp/include/qmf/Agent.h @@ -0,0 +1,84 @@ +#ifndef QMF_AGENT_H +#define QMF_AGENT_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qmf/exceptions.h" +#include "qpid/messaging/Duration.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class AgentImpl; + class ConsoleEvent; + class Query; + class DataAddr; + class SchemaId; + class Schema; + + class Agent : public qmf::Handle<AgentImpl> { + public: + QMF_EXTERN Agent(AgentImpl* impl = 0); + QMF_EXTERN Agent(const Agent&); + QMF_EXTERN Agent& operator=(const Agent&); + QMF_EXTERN ~Agent(); + + QMF_EXTERN std::string getName() const; + QMF_EXTERN uint32_t getEpoch() const; + QMF_EXTERN std::string getVendor() const; + QMF_EXTERN std::string getProduct() const; + QMF_EXTERN std::string getInstance() const; + QMF_EXTERN const qpid::types::Variant& getAttribute(const std::string&) const; + QMF_EXTERN const qpid::types::Variant::Map& getAttributes() const; + + QMF_EXTERN ConsoleEvent query(const Query&, qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); + QMF_EXTERN ConsoleEvent query(const std::string&, qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); + QMF_EXTERN uint32_t queryAsync(const Query&); + QMF_EXTERN uint32_t queryAsync(const std::string&); + + QMF_EXTERN ConsoleEvent callMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&, + qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); + QMF_EXTERN uint32_t callMethodAsync(const std::string&, const qpid::types::Variant::Map&, const DataAddr&); + + QMF_EXTERN uint32_t getPackageCount() const; + QMF_EXTERN const std::string& getPackage(uint32_t) const; + QMF_EXTERN uint32_t getSchemaIdCount(const std::string&) const; + QMF_EXTERN SchemaId getSchemaId(const std::string&, uint32_t) const; + QMF_EXTERN Schema getSchema(const SchemaId&, qpid::messaging::Duration timeout=qpid::messaging::Duration::MINUTE); + + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<Agent>; + friend struct AgentImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/AgentEvent.h b/cpp/include/qmf/AgentEvent.h new file mode 100644 index 0000000000..59a41c3267 --- /dev/null +++ b/cpp/include/qmf/AgentEvent.h @@ -0,0 +1,74 @@ +#ifndef QMF_AGENT_EVENT_H +#define QMF_AGENT_EVENT_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Variant.h" + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class AgentEventImpl; + class Query; + class DataAddr; + + enum AgentEventCode { + AGENT_AUTH_QUERY = 1, + AGENT_AUTH_SUBSCRIBE = 2, + AGENT_QUERY = 3, + AGENT_METHOD = 4, + AGENT_SUBSCRIBE_BEGIN = 5, + AGENT_SUBSCRIBE_TOUCH = 6, + AGENT_SUBSCRIBE_END = 7, + AGENT_THREAD_FAILED = 8 + }; + + class AgentEvent : public qmf::Handle<AgentEventImpl> { + public: + QMF_EXTERN AgentEvent(AgentEventImpl* impl = 0); + QMF_EXTERN AgentEvent(const AgentEvent&); + QMF_EXTERN AgentEvent& operator=(const AgentEvent&); + QMF_EXTERN ~AgentEvent(); + + QMF_EXTERN AgentEventCode getType() const; + QMF_EXTERN const std::string& getUserId() const; + QMF_EXTERN Query getQuery() const; + QMF_EXTERN bool hasDataAddr() const; + QMF_EXTERN DataAddr getDataAddr() const; + QMF_EXTERN const std::string& getMethodName() const; + QMF_EXTERN qpid::types::Variant::Map& getArguments(); + QMF_EXTERN qpid::types::Variant::Map& getArgumentSubtypes(); + QMF_EXTERN void addReturnArgument(const std::string&, const qpid::types::Variant&, const std::string& st=""); + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<AgentEvent>; + friend struct AgentEventImplAccess; +#endif + }; +} + +#endif diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h new file mode 100644 index 0000000000..5259aee360 --- /dev/null +++ b/cpp/include/qmf/AgentSession.h @@ -0,0 +1,93 @@ +#ifndef QMF_AGENT_SESSION_H +#define QMF_AGENT_SESSION_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Connection.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class AgentSessionImpl; + class AgentEvent; + class Schema; + class Data; + class DataAddr; + + class AgentSession : public qmf::Handle<AgentSessionImpl> { + public: + QMF_EXTERN AgentSession(AgentSessionImpl* impl = 0); + QMF_EXTERN AgentSession(const AgentSession&); + QMF_EXTERN AgentSession& operator=(const AgentSession&); + QMF_EXTERN ~AgentSession(); + + /** + * + * The options string is of the form "{key:value,key:value}". The following keys are supported: + * + * interval:N - Heartbeat interval in seconds [default: 60] + * external:{True,False} - Use external data storage (queries are pass-through) [default: False] + * allow-queries:{True,False} - If True: automatically allow all queries [default] + * If False: generate an AUTH_QUERY event to allow per-query authorization + * allow-methods:{True,False} - If True: automatically allow all methods [default] + * If False: generate an AUTH_METHOD event to allow per-method authorization + */ + QMF_EXTERN AgentSession(qpid::messaging::Connection&, const std::string& options=""); + QMF_EXTERN void setDomain(const std::string&); + QMF_EXTERN void setVendor(const std::string&); + QMF_EXTERN void setProduct(const std::string&); + QMF_EXTERN void setInstance(const std::string&); + QMF_EXTERN void setAttribute(const std::string&, const qpid::types::Variant&); + QMF_EXTERN const std::string& getName() const; + QMF_EXTERN void open(); + QMF_EXTERN void close(); + QMF_EXTERN bool nextEvent(AgentEvent&, qpid::messaging::Duration timeout=qpid::messaging::Duration::FOREVER); + + QMF_EXTERN void registerSchema(Schema&); + QMF_EXTERN DataAddr addData(Data&, const std::string& name="", bool persistent=false); + QMF_EXTERN void delData(const DataAddr&); + + QMF_EXTERN void authAccept(AgentEvent&); + QMF_EXTERN void authReject(AgentEvent&, const std::string& diag=""); + QMF_EXTERN void raiseException(AgentEvent&, const std::string&); + QMF_EXTERN void raiseException(AgentEvent&, const Data&); + QMF_EXTERN void response(AgentEvent&, const Data&); + QMF_EXTERN void complete(AgentEvent&); + QMF_EXTERN void methodSuccess(AgentEvent&); + QMF_EXTERN void raiseEvent(const Data&); + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<AgentSession>; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/ConsoleEvent.h b/cpp/include/qmf/ConsoleEvent.h new file mode 100644 index 0000000000..61f625b137 --- /dev/null +++ b/cpp/include/qmf/ConsoleEvent.h @@ -0,0 +1,73 @@ +#ifndef QMF_CONSOLE_EVENT_H +#define QMF_CONSOLE_EVENT_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Variant.h" + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class ConsoleEventImpl; + class Agent; + class Data; + + enum ConsoleEventCode { + CONSOLE_AGENT_ADD = 1, + CONSOLE_AGENT_AGE = 2, + CONSOLE_EVENT = 3, + CONSOLE_QUERY_RESPONSE = 4, + CONSOLE_METHOD_RESPONSE = 5, + CONSOLE_EXCEPTION = 6, + CONSOLE_SUBSCRIBE_UPDATE = 7, + CONSOLE_THREAD_FAILED = 8 + }; + + class ConsoleEvent : public qmf::Handle<ConsoleEventImpl> { + public: + QMF_EXTERN ConsoleEvent(ConsoleEventImpl* impl = 0); + QMF_EXTERN ConsoleEvent(const ConsoleEvent&); + QMF_EXTERN ConsoleEvent& operator=(const ConsoleEvent&); + QMF_EXTERN ~ConsoleEvent(); + + QMF_EXTERN ConsoleEventCode getType() const; + QMF_EXTERN uint32_t getCorrelator() const; + QMF_EXTERN Agent getAgent() const; + QMF_EXTERN uint32_t getDataCount() const; + QMF_EXTERN Data getData(uint32_t) const; + QMF_EXTERN bool isFinal() const; + QMF_EXTERN const qpid::types::Variant::Map& getArguments() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<ConsoleEvent>; + friend struct ConsoleEventImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h new file mode 100644 index 0000000000..0e58a647d6 --- /dev/null +++ b/cpp/include/qmf/ConsoleSession.h @@ -0,0 +1,65 @@ +#ifndef QMF_CONSOLE_SESSION_H +#define QMF_CONSOLE_SESSION_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qmf/Agent.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Connection.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class ConsoleSessionImpl; + class ConsoleEvent; + + class ConsoleSession : public qmf::Handle<ConsoleSessionImpl> { + public: + QMF_EXTERN ConsoleSession(ConsoleSessionImpl* impl = 0); + QMF_EXTERN ConsoleSession(const ConsoleSession&); + QMF_EXTERN ConsoleSession& operator=(const ConsoleSession&); + QMF_EXTERN ~ConsoleSession(); + + QMF_EXTERN ConsoleSession(qpid::messaging::Connection&, const std::string& options=""); + QMF_EXTERN void setDomain(const std::string&); + QMF_EXTERN void setAgentFilter(const std::string&); + QMF_EXTERN void open(); + QMF_EXTERN void close(); + QMF_EXTERN bool nextEvent(ConsoleEvent&, qpid::messaging::Duration timeout=qpid::messaging::Duration::FOREVER); + QMF_EXTERN uint32_t getAgentCount() const; + QMF_EXTERN Agent getAgent(uint32_t) const; + QMF_EXTERN Agent getConnectedBrokerAgent() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<ConsoleSession>; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/Data.h b/cpp/include/qmf/Data.h new file mode 100644 index 0000000000..27af1c4b04 --- /dev/null +++ b/cpp/include/qmf/Data.h @@ -0,0 +1,71 @@ +#ifndef QMF_DATA_H +#define QMF_DATA_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qmf/exceptions.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class DataImpl; + class SchemaId; + class DataAddr; + class Agent; + + class Data : public qmf::Handle<DataImpl> { + public: + QMF_EXTERN Data(DataImpl* impl = 0); + QMF_EXTERN Data(const Data&); + QMF_EXTERN Data& operator=(const Data&); + QMF_EXTERN ~Data(); + + QMF_EXTERN Data(const SchemaId&); + QMF_EXTERN void setSchema(const SchemaId&); + QMF_EXTERN void setAddr(const DataAddr&); + QMF_EXTERN void setProperty(const std::string&, const qpid::types::Variant&); + QMF_EXTERN void overwriteProperties(const qpid::types::Variant::Map&); + QMF_EXTERN bool hasSchema() const; + QMF_EXTERN bool hasAddr() const; + QMF_EXTERN const SchemaId& getSchemaId() const; + QMF_EXTERN const DataAddr& getAddr() const; + QMF_EXTERN const qpid::types::Variant& getProperty(const std::string&) const; + QMF_EXTERN const qpid::types::Variant::Map& getProperties() const; + QMF_EXTERN bool hasAgent() const; + QMF_EXTERN const Agent& getAgent() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<Data>; + friend struct DataImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/DataAddr.h b/cpp/include/qmf/DataAddr.h new file mode 100644 index 0000000000..fd8a8599c1 --- /dev/null +++ b/cpp/include/qmf/DataAddr.h @@ -0,0 +1,62 @@ +#ifndef QMF_DATA_ADDR_H +#define QMF_DATA_ADDR_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class DataAddrImpl; + + class DataAddr : public qmf::Handle<DataAddrImpl> { + public: + QMF_EXTERN DataAddr(DataAddrImpl* impl = 0); + QMF_EXTERN DataAddr(const DataAddr&); + QMF_EXTERN DataAddr& operator=(const DataAddr&); + QMF_EXTERN ~DataAddr(); + + QMF_EXTERN bool operator==(const DataAddr&); + QMF_EXTERN bool operator<(const DataAddr&); + + QMF_EXTERN DataAddr(const std::string& name, const std::string& agentName, uint32_t agentEpoch=0); + QMF_EXTERN const std::string& getName() const; + QMF_EXTERN const std::string& getAgentName() const; + QMF_EXTERN uint32_t getAgentEpoch() const; + QMF_EXTERN qpid::types::Variant::Map asMap() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<DataAddr>; + friend struct DataAddrImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/Handle.h b/cpp/include/qmf/Handle.h new file mode 100644 index 0000000000..510e2993aa --- /dev/null +++ b/cpp/include/qmf/Handle.h @@ -0,0 +1,70 @@ +#ifndef QMF_HANDLE_H +#define QMF_HANDLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/ImportExport.h" + +namespace qmf { + +template <class> class PrivateImplRef; + +/** \ingroup qmf + * A handle is like a pointer: refers to an underlying implementation object. + * Copying the handle does not copy the object. + * + * Handles can be null, like a 0 pointer. Use isValid(), isNull() or the + * conversion to bool to test for a null handle. + */ +template <class T> class Handle { + public: + + /**@return true if handle is valid, i.e. not null. */ + QMF_EXTERN bool isValid() const { return impl; } + + /**@return true if handle is null. It is an error to call any function on a null handle. */ + QMF_EXTERN bool isNull() const { return !impl; } + + /** Conversion to bool supports idiom if (handle) { handle->... } */ + QMF_EXTERN operator bool() const { return impl; } + + /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */ + QMF_EXTERN bool operator !() const { return !impl; } + + void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; } + + protected: + typedef T Impl; + QMF_EXTERN Handle() :impl() {} + + // Not implemented,subclasses must implement. + QMF_EXTERN Handle(const Handle&); + QMF_EXTERN Handle& operator=(const Handle&); + + Impl* impl; + + friend class PrivateImplRef<T>; +}; + +} // namespace qmf + +#endif /*!QMF_HANDLE_H*/ diff --git a/cpp/include/qmf/ImportExport.h b/cpp/include/qmf/ImportExport.h new file mode 100644 index 0000000000..f5e1d9127c --- /dev/null +++ b/cpp/include/qmf/ImportExport.h @@ -0,0 +1,33 @@ +#ifndef QMF_IMPORT_EXPORT_H +#define QMF_IMPORT_EXPORT_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#if defined(WIN32) && !defined(QPID_DECLARE_STATIC) +# if defined(QMF_EXPORT) || defined (qmfcommon_EXPORTS) +# define QMF_EXTERN __declspec(dllexport) +# else +# define QMF_EXTERN __declspec(dllimport) +# endif +#else +# define QMF_EXTERN +#endif + +#endif diff --git a/cpp/include/qmf/Query.h b/cpp/include/qmf/Query.h new file mode 100644 index 0000000000..71ef31b104 --- /dev/null +++ b/cpp/include/qmf/Query.h @@ -0,0 +1,65 @@ +#ifndef QMF_QUERY_H +#define QMF_QUERY_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class QueryImpl; + class SchemaId; + class DataAddr; + + class Query : public qmf::Handle<QueryImpl> { + public: + QMF_EXTERN Query(QueryImpl* impl = 0); + QMF_EXTERN Query(const Query&); + QMF_EXTERN Query& operator=(const Query&); + QMF_EXTERN ~Query(); + + QMF_EXTERN Query(const std::string& className, const std::string& package="", const std::string& predicate=""); + QMF_EXTERN Query(const SchemaId&); + QMF_EXTERN Query(const DataAddr&); + + QMF_EXTERN const DataAddr& getDataAddr() const; + QMF_EXTERN const SchemaId& getSchemaId() const; + QMF_EXTERN const std::string& getClassName() const; + QMF_EXTERN const std::string& getPackageName() const; + QMF_EXTERN void addPredicate(const std::string&, const qpid::types::Variant&); + QMF_EXTERN const qpid::types::Variant::Map& getPredicate() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<Query>; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/Schema.h b/cpp/include/qmf/Schema.h new file mode 100644 index 0000000000..cf316138c1 --- /dev/null +++ b/cpp/include/qmf/Schema.h @@ -0,0 +1,76 @@ +#ifndef QMF_SCHEMA_H +#define QMF_SCHEMA_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qpid/sys/IntegerTypes.h" +#include "qmf/Handle.h" +#include "qmf/SchemaTypes.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class SchemaImpl; + class SchemaId; + class SchemaProperty; + class SchemaMethod; + + class Schema : public qmf::Handle<SchemaImpl> { + public: + QMF_EXTERN Schema(SchemaImpl* impl = 0); + QMF_EXTERN Schema(const Schema&); + QMF_EXTERN Schema& operator=(const Schema&); + QMF_EXTERN ~Schema(); + + QMF_EXTERN Schema(int, const std::string&, const std::string&); + QMF_EXTERN const SchemaId& getSchemaId() const; + + QMF_EXTERN void finalize(); + QMF_EXTERN bool isFinalized() const; + QMF_EXTERN void addProperty(const SchemaProperty&); + QMF_EXTERN void addMethod(const SchemaMethod&); + QMF_EXTERN void setDesc(const std::string&); + QMF_EXTERN const std::string& getDesc() const; + + QMF_EXTERN void setDefaultSeverity(int); + QMF_EXTERN int getDefaultSeverity() const; + + QMF_EXTERN uint32_t getPropertyCount() const; + QMF_EXTERN SchemaProperty getProperty(uint32_t) const; + + QMF_EXTERN uint32_t getMethodCount() const; + QMF_EXTERN SchemaMethod getMethod(uint32_t) const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<Schema>; + friend struct SchemaImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/SchemaId.h b/cpp/include/qmf/SchemaId.h new file mode 100644 index 0000000000..13fb1cb902 --- /dev/null +++ b/cpp/include/qmf/SchemaId.h @@ -0,0 +1,61 @@ +#ifndef QMF_SCHEMA_ID_H +#define QMF_SCHEMA_ID_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Uuid.h" +#include "qmf/SchemaTypes.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class SchemaIdImpl; + + class SchemaId : public qmf::Handle<SchemaIdImpl> { + public: + QMF_EXTERN SchemaId(SchemaIdImpl* impl = 0); + QMF_EXTERN SchemaId(const SchemaId&); + QMF_EXTERN SchemaId& operator=(const SchemaId&); + QMF_EXTERN ~SchemaId(); + + QMF_EXTERN SchemaId(int, const std::string&, const std::string&); + QMF_EXTERN void setHash(const qpid::types::Uuid&); + QMF_EXTERN int getType() const; + QMF_EXTERN const std::string& getPackageName() const; + QMF_EXTERN const std::string& getName() const; + QMF_EXTERN const qpid::types::Uuid& getHash() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<SchemaId>; + friend struct SchemaIdImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/SchemaMethod.h b/cpp/include/qmf/SchemaMethod.h new file mode 100644 index 0000000000..47302b62b9 --- /dev/null +++ b/cpp/include/qmf/SchemaMethod.h @@ -0,0 +1,65 @@ +#ifndef QMF_SCHEMA_METHOD_H +#define QMF_SCHEMA_METHOD_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/ImportExport.h" +#include "qpid/sys/IntegerTypes.h" +#include "qmf/Handle.h" +#include "qmf/SchemaTypes.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class SchemaMethodImpl; + class SchemaProperty; + + class SchemaMethod : public qmf::Handle<SchemaMethodImpl> { + public: + QMF_EXTERN SchemaMethod(SchemaMethodImpl* impl = 0); + QMF_EXTERN SchemaMethod(const SchemaMethod&); + QMF_EXTERN SchemaMethod& operator=(const SchemaMethod&); + QMF_EXTERN ~SchemaMethod(); + + QMF_EXTERN SchemaMethod(const std::string&, const std::string& o=""); + + QMF_EXTERN void setDesc(const std::string&); + QMF_EXTERN void addArgument(const SchemaProperty&); + + QMF_EXTERN const std::string& getName() const; + QMF_EXTERN const std::string& getDesc() const; + QMF_EXTERN uint32_t getArgumentCount() const; + QMF_EXTERN SchemaProperty getArgument(uint32_t) const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<SchemaMethod>; + friend struct SchemaMethodImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/SchemaProperty.h b/cpp/include/qmf/SchemaProperty.h new file mode 100644 index 0000000000..2e770c2ef1 --- /dev/null +++ b/cpp/include/qmf/SchemaProperty.h @@ -0,0 +1,74 @@ +#ifndef QMF_SCHEMA_PROPERTY_H +#define QMF_SCHEMA_PROPERTY_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" +#include "qmf/SchemaTypes.h" +#include <string> + +namespace qmf { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class SchemaPropertyImpl; + + class SchemaProperty : public Handle<SchemaPropertyImpl> { + public: + QMF_EXTERN SchemaProperty(SchemaPropertyImpl* impl = 0); + QMF_EXTERN SchemaProperty(const SchemaProperty&); + QMF_EXTERN SchemaProperty& operator=(const SchemaProperty&); + QMF_EXTERN ~SchemaProperty(); + + QMF_EXTERN SchemaProperty(const std::string&, int, const std::string& o=""); + + QMF_EXTERN void setAccess(int); + QMF_EXTERN void setIndex(bool); + QMF_EXTERN void setOptional(bool); + QMF_EXTERN void setUnit(const std::string&); + QMF_EXTERN void setDesc(const std::string&); + QMF_EXTERN void setSubtype(const std::string&); + QMF_EXTERN void setDirection(int); + + QMF_EXTERN const std::string& getName() const; + QMF_EXTERN int getAccess() const; + QMF_EXTERN bool isIndex() const; + QMF_EXTERN bool isOptional() const; + QMF_EXTERN const std::string& getUnit() const; + QMF_EXTERN const std::string& getDesc() const; + QMF_EXTERN const std::string& getSubtype() const; + QMF_EXTERN int getDirection() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<SchemaProperty>; + friend struct SchemaPropertyImplAccess; +#endif + }; + +} + +#endif diff --git a/cpp/include/qmf/SchemaTypes.h b/cpp/include/qmf/SchemaTypes.h new file mode 100644 index 0000000000..af3da612e5 --- /dev/null +++ b/cpp/include/qmf/SchemaTypes.h @@ -0,0 +1,56 @@ +#ifndef QMF_SCHEMA_TYPES_H +#define QMF_SCHEMA_TYPES_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qmf { + + const int SCHEMA_TYPE_DATA = 1; + const int SCHEMA_TYPE_EVENT = 2; + + const int SCHEMA_DATA_VOID = 1; + const int SCHEMA_DATA_BOOL = 2; + const int SCHEMA_DATA_INT = 3; + const int SCHEMA_DATA_FLOAT = 4; + const int SCHEMA_DATA_STRING = 5; + const int SCHEMA_DATA_MAP = 6; + const int SCHEMA_DATA_LIST = 7; + const int SCHEMA_DATA_UUID = 8; + + const int ACCESS_READ_CREATE = 1; + const int ACCESS_READ_WRITE = 2; + const int ACCESS_READ_ONLY = 3; + + const int DIR_IN = 1; + const int DIR_OUT = 2; + const int DIR_IN_OUT = 3; + + const int SEV_EMERG = 0; + const int SEV_ALERT = 1; + const int SEV_CRIT = 2; + const int SEV_ERROR = 3; + const int SEV_WARN = 4; + const int SEV_NOTICE = 5; + const int SEV_INFORM = 6; + const int SEV_DEBUG = 7; +} + +#endif diff --git a/cpp/include/qmf/exceptions.h b/cpp/include/qmf/exceptions.h new file mode 100644 index 0000000000..7959499d63 --- /dev/null +++ b/cpp/include/qmf/exceptions.h @@ -0,0 +1,59 @@ +#ifndef QMF_EXCEPTIONS_H +#define QMF_EXCEPTIONS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/ImportExport.h" +#include "qpid/types/Exception.h" +#include "qpid/types/Variant.h" + +namespace qmf { + +/** \ingroup qmf + */ + + struct QmfException : public qpid::types::Exception { + QMF_EXTERN QmfException(const std::string& msg); + QMF_EXTERN virtual ~QmfException() throw(); + + qpid::types::Variant::Map detail; + }; + + struct KeyNotFound : public QmfException { + QMF_EXTERN KeyNotFound(const std::string& msg); + QMF_EXTERN virtual ~KeyNotFound() throw(); + }; + + struct IndexOutOfRange : public QmfException { + QMF_EXTERN IndexOutOfRange(); + QMF_EXTERN virtual ~IndexOutOfRange() throw(); + }; + + struct OperationTimedOut : public QmfException { + QMF_EXTERN OperationTimedOut(); + QMF_EXTERN virtual ~OperationTimedOut() throw(); + }; + +} + +#endif + diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index dca8262333..ef02cf7562 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -22,7 +22,8 @@ # lib_LTLIBRARIES += \ libqmf.la \ - libqmfengine.la + libqmfengine.la \ + libqmf2.la # # Public headers for the QMF API @@ -30,6 +31,29 @@ lib_LTLIBRARIES += \ QMF_API = \ ../include/qpid/agent/ManagementAgent.h \ ../include/qpid/agent/QmfAgentImportExport.h + +# +# Public headers for the QMF2 API +# +QMF2_API = \ + ../include/qmf/AgentEvent.h \ + ../include/qmf/Agent.h \ + ../include/qmf/AgentSession.h \ + ../include/qmf/ConsoleEvent.h \ + ../include/qmf/ConsoleSession.h \ + ../include/qmf/DataAddr.h \ + ../include/qmf/Data.h \ + ../include/qmf/exceptions.h \ + ../include/qmf/Handle.h \ + ../include/qmf/ImportExport.h \ + ../include/qmf/Query.h \ + ../include/qmf/Schema.h \ + ../include/qmf/SchemaId.h \ + ../include/qmf/SchemaMethod.h \ + ../include/qmf/SchemaProperty.h \ + ../include/qmf/SchemaTypes.h + + # # Public headers for the QMF Engine API # @@ -51,13 +75,45 @@ QMF_ENGINE_API = \ # Public header files nobase_include_HEADERS += \ $(QMF_API) \ - $(QMF_ENGINE_API) + $(QMF_ENGINE_API) \ + $(QMF2_API) libqmf_la_SOURCES = \ $(QMF_API) \ qpid/agent/ManagementAgentImpl.cpp \ qpid/agent/ManagementAgentImpl.h +libqmf2_la_SOURCES = \ + $(QMF2_API) \ + qmf/Agent.cpp \ + qmf/AgentEvent.cpp \ + qmf/AgentEventImpl.h \ + qmf/AgentImpl.h \ + qmf/AgentSession.cpp \ + qmf/ConsoleEvent.cpp \ + qmf/ConsoleEventImpl.h \ + qmf/ConsoleSession.cpp \ + qmf/ConsoleSessionImpl.h \ + qmf/DataAddr.cpp \ + qmf/DataAddrImpl.h \ + qmf/Data.cpp \ + qmf/DataImpl.h \ + qmf/exceptions.cpp \ + qmf/Hash.cpp \ + qmf/Hash.h \ + qmf/PrivateImplRef.h \ + qmf/Query.cpp \ + qmf/Schema.cpp \ + qmf/SchemaCache.cpp \ + qmf/SchemaCache.h \ + qmf/SchemaId.cpp \ + qmf/SchemaIdImpl.h \ + qmf/SchemaImpl.h \ + qmf/SchemaMethod.cpp \ + qmf/SchemaMethodImpl.h \ + qmf/SchemaProperty.cpp \ + qmf/SchemaPropertyImpl.h + libqmfengine_la_SOURCES = \ $(QMF_ENGINE_API) \ qmf/engine/Agent.cpp \ @@ -88,10 +144,13 @@ libqmfengine_la_SOURCES = \ qmf/engine/ValueImpl.h libqmf_la_LIBADD = libqmfengine.la +libqmf2_la_LIBADD = libqpidmessaging.la libqpidtypes.la libqmfengine_la_LIBADD = libqpidclient.la QMF_VERSION_INFO = 1:0:0 +QMF2_VERSION_INFO = 1:0:0 QMFENGINE_VERSION_INFO = 1:1:0 libqmf_la_LDFLAGS = -version-info $(QMF_VERSION_INFO) +libqmf2_la_LDFLAGS = -version-info $(QMF2_VERSION_INFO) libqmfengine_la_LDFLAGS = -version-info $(QMFENGINE_VERSION_INFO) diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp new file mode 100644 index 0000000000..c7ccea35d5 --- /dev/null +++ b/cpp/src/qmf/Agent.cpp @@ -0,0 +1,578 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/AgentImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/ConsoleEventImpl.h" +#include "qmf/ConsoleSession.h" +#include "qmf/DataImpl.h" +#include "qmf/Query.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> + +using qpid::types::Variant; +using qpid::messaging::Duration; +using qpid::messaging::Message; +using qpid::messaging::Sender; +using namespace std; +using namespace qmf; + +typedef PrivateImplRef<Agent> PI; + +Agent::Agent(AgentImpl* impl) { PI::ctor(*this, impl); } +Agent::Agent(const Agent& s) : qmf::Handle<AgentImpl>() { PI::copy(*this, s); } +Agent::~Agent() { PI::dtor(*this); } +Agent& Agent::operator=(const Agent& s) { return PI::assign(*this, s); } +string Agent::getName() const { return isValid() ? impl->getName() : ""; } +uint32_t Agent::getEpoch() const { return isValid() ? impl->getEpoch() : 0; } +string Agent::getVendor() const { return isValid() ? impl->getVendor() : ""; } +string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; } +string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; } +const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); } +const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); } +ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); } +ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); } +uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); } +uint32_t Agent::queryAsync(const string& q) { return impl->queryAsync(q); } +ConsoleEvent Agent::callMethod(const string& m, const Variant::Map& a, const DataAddr& d, Duration t) { return impl->callMethod(m, a, d, t); } +uint32_t Agent::callMethodAsync(const string& m, const Variant::Map& a, const DataAddr& d) { return impl->callMethodAsync(m, a, d); } +uint32_t Agent::getPackageCount() const { return impl->getPackageCount(); } +const string& Agent::getPackage(uint32_t i) const { return impl->getPackage(i); } +uint32_t Agent::getSchemaIdCount(const string& p) const { return impl->getSchemaIdCount(p); } +SchemaId Agent::getSchemaId(const string& p, uint32_t i) const { return impl->getSchemaId(p, i); } +Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(s, t); } + + + +AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : + name(n), epoch(e), session(s), touched(true), untouchedCount(0), + nextCorrelator(1), schemaCache(s.schemaCache) +{ +} + +const Variant& AgentImpl::getAttribute(const string& k) const +{ + Variant::Map::const_iterator iter = attributes.find(k); + if (iter == attributes.end()) + throw KeyNotFound(k); + return iter->second; +} + +ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) +{ + boost::shared_ptr<SyncContext> context(new SyncContext()); + uint32_t correlator; + ConsoleEvent result; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + contextMap[correlator] = context; + } + try { + sendQuery(query, correlator); + { + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock cl(context->lock); + if (!context->response.isValid() || !context->response.isFinal()) + context->cond.wait(context->lock, + qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (context->response.isValid() && context->response.isFinal()) + result = context->response; + else { + auto_ptr<ConsoleEventImpl> impl(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + Data exception(new DataImpl()); + exception.setProperty("error_text", "Timed out waiting for the agent to respond"); + impl->addData(exception); + result = ConsoleEvent(impl.release()); + } + } + } catch (qpid::types::Exception&) { + } + + { + qpid::sys::Mutex::ScopedLock l(lock); + contextMap.erase(correlator); + } + + return result; +} + + +ConsoleEvent AgentImpl::query(const string& text, Duration timeout) +{ + return query(stringToQuery(text), timeout); +} + + +uint32_t AgentImpl::queryAsync(const Query& query) +{ + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + sendQuery(query, correlator); + return correlator; +} + + +uint32_t AgentImpl::queryAsync(const string& text) +{ + return queryAsync(stringToQuery(text)); +} + + +ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) +{ + boost::shared_ptr<SyncContext> context(new SyncContext()); + uint32_t correlator; + ConsoleEvent result; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + contextMap[correlator] = context; + } + try { + sendMethod(method, args, addr, correlator); + { + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock cl(context->lock); + if (!context->response.isValid()) + context->cond.wait(context->lock, + qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (context->response.isValid()) + result = context->response; + else { + auto_ptr<ConsoleEventImpl> impl(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + Data exception(new DataImpl()); + exception.setProperty("error_text", "Timed out waiting for the agent to respond"); + impl->addData(exception); + result = ConsoleEvent(impl.release()); + } + } + } catch (qpid::types::Exception&) { + } + + { + qpid::sys::Mutex::ScopedLock l(lock); + contextMap.erase(correlator); + } + + return result; +} + + +uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) +{ + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + sendMethod(method, args, addr, correlator); + return correlator; +} + + +uint32_t AgentImpl::getPackageCount() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + + // + // Populate the package set. + // + for (set<SchemaId>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) + packageSet.insert(iter->getPackageName()); + + return packageSet.size(); +} + + +const string& AgentImpl::getPackage(uint32_t idx) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count(0); + for (set<string>::const_iterator iter = packageSet.begin(); iter != packageSet.end(); iter++) { + if (idx == count) + return *iter; + count++; + } + throw IndexOutOfRange(); +} + + +uint32_t AgentImpl::getSchemaIdCount(const string& pname) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count(0); + for (set<SchemaId>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) + if (iter->getPackageName() == pname) + count++; + return count; +} + + +SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count(0); + for (set<SchemaId>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) { + if (iter->getPackageName() == pname) { + if (idx == count) + return *iter; + count++; + } + } + throw IndexOutOfRange(); +} + + +Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout) +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (!schemaCache->haveSchema(id)) + // + // The desired schema is not in the cache. We need to asynchronously query the remote + // agent for the information. The call to schemaCache->getSchema will block waiting for + // the response to be received. + // + sendSchemaRequest(id); + + return schemaCache->getSchema(id, timeout); +} + + +void AgentImpl::handleException(const Variant::Map& content, const Message& msg) +{ + const string& cid(msg.getCorrelationId()); + Variant::Map::const_iterator aIter; + uint32_t correlator; + boost::shared_ptr<SyncContext> context; + + try { correlator = boost::lexical_cast<uint32_t>(cid); } + catch(const boost::bad_lexical_cast&) { correlator = 0; } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator); + if (iter != contextMap.end()) + context = iter->second; + } + + if (context.get() != 0) { + // + // This exception is associated with a synchronous request. + // + qpid::sys::Mutex::ScopedLock cl(context->lock); + context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + ConsoleEventImplAccess::get(context->response).addData(new DataImpl(content, this)); + ConsoleEventImplAccess::get(context->response).setAgent(this); + context->cond.notify(); + } else { + // + // This exception is associated with an asynchronous request. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + eventImpl->setCorrelator(correlator); + eventImpl->setAgent(this); + eventImpl->addData(new DataImpl(content, this)); + session.enqueueEvent(eventImpl.release()); + } +} + + +void AgentImpl::handleMethodResponse(const Variant::Map& response, const Message& msg) +{ + const string& cid(msg.getCorrelationId()); + Variant::Map::const_iterator aIter; + Variant::Map argMap; + uint32_t correlator; + boost::shared_ptr<SyncContext> context; + + QPID_LOG(trace, "RCVD MethodResponse map=" << response); + + aIter = response.find("_arguments"); + if (aIter != response.end()) + argMap = aIter->second.asMap(); + + try { correlator = boost::lexical_cast<uint32_t>(cid); } + catch(const boost::bad_lexical_cast&) { correlator = 0; } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator); + if (iter != contextMap.end()) + context = iter->second; + } + + if (context.get() != 0) { + // + // This response is associated with a synchronous request. + // + qpid::sys::Mutex::ScopedLock cl(context->lock); + context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_METHOD_RESPONSE)); + ConsoleEventImplAccess::get(context->response).setArguments(argMap); + ConsoleEventImplAccess::get(context->response).setAgent(this); + context->cond.notify(); + } else { + // + // This response is associated with an asynchronous request. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_METHOD_RESPONSE)); + eventImpl->setCorrelator(correlator); + eventImpl->setAgent(this); + eventImpl->setArguments(argMap); + session.enqueueEvent(eventImpl.release()); + } +} + + +void AgentImpl::handleDataIndication(const Variant::List&, const Message&) +{ + // TODO +} + + +void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& msg) +{ + const string& cid(msg.getCorrelationId()); + Variant::Map::const_iterator aIter; + const Variant::Map& props(msg.getProperties()); + uint32_t correlator; + bool final(false); + boost::shared_ptr<SyncContext> context; + + aIter = props.find("partial"); + if (aIter == props.end()) + final = true; + + try { correlator = boost::lexical_cast<uint32_t>(cid); } + catch(const boost::bad_lexical_cast&) { correlator = 0; } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator); + if (iter != contextMap.end()) + context = iter->second; + } + + if (context.get() != 0) { + // + // This response is associated with a synchronous request. + // + qpid::sys::Mutex::ScopedLock cl(context->lock); + if (!context->response.isValid()) + context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + ConsoleEventImplAccess::get(context->response).addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + if (final) { + ConsoleEventImplAccess::get(context->response).setFinal(); + ConsoleEventImplAccess::get(context->response).setAgent(this); + context->cond.notify(); + } + } else { + // + // This response is associated with an asynchronous request. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); + eventImpl->setCorrelator(correlator); + eventImpl->setAgent(this); + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + eventImpl->addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + if (final) + eventImpl->setFinal(); + session.enqueueEvent(eventImpl.release()); + } +} + + +Query AgentImpl::stringToQuery(const std::string& text) +{ + qpid::messaging::AddressParser parser(text); + Variant::Map map; + Variant::Map::const_iterator iter; + string className; + string packageName; + + parser.parseMap(map); + + iter = map.find("class"); + if (iter != map.end()) + className = iter->second.asString(); + + iter = map.find("package"); + if (iter != map.end()) + packageName = iter->second.asString(); + + Query query(className, packageName); + + iter = map.find("where"); + if (iter != map.end()) { + const Variant::Map& pred(iter->second.asMap()); + for (iter = pred.begin(); iter != pred.end(); iter++) + query.addPredicate(iter->first, iter->second); + } + + return query; +} + + +void AgentImpl::sendQuery(const Query& query, uint32_t correlator) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "request"; + headers["qmf.opcode"] = "_query_request"; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + map["_what"] = "OBJECT"; + + const DataAddr& dataAddr(query.getDataAddr()); + const SchemaId& schemaId(query.getSchemaId()); + + if (dataAddr.isValid()) + map["_object_id"] = dataAddr.asMap(); + else { + string className; + string packageName; + if (schemaId.isValid()) { + className = schemaId.getName(); + packageName = schemaId.getPackageName(); + } else { + className = query.getClassName(); + if (className.empty()) + throw QmfException("Invalid Query"); + packageName = query.getPackageName(); + } + Variant::Map idMap; + idMap["_class_name"] = className; + if (!packageName.empty()) + idMap["_package_name"] = packageName; + map["_schema_id"] = idMap; + } + + // + // TODO: Encode a simple-predicate if present. + // + + msg.setReplyTo(session.replyAddress); + msg.setCorrelationId(boost::lexical_cast<string>(correlator)); + encode(map, msg); + Sender sender(session.session.createSender(session.directBase + "/" + name)); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT QueryRequest to=" << name); +} + + +void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const DataAddr& addr, uint32_t correlator) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "request"; + headers["qmf.opcode"] = "_method_request"; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + map["_method_name"] = method; + map["_object_id"] = addr.asMap(); + map["_arguments"] = args; + + msg.setReplyTo(session.replyAddress); + msg.setCorrelationId(boost::lexical_cast<string>(correlator)); + encode(map, msg); + Sender sender(session.session.createSender(session.directBase + "/" + name)); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name); +} + +void AgentImpl::sendSchemaRequest(const SchemaId& id) +{ + // TODO: Check agent's capability value to determine which kind of schema request to make + +#define RAW_BUFFER_SIZE 1024 + char rawBuffer[RAW_BUFFER_SIZE]; + qpid::management::Buffer buffer(rawBuffer, RAW_BUFFER_SIZE); + + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('2'); + buffer.putOctet('S'); + buffer.putLong(nextCorrelator++); + buffer.putShortString(id.getPackageName()); + buffer.putShortString(id.getName()); + buffer.putBin128(id.getHash().data()); + + string content(rawBuffer, buffer.getPosition()); + + Message msg; + msg.setReplyTo(session.replyAddress); + msg.setContent(content); + Sender sender(session.session.createSender(session.directBase + "/" + name)); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT V1SchemaRequest to=" << name); +} + + +void AgentImpl::learnSchemaId(const SchemaId& id) +{ + schemaCache->declareSchemaId(id); + schemaIdSet.insert(id); +} + + +AgentImpl& AgentImplAccess::get(Agent& item) +{ + return *item.impl; +} + + +const AgentImpl& AgentImplAccess::get(const Agent& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/AgentEvent.cpp b/cpp/src/qmf/AgentEvent.cpp new file mode 100644 index 0000000000..fbdce686d4 --- /dev/null +++ b/cpp/src/qmf/AgentEvent.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/AgentEventImpl.h" +#include "qmf/PrivateImplRef.h" + +using namespace std; +using namespace qmf; +using qpid::types::Variant; + +typedef PrivateImplRef<AgentEvent> PI; + +AgentEvent::AgentEvent(AgentEventImpl* impl) { PI::ctor(*this, impl); } +AgentEvent::AgentEvent(const AgentEvent& s) : qmf::Handle<AgentEventImpl>() { PI::copy(*this, s); } +AgentEvent::~AgentEvent() { PI::dtor(*this); } +AgentEvent& AgentEvent::operator=(const AgentEvent& s) { return PI::assign(*this, s); } + +AgentEventCode AgentEvent::getType() const { return impl->getType(); } +const string& AgentEvent::getUserId() const { return impl->getUserId(); } +Query AgentEvent::getQuery() const { return impl->getQuery(); } +bool AgentEvent::hasDataAddr() const { return impl->hasDataAddr(); } +DataAddr AgentEvent::getDataAddr() const { return impl->getDataAddr(); } +const string& AgentEvent::getMethodName() const { return impl->getMethodName(); } +qpid::types::Variant::Map& AgentEvent::getArguments() { return impl->getArguments(); } +qpid::types::Variant::Map& AgentEvent::getArgumentSubtypes() { return impl->getArgumentSubtypes(); } +void AgentEvent::addReturnArgument(const std::string& k, const qpid::types::Variant& v, const std::string& s) { impl->addReturnArgument(k, v, s); } + +uint32_t AgentEventImpl::enqueueData(const Data& data) +{ + qpid::sys::Mutex::ScopedLock l(lock); + dataQueue.push(data); + return dataQueue.size(); +} + + +Data AgentEventImpl::dequeueData() +{ + qpid::sys::Mutex::ScopedLock l(lock); + if (dataQueue.empty()) + return Data(); + Data data(dataQueue.front()); + dataQueue.pop(); + return data; +} + + +void AgentEventImpl::addReturnArgument(const string& key, const Variant& val, const string& subtype) +{ + outArguments[key] = val; + if (!subtype.empty()) + outArgumentSubtypes[key] = subtype; +} + + +AgentEventImpl& AgentEventImplAccess::get(AgentEvent& item) +{ + return *item.impl; +} + + +const AgentEventImpl& AgentEventImplAccess::get(const AgentEvent& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/AgentEventImpl.h b/cpp/src/qmf/AgentEventImpl.h new file mode 100644 index 0000000000..058e31d78d --- /dev/null +++ b/cpp/src/qmf/AgentEventImpl.h @@ -0,0 +1,93 @@ +#ifndef _QMF_AGENT_EVENT_IMPL_H_ +#define _QMF_AGENT_EVENT_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/messaging/Address.h" +#include "qmf/AgentEvent.h" +#include "qmf/Query.h" +#include "qmf/DataAddr.h" +#include "qmf/Data.h" +#include <queue> + +namespace qmf { + class AgentEventImpl : public virtual qpid::RefCounted { + public: + // + // Impl-only methods + // + AgentEventImpl(AgentEventCode e) : eventType(e) {} + void setUserId(const std::string& u) { userId = u; } + void setQuery(const Query& q) { query = q; } + void setDataAddr(const DataAddr& d) { dataAddr = d; } + void setMethodName(const std::string& m) { methodName = m; } + void setArguments(const qpid::types::Variant::Map& a) { arguments = a; } + void setArgumentSubtypes(const qpid::types::Variant::Map& a) { argumentSubtypes = a; } + void setReplyTo(const qpid::messaging::Address& r) { replyTo = r; } + const qpid::messaging::Address& getReplyTo() { return replyTo; } + void setCorrelationId(const std::string& c) { correlationId = c; } + const std::string& getCorrelationId() { return correlationId; } + const qpid::types::Variant::Map& getReturnArguments() const { return outArguments; } + const qpid::types::Variant::Map& getReturnArgumentSubtypes() const { return outArgumentSubtypes; } + uint32_t enqueueData(const Data&); + Data dequeueData(); + + // + // Methods from API handle + // + AgentEventCode getType() const { return eventType; } + const std::string& getUserId() const { return userId; } + Query getQuery() const { return query; } + bool hasDataAddr() const { return dataAddr.isValid(); } + DataAddr getDataAddr() const { return dataAddr; } + const std::string& getMethodName() const { return methodName; } + qpid::types::Variant::Map& getArguments() { return arguments; } + qpid::types::Variant::Map& getArgumentSubtypes() { return argumentSubtypes; } + void addReturnArgument(const std::string&, const qpid::types::Variant&, const std::string&); + + private: + const AgentEventCode eventType; + std::string userId; + qpid::messaging::Address replyTo; + std::string correlationId; + Query query; + DataAddr dataAddr; + std::string methodName; + qpid::types::Variant::Map arguments; + qpid::types::Variant::Map argumentSubtypes; + qpid::types::Variant::Map outArguments; + qpid::types::Variant::Map outArgumentSubtypes; + + qpid::sys::Mutex lock; + std::queue<Data> dataQueue; + }; + + struct AgentEventImplAccess + { + static AgentEventImpl& get(AgentEvent&); + static const AgentEventImpl& get(const AgentEvent&); + }; +} + +#endif diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h new file mode 100644 index 0000000000..d5d2a2fdb2 --- /dev/null +++ b/cpp/src/qmf/AgentImpl.h @@ -0,0 +1,112 @@ +#ifndef _QMF_AGENT_IMPL_H_ +#define _QMF_AGENT_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/Agent.h" +#include "qmf/ConsoleEventImpl.h" +#include "qmf/ConsoleSessionImpl.h" +#include "qmf/SchemaCache.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Message.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include <boost/shared_ptr.hpp> +#include <map> +#include <set> + +namespace qmf { + class AgentImpl : public virtual qpid::RefCounted { + public: + // + // Impl-only methods + // + AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s); + void setAttribute(const std::string& k, const qpid::types::Variant& v) { attributes[k] = v; } + void setAttribute(const std::string& k, const std::string& v) { attributes[k] = v; } + void touch() { touched = true; } + uint32_t age() { untouchedCount = touched ? 0 : untouchedCount + 1; return untouchedCount; } + void handleException(const qpid::types::Variant::Map&, const qpid::messaging::Message&); + void handleMethodResponse(const qpid::types::Variant::Map&, const qpid::messaging::Message&); + void handleDataIndication(const qpid::types::Variant::List&, const qpid::messaging::Message&); + void handleQueryResponse(const qpid::types::Variant::List&, const qpid::messaging::Message&); + + // + // Methods from API handle + // + const std::string& getName() const { return name; } + uint32_t getEpoch() const { return epoch; } + std::string getVendor() const { return getAttribute("_vendor").asString(); } + std::string getProduct() const { return getAttribute("_product").asString(); } + std::string getInstance() const { return getAttribute("_instance").asString(); } + const qpid::types::Variant& getAttribute(const std::string& k) const; + const qpid::types::Variant::Map& getAttributes() const { return attributes; } + + ConsoleEvent query(const Query& q, qpid::messaging::Duration t); + ConsoleEvent query(const std::string& q, qpid::messaging::Duration t); + uint32_t queryAsync(const Query& q); + uint32_t queryAsync(const std::string& q); + + ConsoleEvent callMethod(const std::string& m, const qpid::types::Variant::Map& a, const DataAddr&, qpid::messaging::Duration t); + uint32_t callMethodAsync(const std::string& m, const qpid::types::Variant::Map& a, const DataAddr&); + + uint32_t getPackageCount() const; + const std::string& getPackage(uint32_t i) const; + uint32_t getSchemaIdCount(const std::string& p) const; + SchemaId getSchemaId(const std::string& p, uint32_t i) const; + Schema getSchema(const SchemaId& s, qpid::messaging::Duration t); + + private: + struct SyncContext { + qpid::sys::Mutex lock; + qpid::sys::Condition cond; + ConsoleEvent response; + }; + + mutable qpid::sys::Mutex lock; + std::string name; + uint32_t epoch; + ConsoleSessionImpl& session; + bool touched; + uint32_t untouchedCount; + qpid::types::Variant::Map attributes; + uint32_t nextCorrelator; + std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; + boost::shared_ptr<SchemaCache> schemaCache; + mutable std::set<std::string> packageSet; + std::set<SchemaId> schemaIdSet; + + Query stringToQuery(const std::string&); + void sendQuery(const Query&, uint32_t); + void sendMethod(const std::string&, const qpid::types::Variant::Map&, const DataAddr&, uint32_t); + void sendSchemaRequest(const SchemaId&); + void learnSchemaId(const SchemaId&); + }; + + struct AgentImplAccess + { + static AgentImpl& get(Agent&); + static const AgentImpl& get(const Agent&); + }; +} + +#endif diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp new file mode 100644 index 0000000000..28c324cc02 --- /dev/null +++ b/cpp/src/qmf/AgentSession.cpp @@ -0,0 +1,944 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentEventImpl.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/DataImpl.h" +#include "qmf/Query.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include <queue> +#include <map> +#include <set> +#include <iostream> +#include <memory> + +using namespace std; +using namespace qpid::messaging; +using namespace qmf; +using qpid::types::Variant; + +namespace qmf { + class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { + public: + ~AgentSessionImpl(); + + // + // Methods from API handle + // + AgentSessionImpl(Connection& c, const string& o); + void setDomain(const string& d) { checkOpen(); domain = d; } + void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } + void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } + void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } + void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } + const string& getName() const { return agentName; } + void open(); + void close(); + bool nextEvent(AgentEvent& e, Duration t); + + void registerSchema(Schema& s); + DataAddr addData(Data& d, const string& n, bool persist); + void delData(const DataAddr&); + + void authAccept(AgentEvent& e); + void authReject(AgentEvent& e, const string& m); + void raiseException(AgentEvent& e, const string& s); + void raiseException(AgentEvent& e, const Data& d); + void response(AgentEvent& e, const Data& d); + void complete(AgentEvent& e); + void methodSuccess(AgentEvent& e); + void raiseEvent(const Data& d); + + private: + typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + + mutable qpid::sys::Mutex lock; + qpid::sys::Condition cond; + Connection connection; + Session session; + string domain; + Variant::Map attributes; + Variant::Map options; + string agentName; + bool opened; + queue<AgentEvent> eventQueue; + qpid::sys::Thread* thread; + bool threadCanceled; + uint32_t bootSequence; + uint32_t interval; + uint64_t lastHeartbeat; + uint64_t lastVisit; + bool externalStorage; + bool autoAllowQueries; + bool autoAllowMethods; + uint64_t schemaUpdateTime; + string directBase; + string topicBase; + + set<string> packages; + map<SchemaId, Schema, SchemaIdCompare> schemata; + DataIndex globalIndex; + map<SchemaId, DataIndex, SchemaIdCompare> schemaIndex; + + void checkOpen(); + void setAgentName(); + void enqueueEvent(const AgentEvent&); + void handleLocateRequest(const Variant::Map& content, const Message& msg); + void handleMethodRequest(const Variant::Map& content, const Message& msg); + void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); + void dispatch(Message); + void sendHeartbeat(); + bool predicateMatch(const Query&, const Data&); + void flushResponses(AgentEvent&, bool); + void periodicProcessing(uint64_t); + void run(); + }; +} + +typedef qmf::PrivateImplRef<AgentSession> PI; + +AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); } +AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); } +AgentSession::~AgentSession() { PI::dtor(*this); } +AgentSession& AgentSession::operator=(const AgentSession& s) { return PI::assign(*this, s); } + +AgentSession::AgentSession(Connection& c, const string& o) { PI::ctor(*this, new AgentSessionImpl(c, o)); } +void AgentSession::setDomain(const string& d) { impl->setDomain(d); } +void AgentSession::setVendor(const string& v) { impl->setVendor(v); } +void AgentSession::setProduct(const string& p) { impl->setProduct(p); } +void AgentSession::setInstance(const string& i) { impl->setInstance(i); } +void AgentSession::setAttribute(const string& k, const qpid::types::Variant& v) { impl->setAttribute(k, v); } +const string& AgentSession::getName() const { return impl->getName(); } +void AgentSession::open() { impl->open(); } +void AgentSession::close() { impl->close(); } +bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); } +void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); } +DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); } +void AgentSession::delData(const DataAddr& a) { impl->delData(a); } +void AgentSession::authAccept(AgentEvent& e) { impl->authAccept(e); } +void AgentSession::authReject(AgentEvent& e, const string& m) { impl->authReject(e, m); } +void AgentSession::raiseException(AgentEvent& e, const string& s) { impl->raiseException(e, s); } +void AgentSession::raiseException(AgentEvent& e, const Data& d) { impl->raiseException(e, d); } +void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d); } +void AgentSession::complete(AgentEvent& e) { impl->complete(e); } +void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); } +void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); } + +//======================================================================================== +// Impl Method Bodies +//======================================================================================== + +AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : + connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), + bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), externalStorage(false), + autoAllowQueries(true), autoAllowMethods(true), + schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) +{ + // + // Set Capability Level to 1 + // + attributes["_capability_level"] = 1; + + if (!options.empty()) { + qpid::messaging::AddressParser parser(options); + Variant::Map optMap; + Variant::Map::const_iterator iter; + + parser.parseMap(optMap); + + iter = optMap.find("domain"); + if (iter != optMap.end()) + domain = iter->second.asString(); + + iter = optMap.find("interval"); + if (iter != optMap.end()) { + interval = iter->second.asUint32(); + if (interval < 1) + interval = 1; + } + + iter = optMap.find("external"); + if (iter != optMap.end()) + externalStorage = iter->second.asBool(); + + iter = optMap.find("allow-queries"); + if (iter != optMap.end()) + autoAllowQueries = iter->second.asBool(); + + iter = optMap.find("allow-methods"); + if (iter != optMap.end()) + autoAllowMethods = iter->second.asBool(); + } +} + + +AgentSessionImpl::~AgentSessionImpl() +{ + if (opened) + close(); +} + + +void AgentSessionImpl::open() +{ + if (opened) + throw QmfException("The session is already open"); + opened = true; + + // Establish messaging addresses + setAgentName(); + directBase = "qmf." + domain + ".direct"; + topicBase = "qmf." + domain + ".topic"; + + // Create AMQP session, receivers, and senders + session = connection.createSession(); + Receiver directRx = session.createReceiver(directBase + "/" + agentName); + Receiver topicRx = session.createReceiver(topicBase + "/console.#"); + + directRx.setCapacity(64); + topicRx.setCapacity(64); + + // Start the receiver thread + threadCanceled = false; + thread = new qpid::sys::Thread(*this); + + // Send an initial agent heartbeat message + sendHeartbeat(); +} + + +void AgentSessionImpl::close() +{ + if (!opened) + throw QmfException("The session is already closed"); + + // Stop and join the receiver thread + threadCanceled = true; + thread->join(); + delete thread; + + // Close the AMQP session + session.close(); + opened = false; +} + + +bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) +{ + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock l(lock); + + if (eventQueue.empty()) + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + + if (!eventQueue.empty()) { + event = eventQueue.front(); + eventQueue.pop(); + return true; + } + + return false; +} + + +void AgentSessionImpl::registerSchema(Schema& schema) +{ + qpid::sys::Mutex::ScopedLock l(lock); + schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + + if (!schema.isFinalized()) + schema.finalize(); + + const SchemaId& schemaId(schema.getSchemaId()); + const string& packageName(schemaId.getPackageName()); + + packages.insert(packageName); + schemata[schemaId] = schema; + schemaIndex[schemaId] = DataIndex(); +} + + +DataAddr AgentSessionImpl::addData(Data& data, const string& name, bool persistent) +{ + if (externalStorage) + throw QmfException("addData() must not be called when the 'external' option is enabled."); + + string dataName; + if (name.empty()) + dataName = qpid::types::Uuid(true).str(); + else + dataName = name; + + DataAddr addr(dataName, agentName, persistent ? 0 : bootSequence); + data.setAddr(addr); + + { + qpid::sys::Mutex::ScopedLock l(lock); + DataIndex::const_iterator iter = globalIndex.find(addr); + if (iter != globalIndex.end()) + throw QmfException("Duplicate Data Address"); + + globalIndex[addr] = data; + if (data.hasSchema()) + schemaIndex[data.getSchemaId()][addr] = data; + } + + // + // TODO: Survey active subscriptions to see if they need to hear about this new data. + // + + return addr; +} + + +void AgentSessionImpl::delData(const DataAddr& addr) +{ + { + qpid::sys::Mutex::ScopedLock l(lock); + DataIndex::iterator iter = globalIndex.find(addr); + if (iter == globalIndex.end()) + return; + if (iter->second.hasSchema()) { + const SchemaId& schemaId(iter->second.getSchemaId()); + schemaIndex[schemaId].erase(addr); + } + globalIndex.erase(iter); + } + + // + // TODO: Survey active subscriptions to see if they need to hear about this deleted data. + // +} + + +void AgentSessionImpl::authAccept(AgentEvent& authEvent) +{ + auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_QUERY)); + eventImpl->setQuery(authEvent.getQuery()); + eventImpl->setUserId(authEvent.getUserId()); + eventImpl->setReplyTo(AgentEventImplAccess::get(authEvent).getReplyTo()); + eventImpl->setCorrelationId(AgentEventImplAccess::get(authEvent).getCorrelationId()); + AgentEvent event(eventImpl.release()); + + if (externalStorage) { + enqueueEvent(event); + return; + } + + const Query& query(authEvent.getQuery()); + if (query.getDataAddr().isValid()) { + { + qpid::sys::Mutex::ScopedLock l(lock); + DataIndex::const_iterator iter = globalIndex.find(query.getDataAddr()); + if (iter != globalIndex.end()) + response(event, iter->second); + } + complete(event); + return; + } + + if (query.getSchemaId().isValid()) { + { + qpid::sys::Mutex::ScopedLock l(lock); + map<SchemaId, DataIndex>::const_iterator iter = schemaIndex.find(query.getSchemaId()); + if (iter != schemaIndex.end()) + for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++) + if (predicateMatch(query, dIter->second)) + response(event, dIter->second); + } + complete(event); + return; + } + + const string& className(query.getClassName()); + const string& packageName(query.getPackageName()); + + if (className.empty()) { + raiseException(event, "Query is Invalid"); + return; + } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<SchemaId, DataIndex>::const_iterator sIter; + for (sIter = schemaIndex.begin(); sIter != schemaIndex.end(); sIter++) { + const SchemaId& schemaId(sIter->first); + if (schemaId.getName() == className && + (packageName.empty() || schemaId.getPackageName() == packageName)) + for (DataIndex::const_iterator dIter = sIter->second.begin(); dIter != sIter->second.end(); dIter++) + if (predicateMatch(query, dIter->second)) + response(event, dIter->second); + } + } + complete(event); +} + + +void AgentSessionImpl::authReject(AgentEvent& event, const string& error) +{ + raiseException(event, "Action Forbidden - " + error); +} + + +void AgentSessionImpl::raiseException(AgentEvent& event, const string& error) +{ + Data exception(new DataImpl()); + exception.setProperty("error_text", error); + raiseException(event, exception); +} + + +void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + const DataImpl& dataImpl(DataImplAccess::get(data)); + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(dataImpl.asMap(), msg); + Sender sender(session.createSender(eventImpl.getReplyTo())); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::response(AgentEvent& event, const Data& data) +{ + AgentEventImpl& impl(AgentEventImplAccess::get(event)); + uint32_t count = impl.enqueueData(data); + if (count >= 8) + flushResponses(event, false); +} + + +void AgentSessionImpl::complete(AgentEvent& event) +{ + flushResponses(event, true); +} + + +void AgentSessionImpl::methodSuccess(AgentEvent& event) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + + const Variant::Map& outArgs(eventImpl.getReturnArguments()); + const Variant::Map& outSubtypes(eventImpl.getReturnArgumentSubtypes()); + + map["_arguments"] = outArgs; + if (!outSubtypes.empty()) + map["_subtypes"] = outSubtypes; + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(map, msg); + Sender sender(session.createSender(eventImpl.getReplyTo())); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::raiseEvent(const Data& data) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + std::stringstream address; + + address << topicBase << "/agent.ind.event"; + + // TODO: add severity.package.class to key + // or modify to send only to subscriptions with matching queries + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + encode(DataImplAccess::get(data).asMap(), msg); + Sender sender(session.createSender(address.str())); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT EventIndication to=" << address.str()); +} + + +void AgentSessionImpl::checkOpen() +{ + if (opened) + throw QmfException("Operation must be performed before calling open()"); +} + + +void AgentSessionImpl::enqueueEvent(const AgentEvent& event) +{ + qpid::sys::Mutex::ScopedLock l(lock); + bool notify = eventQueue.empty(); + eventQueue.push(event); + if (notify) + cond.notify(); +} + + +void AgentSessionImpl::setAgentName() +{ + Variant::Map::iterator iter; + string vendor; + string product; + string instance; + + iter = attributes.find("_vendor"); + if (iter == attributes.end()) + attributes["_vendor"] = vendor; + else + vendor = iter->second.asString(); + + iter = attributes.find("_product"); + if (iter == attributes.end()) + attributes["_product"] = product; + else + product = iter->second.asString(); + + iter = attributes.find("_instance"); + if (iter == attributes.end()) { + instance = qpid::types::Uuid(true).str(); + attributes["_instance"] = instance; + } else + instance = iter->second.asString(); + + agentName = vendor + ":" + product + ":" + instance; + attributes["_name"] = agentName; +} + + +void AgentSessionImpl::handleLocateRequest(const Variant::Map&, const Message& msg) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + + Message reply; + Variant::Map map; + Variant::Map& headers(reply.getProperties()); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + map["_values"] = attributes; + map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; + map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; + + encode(map, reply); + Sender sender = session.createSender(msg.getReplyTo()); + sender.send(reply); + QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo()); + sender.close(); +} + + +void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg) +{ + QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo()); + + // + // Construct an AgentEvent to be sent to the application. + // + auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_METHOD)); + eventImpl->setUserId(msg.getUserId()); + eventImpl->setReplyTo(msg.getReplyTo()); + eventImpl->setCorrelationId(msg.getCorrelationId()); + + Variant::Map::const_iterator iter; + + iter = content.find("_method_name"); + if (iter == content.end()) { + AgentEvent event(eventImpl.release()); + raiseException(event, "Malformed MethodRequest: missing _method_name field"); + return; + } + eventImpl->setMethodName(iter->second.asString()); + + iter = content.find("_object_id"); + if (iter != content.end()) + eventImpl->setDataAddr(DataAddr(new DataAddrImpl(iter->second.asMap()))); + + iter = content.find("_arguments"); + if (iter != content.end()) + eventImpl->setArguments(iter->second.asMap()); + + iter = content.find("_subtypes"); + if (iter != content.end()) + eventImpl->setArgumentSubtypes(iter->second.asMap()); + + enqueueEvent(AgentEvent(eventImpl.release())); +} + + +void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg) +{ + QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo()); + + // + // Construct an AgentEvent to be sent to the application or directly handled by the agent. + // + auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY)); + eventImpl->setUserId(msg.getUserId()); + eventImpl->setReplyTo(msg.getReplyTo()); + eventImpl->setCorrelationId(msg.getCorrelationId()); + + Query query; + Variant::Map::const_iterator iter; + + iter = content.find("_what"); + if (iter == content.end()) { + QPID_LOG(error, "Received QueryRequest with no _what element"); + return; + } + + if (iter->second.asString() == "OBJECT") { + // + // This is an object query, handle the various flavors of query. + // + iter = content.find("_object_id"); + if (iter != content.end()) { + auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap())); + query = Query(DataAddr(addrImpl.release())); + } else { + iter = content.find("_schema_id"); + if (iter != content.end()) { + const Variant::Map& map(iter->second.asMap()); + string className; + string packageName; + + iter = map.find("_class_name"); + if (iter == map.end()) { + QPID_LOG(error, "Received QueryRequest with no invalid schemaId"); + return; + } + + className = iter->second.asString(); + iter = map.find("_package_name"); + if (iter != map.end()) + packageName = iter->second.asString(); + + query = Query(className, packageName); + } else { + QPID_LOG(error, "Received QueryRequest with no valid elements"); + return; + } + } + + eventImpl->setQuery(query); + + if (autoAllowQueries) { + AgentEvent ae(eventImpl.release()); + authAccept(ae); + } else + enqueueEvent(AgentEvent(eventImpl.release())); + + } else if (iter->second.asString() == "SCHEMA") { + // TODO: process a v2 schema request + } +} + + +void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, uint32_t seq, const Message& msg) +{ + string packageName; + string className; + uint8_t hashBits[16]; + + buffer.getShortString(packageName); + buffer.getShortString(className); + buffer.getBin128(hashBits); + + QPID_LOG(trace, "RCVD QMFv1 SchemaRequest for " << packageName << ":" << className); + + qpid::types::Uuid hash(hashBits); + map<SchemaId, Schema>::const_iterator iter; + string replyContent; + + SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className); + dataId.setHash(hash); + + iter = schemata.find(dataId); + if (iter != schemata.end()) + replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); + else { + SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className); + eventId.setHash(hash); + iter = schemata.find(dataId); + if (iter != schemata.end()) + replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); + else + return; + } + + Message reply; + Variant::Map& headers(reply.getProperties()); + + headers["qmf.agent"] = agentName; + reply.setContent(replyContent); + + Sender sender = session.createSender(msg.getReplyTo()); + sender.send(reply); + QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo()); + sender.close(); +} + + +void AgentSessionImpl::dispatch(Message msg) +{ + const Variant::Map& properties(msg.getProperties()); + Variant::Map::const_iterator iter; + + iter = properties.find("x-amqp-0-10.app-id"); + if (iter != properties.end() && iter->second.asString() == "qmf2") { + // + // Dispatch a QMFv2 formatted message + // + iter = properties.find("qmf.opcode"); + if (iter == properties.end()) { + QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); + return; + } + + if (msg.getContentType() != "amqp/map") { + QPID_LOG(trace, "Message received with content type '" << msg.getContentType() << + "'. Expected 'amqp/map'"); + return; + } + + Variant::Map content; + decode(msg, content); + + const string& opcode = iter->second.asString(); + + if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); + else if (opcode == "_method_request") handleMethodRequest(content, msg); + else if (opcode == "_query_request") handleQueryRequest(content, msg); + else { + QPID_LOG(trace, "Unknown QMFv2 opcode: " << opcode); + } + } else { + // + // Dispatch a QMFv1 formatted message + // + const string& body(msg.getContent()); + if (body.size() < 8) + return; + qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size()); + + if (buffer.getOctet() != 'A') return; + if (buffer.getOctet() != 'M') return; + if (buffer.getOctet() != '2') return; + char v1Opcode(buffer.getOctet()); + uint32_t seq(buffer.getLong()); + + if (v1Opcode == 'S') handleV1SchemaRequest(buffer, seq, msg); + else { + QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode); + } + } +} + + +void AgentSessionImpl::sendHeartbeat() +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + std::stringstream address; + + address << topicBase << "/agent.ind.heartbeat"; + + // append .<vendor>.<product> to address key if present. + Variant::Map::const_iterator v; + if ((v = attributes.find("_vendor")) != attributes.end() && !v->second.getString().empty()) { + address << "." << v->second.getString(); + if ((v = attributes.find("_product")) != attributes.end() && !v->second.getString().empty()) { + address << "." << v->second.getString(); + } + } + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + map["_values"] = attributes; + map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; + map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; + + encode(map, msg); + Sender sender = session.createSender(address.str()); + sender.send(msg); + QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName); + sender.close(); +} + + +bool AgentSessionImpl::predicateMatch(const Query&, const Data&) +{ + // TODO: Implement a proper predicate match + return true; +} + + +void AgentSessionImpl::flushResponses(AgentEvent& event, bool final) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; + if (!final) + headers["partial"] = Variant(); + + Variant::List body; + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + Data data(eventImpl.dequeueData()); + while (data.isValid()) { + DataImpl& dataImpl(DataImplAccess::get(data)); + body.push_back(dataImpl.asMap()); + data = eventImpl.dequeueData(); + } + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(body, msg); + Sender sender(session.createSender(eventImpl.getReplyTo())); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::periodicProcessing(uint64_t seconds) +{ + // + // The granularity of this timer is seconds. Don't waste time looking for work if + // it's been less than a second since we last visited. + // + if (seconds == lastVisit) + return; + lastVisit = seconds; + + // + // First time through, set lastHeartbeat to the current time. + // + if (lastHeartbeat == 0) + lastHeartbeat = seconds; + + // + // If the hearbeat interval has elapsed, send a heartbeat. + // + if (seconds - lastHeartbeat >= interval) { + lastHeartbeat = seconds; + sendHeartbeat(); + } + + // + // TODO: process any active subscriptions on their intervals. + // +} + + +void AgentSessionImpl::run() +{ + QPID_LOG(debug, "AgentSession thread started for agent " << agentName); + + try { + while (!threadCanceled) { + periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); + + Receiver rx; + bool valid = session.nextReceiver(rx, Duration::SECOND); + if (threadCanceled) + break; + if (valid) { + try { + dispatch(rx.fetch()); + } catch (qpid::types::Exception& e) { + QPID_LOG(error, "Exception caught in message dispatch: " << e.what()); + } + session.acknowledge(); + } + } + } catch (qpid::types::Exception& e) { + QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what()); + enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED))); + } + + QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); +} + + diff --git a/cpp/src/qmf/ConsoleEvent.cpp b/cpp/src/qmf/ConsoleEvent.cpp new file mode 100644 index 0000000000..d5775a86b4 --- /dev/null +++ b/cpp/src/qmf/ConsoleEvent.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/ConsoleEventImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" + +using namespace std; +using namespace qmf; +using qpid::types::Variant; + +typedef PrivateImplRef<ConsoleEvent> PI; + +ConsoleEvent::ConsoleEvent(ConsoleEventImpl* impl) { PI::ctor(*this, impl); } +ConsoleEvent::ConsoleEvent(const ConsoleEvent& s) : qmf::Handle<ConsoleEventImpl>() { PI::copy(*this, s); } +ConsoleEvent::~ConsoleEvent() { PI::dtor(*this); } +ConsoleEvent& ConsoleEvent::operator=(const ConsoleEvent& s) { return PI::assign(*this, s); } + +ConsoleEventCode ConsoleEvent::getType() const { return impl->getType(); } +uint32_t ConsoleEvent::getCorrelator() const { return impl->getCorrelator(); } +Agent ConsoleEvent::getAgent() const { return impl->getAgent(); } +uint32_t ConsoleEvent::getDataCount() const { return impl->getDataCount(); } +Data ConsoleEvent::getData(uint32_t i) const { return impl->getData(i); } +bool ConsoleEvent::isFinal() const { return impl->isFinal(); } +const Variant::Map& ConsoleEvent::getArguments() const { return impl->getArguments(); } + +Data ConsoleEventImpl::getData(uint32_t i) const { + uint32_t count = 0; + for (list<Data>::const_iterator iter = dataList.begin(); iter != dataList.end(); iter++) { + if (count++ == i) + return *iter; + } + throw IndexOutOfRange(); +} + + +ConsoleEventImpl& ConsoleEventImplAccess::get(ConsoleEvent& item) +{ + return *item.impl; +} + + +const ConsoleEventImpl& ConsoleEventImplAccess::get(const ConsoleEvent& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/ConsoleEventImpl.h b/cpp/src/qmf/ConsoleEventImpl.h new file mode 100644 index 0000000000..fe7405bb06 --- /dev/null +++ b/cpp/src/qmf/ConsoleEventImpl.h @@ -0,0 +1,71 @@ +#ifndef _QMF_CONSOLE_EVENT_IMPL_H_ +#define _QMF_CONSOLE_EVENT_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/ConsoleEvent.h" +#include "qmf/Agent.h" +#include "qmf/Data.h" +#include "qpid/types/Variant.h" +#include <list> + +namespace qmf { + class ConsoleEventImpl : public virtual qpid::RefCounted { + public: + // + // Impl-only methods + // + ConsoleEventImpl(ConsoleEventCode e) : eventType(e), correlator(0), final(false) {} + void setCorrelator(uint32_t c) { correlator = c; } + void setAgent(const Agent& a) { agent = a; } + void addData(const Data& d) { dataList.push_back(Data(d)); } + void setFinal() { final = true; } + void setArguments(const qpid::types::Variant::Map& a) { arguments = a; } + + // + // Methods from API handle + // + ConsoleEventCode getType() const { return eventType; } + uint32_t getCorrelator() const { return correlator; } + Agent getAgent() const { return agent; } + uint32_t getDataCount() const { return dataList.size(); } + Data getData(uint32_t i) const; + bool isFinal() const { return final; } + const qpid::types::Variant::Map& getArguments() const { return arguments; } + + private: + const ConsoleEventCode eventType; + uint32_t correlator; + Agent agent; + bool final; + std::list<Data> dataList; + qpid::types::Variant::Map arguments; + }; + + struct ConsoleEventImplAccess + { + static ConsoleEventImpl& get(ConsoleEvent&); + static const ConsoleEventImpl& get(const ConsoleEvent&); + }; +} + +#endif diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp new file mode 100644 index 0000000000..18986222c1 --- /dev/null +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -0,0 +1,422 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/PrivateImplRef.h" +#include "qmf/ConsoleSessionImpl.h" +#include "qmf/AgentImpl.h" +#include "qmf/SchemaId.h" +#include "qmf/SchemaImpl.h" +#include "qmf/ConsoleEventImpl.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" + +using namespace std; +using namespace qpid::messaging; +using namespace qmf; +using qpid::types::Variant; + +typedef qmf::PrivateImplRef<ConsoleSession> PI; + +ConsoleSession::ConsoleSession(ConsoleSessionImpl* impl) { PI::ctor(*this, impl); } +ConsoleSession::ConsoleSession(const ConsoleSession& s) : qmf::Handle<ConsoleSessionImpl>() { PI::copy(*this, s); } +ConsoleSession::~ConsoleSession() { PI::dtor(*this); } +ConsoleSession& ConsoleSession::operator=(const ConsoleSession& s) { return PI::assign(*this, s); } + +ConsoleSession::ConsoleSession(Connection& c, const string& o) { PI::ctor(*this, new ConsoleSessionImpl(c, o)); } +void ConsoleSession::setDomain(const string& d) { impl->setDomain(d); } +void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); } +void ConsoleSession::open() { impl->open(); } +void ConsoleSession::close() { impl->close(); } +bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } +uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } +Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } +Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } + +//======================================================================================== +// Impl Method Bodies +//======================================================================================== + +ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : + connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), + lastVisit(0), schemaCache(new SchemaCache()) +{ + if (!options.empty()) { + qpid::messaging::AddressParser parser(options); + Variant::Map optMap; + Variant::Map::const_iterator iter; + + parser.parseMap(optMap); + + iter = optMap.find("domain"); + if (iter != optMap.end()) + domain = iter->second.asString(); + } +} + + +ConsoleSessionImpl::~ConsoleSessionImpl() +{ + if (opened) + close(); +} + + +void ConsoleSessionImpl::setAgentFilter(const string&) +{ + // + // TODO: Setup the new agent filter + // TODO: Purge the agent list of any agents that don't match the filter + // TODO: Send an agent locate with the new filter + // +} + + +void ConsoleSessionImpl::open() +{ + if (opened) + throw QmfException("The session is already open"); + + // Establish messaging addresses + directBase = "qmf." + domain + ".direct"; + topicBase = "qmf." + domain + ".topic"; + + string myKey("qmf-console-" + qpid::types::Uuid(true).str()); + + replyAddress = Address(directBase + "/" + myKey + ";{node:{type:topic}}"); + + // Create AMQP session, receivers, and senders + session = connection.createSession(); + Receiver directRx = session.createReceiver(replyAddress); + Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating + Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); + + directRx.setCapacity(64); + topicRx.setCapacity(64); + legacyRx.setCapacity(64); + + // Start the receiver thread + threadCanceled = false; + thread = new qpid::sys::Thread(*this); + + // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. + sendBrokerLocate(); + + opened = true; +} + + +void ConsoleSessionImpl::close() +{ + if (!opened) + throw QmfException("The session is already closed"); + + // Stop and join the receiver thread + threadCanceled = true; + thread->join(); + delete thread; + + // Close the AMQP session + session.close(); + opened = false; +} + + +bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) +{ + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock l(lock); + + if (eventQueue.empty()) + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + + if (!eventQueue.empty()) { + event = eventQueue.front(); + eventQueue.pop(); + return true; + } + + return false; +} + + +uint32_t ConsoleSessionImpl::getAgentCount() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return agents.size(); +} + + +Agent ConsoleSessionImpl::getAgent(uint32_t i) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count = 0; + for (map<string, Agent>::const_iterator iter = agents.begin(); iter != agents.end(); iter++) + if (count++ == i) + return iter->second; + throw IndexOutOfRange(); +} + + +void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event) +{ + qpid::sys::Mutex::ScopedLock l(lock); + enqueueEventLH(event); +} + + +void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) +{ + bool notify = eventQueue.empty(); + eventQueue.push(event); + if (notify) + cond.notify(); +} + + +void ConsoleSessionImpl::dispatch(Message msg) +{ + const Variant::Map& properties(msg.getProperties()); + Variant::Map::const_iterator iter; + + iter = properties.find("x-amqp-0-10.app-id"); + if (iter != properties.end() && iter->second.asString() == "qmf2") { + // + // Dispatch a QMFv2 formatted message + // + iter = properties.find("qmf.opcode"); + if (iter == properties.end()) { + QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); + return; + } + const string& opcode = iter->second.asString(); + + iter = properties.find("qmf.agent"); + if (iter == properties.end()) { + QPID_LOG(trace, "Message received with no 'qmf.agent' header"); + return; + } + const string& agentName = iter->second.asString(); + + Agent agent; + { + qpid::sys::Mutex::ScopedLock l(lock); + map<string, Agent>::iterator aIter = agents.find(agentName); + if (aIter != agents.end()) { + agent = aIter->second; + AgentImplAccess::get(agent).touch(); + } + } + + if (msg.getContentType() == "amqp/map" && + (opcode == "_agent_heartbeat_indication" || opcode == "_agent_locate_response")) { + // + // This is the one case where it's ok (necessary actually) to receive a QMFv2 + // message from an unknown agent (how else are they going to get known?) + // + Variant::Map content; + decode(msg, content); + handleAgentUpdate(agentName, content, msg); + return; + } + + if (!agent.isValid()) { + QPID_LOG(trace, "Received a QMFv2 message with opcode=" << opcode << + " from an unknown agent " << agentName); + return; + } + + AgentImpl& agentImpl(AgentImplAccess::get(agent)); + + if (msg.getContentType() == "amqp/map") { + Variant::Map content; + decode(msg, content); + + if (opcode == "_exception") agentImpl.handleException(content, msg); + else if (opcode == "_method_response") agentImpl.handleMethodResponse(content, msg); + else + QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode); + + return; + } + + if (msg.getContentType() == "amqp/list") { + Variant::List content; + decode(msg, content); + + if (opcode == "_query_response") agentImpl.handleQueryResponse(content, msg); + else if (opcode == "_data_indication") agentImpl.handleDataIndication(content, msg); + else + QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode); + + return; + } + } else { + // + // Dispatch a QMFv1 formatted message + // + const string& body(msg.getContent()); + if (body.size() < 8) + return; + qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size()); + + if (buffer.getOctet() != 'A') return; + if (buffer.getOctet() != 'M') return; + if (buffer.getOctet() != '2') return; + char v1Opcode(buffer.getOctet()); + uint32_t seq(buffer.getLong()); + + if (v1Opcode == 's') handleV1SchemaResponse(buffer, seq, msg); + else { + QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode); + } + } +} + + +void ConsoleSessionImpl::sendBrokerLocate() +{ + Message msg; + Variant::Map& headers(msg.getProperties()); + + headers["method"] = "request"; + headers["qmf.opcode"] = "_agent_locate_request"; + headers["x-amqp-0-10.app-id"] = "qmf2"; + + msg.setReplyTo(replyAddress); + msg.setCorrelationId("broker-locate"); + Sender sender(session.createSender(directBase + "/broker")); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT AgentLocate to broker"); +} + + +void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg) +{ + Variant::Map::const_iterator iter; + Agent agent; + uint32_t epoch(0); + string cid(msg.getCorrelationId()); + + iter = content.find("_values"); + if (iter == content.end()) + return; + Variant::Map attrs(iter->second.asMap()); + + // + // TODO: Check this agent against the agent filter. Exit if it doesn't match. + // (only if this isn't the connected broker agent) + // + + iter = content.find("epoch"); + if (iter != content.end()) + epoch = iter->second.asUint32(); + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<string, Agent>::iterator aIter = agents.find(agentName); + if (aIter == agents.end()) { + auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); + for (iter = attrs.begin(); iter != attrs.end(); iter++) + if (iter->first != "epoch") + impl->setAttribute(iter->first, iter->second); + agent = Agent(impl.release()); + agents[agentName] = agent; + + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD)); + eventImpl->setAgent(agent); + enqueueEventLH(ConsoleEvent(eventImpl.release())); + } else + agent = aIter->second; + + if (cid == "broker-locate") + connectedBrokerAgent = agent; + } + + AgentImplAccess::get(agent).touch(); + + // + // Changes we are interested in: + // + // agentEpoch - indicates that the agent restarted since we last heard from it + // schemaUpdated - indicates that the agent has registered new schemata + // +} + + +void ConsoleSessionImpl::handleV1SchemaResponse(qpid::management::Buffer& buffer, uint32_t, const Message&) +{ + QPID_LOG(trace, "RCVD V1SchemaResponse"); + Schema schema(new SchemaImpl(buffer)); + schemaCache->declareSchema(schema); +} + + +void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) +{ + // + // The granularity of this timer is seconds. Don't waste time looking for work if + // it's been less than a second since we last visited. + // + if (seconds == lastVisit) + return; + lastVisit = seconds; + + // + // TODO: Handle the aging of agent records + // +} + + +void ConsoleSessionImpl::run() +{ + QPID_LOG(debug, "ConsoleSession thread started"); + + try { + while (!threadCanceled) { + periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / + qpid::sys::TIME_SEC); + + Receiver rx; + bool valid = session.nextReceiver(rx, Duration::SECOND); + if (threadCanceled) + break; + if (valid) { + try { + dispatch(rx.fetch()); + } catch (qpid::types::Exception& e) { + QPID_LOG(error, "Exception caught in message dispatch: " << e.what()); + } + session.acknowledge(); + } + } + } catch (qpid::types::Exception& e) { + QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what()); + enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); + } + + QPID_LOG(debug, "ConsoleSession thread exiting"); +} + diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h new file mode 100644 index 0000000000..9a077b0390 --- /dev/null +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -0,0 +1,95 @@ +#ifndef _QMF_CONSOLE_SESSION_IMPL_H_ +#define _QMF_CONSOLE_SESSION_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/ConsoleSession.h" +#include "qmf/AgentImpl.h" +#include "qmf/SchemaId.h" +#include "qmf/Schema.h" +#include "qmf/ConsoleEventImpl.h" +#include "qmf/SchemaCache.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Address.h" +#include "qpid/management/Buffer.h" +#include "qpid/types/Variant.h" +#include <map> +#include <queue> + +namespace qmf { + class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { + public: + ~ConsoleSessionImpl(); + + // + // Methods from API handle + // + ConsoleSessionImpl(qpid::messaging::Connection& c, const std::string& o); + void setDomain(const std::string& d) { domain = d; } + void setAgentFilter(const std::string& f); + void open(); + void close(); + bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t); + uint32_t getAgentCount() const; + Agent getAgent(uint32_t i) const; + Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; } + + protected: + mutable qpid::sys::Mutex lock; + qpid::sys::Condition cond; + qpid::messaging::Connection connection; + qpid::messaging::Session session; + std::string domain; + qpid::types::Variant::Map agentFilter; + bool opened; + std::queue<ConsoleEvent> eventQueue; + qpid::sys::Thread* thread; + bool threadCanceled; + uint64_t lastVisit; + std::map<std::string, Agent> agents; + Agent connectedBrokerAgent; + qpid::messaging::Address replyAddress; + std::string directBase; + std::string topicBase; + boost::shared_ptr<SchemaCache> schemaCache; + + void enqueueEvent(const ConsoleEvent&); + void enqueueEventLH(const ConsoleEvent&); + void dispatch(qpid::messaging::Message); + void sendBrokerLocate(); + void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&); + void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); + void periodicProcessing(uint64_t); + void run(); + + friend class AgentImpl; + }; +} + +#endif diff --git a/cpp/src/qmf/Data.cpp b/cpp/src/qmf/Data.cpp new file mode 100644 index 0000000000..0ceca6e1e9 --- /dev/null +++ b/cpp/src/qmf/Data.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/DataImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/PrivateImplRef.h" + +using namespace std; +using namespace qmf; +using qpid::types::Variant; + +typedef PrivateImplRef<Data> PI; + +Data::Data(DataImpl* impl) { PI::ctor(*this, impl); } +Data::Data(const Data& s) : qmf::Handle<DataImpl>() { PI::copy(*this, s); } +Data::~Data() { PI::dtor(*this); } +Data& Data::operator=(const Data& s) { return PI::assign(*this, s); } + +Data::Data(const SchemaId& s) { PI::ctor(*this, new DataImpl(s)); } +void Data::setSchema(const SchemaId& s) { impl->setSchema(s); } +void Data::setAddr(const DataAddr& a) { impl->setAddr(a); } +void Data::setProperty(const string& k, const qpid::types::Variant& v) { impl->setProperty(k, v); } +void Data::overwriteProperties(const qpid::types::Variant::Map& m) { impl->overwriteProperties(m); } +bool Data::hasSchema() const { return impl->hasSchema(); } +bool Data::hasAddr() const { return impl->hasAddr(); } +const SchemaId& Data::getSchemaId() const { return impl->getSchemaId(); } +const DataAddr& Data::getAddr() const { return impl->getAddr(); } +const Variant& Data::getProperty(const string& k) const { return impl->getProperty(k); } +const Variant::Map& Data::getProperties() const { return impl->getProperties(); } +bool Data::hasAgent() const { return impl->hasAgent(); } +const Agent& Data::getAgent() const { return impl->getAgent(); } + + +void DataImpl::overwriteProperties(const Variant::Map& m) { + for (Variant::Map::const_iterator iter = m.begin(); iter != m.end(); iter++) + properties[iter->first] = iter->second; +} + +const Variant& DataImpl::getProperty(const string& k) const { + Variant::Map::const_iterator iter = properties.find(k); + if (iter == properties.end()) + throw KeyNotFound(k); + return iter->second; +} + + +DataImpl::DataImpl(const qpid::types::Variant::Map& map, const Agent& a) +{ + Variant::Map::const_iterator iter; + + agent = a; + + iter = map.find("_values"); + if (iter != map.end()) + properties = iter->second.asMap(); + + iter = map.find("_object_id"); + if (iter != map.end()) + dataAddr = DataAddr(new DataAddrImpl(iter->second.asMap())); + + iter = map.find("_schema_id"); + if (iter != map.end()) + schemaId = SchemaId(new SchemaIdImpl(iter->second.asMap())); +} + + +Variant::Map DataImpl::asMap() const +{ + Variant::Map result; + + result["_values"] = properties; + + if (hasAddr()) { + const DataAddrImpl& aImpl(DataAddrImplAccess::get(getAddr())); + result["_object_id"] = aImpl.asMap(); + } + + if (hasSchema()) { + const SchemaIdImpl& sImpl(SchemaIdImplAccess::get(getSchemaId())); + result["_schema_id"] = sImpl.asMap(); + } + + return result; +} + + +DataImpl& DataImplAccess::get(Data& item) +{ + return *item.impl; +} + + +const DataImpl& DataImplAccess::get(const Data& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/DataAddr.cpp b/cpp/src/qmf/DataAddr.cpp new file mode 100644 index 0000000000..c864ac9bf4 --- /dev/null +++ b/cpp/src/qmf/DataAddr.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/DataAddrImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/DataAddr.h" +#include <iostream> + +using namespace std; +using namespace qmf; +using qpid::types::Variant; + +typedef PrivateImplRef<DataAddr> PI; + +DataAddr::DataAddr(DataAddrImpl* impl) { PI::ctor(*this, impl); } +DataAddr::DataAddr(const DataAddr& s) : qmf::Handle<DataAddrImpl>() { PI::copy(*this, s); } +DataAddr::~DataAddr() { PI::dtor(*this); } +DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); } + +bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; } +bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; } + +DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); } +const string& DataAddr::getName() const { return impl->getName(); } +const string& DataAddr::getAgentName() const { return impl->getAgentName(); } +uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); } +Variant::Map DataAddr::asMap() const { return impl->asMap(); } + +bool DataAddrImpl::operator==(const DataAddrImpl& other) +{ + return + agentName == other.agentName && + name == other.name && + agentEpoch == other.agentEpoch; +} + + +bool DataAddrImpl::operator<(const DataAddrImpl& other) +{ + if (agentName < other.agentName) return true; + if (agentName > other.agentName) return false; + if (name < other.name) return true; + if (name > other.name) return false; + return agentEpoch < other.agentEpoch; +} + + +DataAddrImpl::DataAddrImpl(const Variant::Map& map) +{ + Variant::Map::const_iterator iter; + + iter = map.find("_agent_name"); + if (iter != map.end()) + agentName = iter->second.asString(); + + iter = map.find("_object_name"); + if (iter != map.end()) + name = iter->second.asString(); + + iter = map.find("_agent_epoch"); + if (iter != map.end()) + agentEpoch = (uint32_t) iter->second.asUint64(); +} + + +Variant::Map DataAddrImpl::asMap() const +{ + Variant::Map result; + + result["_agent_name"] = agentName; + result["_object_name"] = name; + if (agentEpoch > 0) + result["_agent_epoch"] = agentEpoch; + return result; +} + + +DataAddrImpl& DataAddrImplAccess::get(DataAddr& item) +{ + return *item.impl; +} + + +const DataAddrImpl& DataAddrImplAccess::get(const DataAddr& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/DataAddrImpl.h b/cpp/src/qmf/DataAddrImpl.h new file mode 100644 index 0000000000..26acd60575 --- /dev/null +++ b/cpp/src/qmf/DataAddrImpl.h @@ -0,0 +1,73 @@ +#ifndef _QMF_DATA_ADDR_IMPL_H_ +#define _QMF_DATA_ADDR_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/types/Variant.h" +#include "qmf/DataAddr.h" + +namespace qmf { + class DataAddrImpl : public virtual qpid::RefCounted { + public: + // + // Impl-only methods + // + DataAddrImpl(const qpid::types::Variant::Map&); + void setName(const std::string& n) { name = n; } + void setAgent(const std::string& n, uint32_t e=0) { agentName = n; agentEpoch = e; } + + // + // Methods from API handle + // + bool operator==(const DataAddrImpl&); + bool operator<(const DataAddrImpl&); + DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) : + agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {} + const std::string& getName() const { return name; } + const std::string& getAgentName() const { return agentName; } + uint32_t getAgentEpoch() const { return agentEpoch; } + qpid::types::Variant::Map asMap() const; + + private: + std::string agentName; + std::string name; + uint32_t agentEpoch; + }; + + struct DataAddrImplAccess + { + static DataAddrImpl& get(DataAddr&); + static const DataAddrImpl& get(const DataAddr&); + }; + + struct DataAddrCompare { + bool operator() (const DataAddr& lhs, const DataAddr& rhs) const + { + if (lhs.getName() != rhs.getName()) + return lhs.getName() < rhs.getName(); + return lhs.getAgentName() < rhs.getAgentName(); + } + }; +} + +#endif diff --git a/cpp/src/qmf/DataImpl.h b/cpp/src/qmf/DataImpl.h new file mode 100644 index 0000000000..38b62791fc --- /dev/null +++ b/cpp/src/qmf/DataImpl.h @@ -0,0 +1,72 @@ +#ifndef _QMF_DATA_IMPL_H_ +#define _QMF_DATA_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/Data.h" +#include "qmf/SchemaId.h" +#include "qmf/DataAddr.h" +#include "qmf/Agent.h" +#include "qpid/types/Variant.h" + +namespace qmf { + class DataImpl : public virtual qpid::RefCounted { + public: + // + // Public impl-only methods + // + DataImpl(const qpid::types::Variant::Map&, const Agent&); + qpid::types::Variant::Map asMap() const; + DataImpl() {} + + // + // Methods from API handle + // + DataImpl(const SchemaId& s) : schemaId(s) {} + void setSchema(const SchemaId& s) { schemaId = s; } + void setAddr(const DataAddr& a) { dataAddr = a; } + void setProperty(const std::string& k, const qpid::types::Variant& v) { properties[k] = v; } + void overwriteProperties(const qpid::types::Variant::Map& m); + bool hasSchema() const { return schemaId.isValid(); } + bool hasAddr() const { return dataAddr.isValid(); } + const SchemaId& getSchemaId() const { return schemaId; } + const DataAddr& getAddr() const { return dataAddr; } + const qpid::types::Variant& getProperty(const std::string& k) const; + const qpid::types::Variant::Map& getProperties() const { return properties; } + bool hasAgent() const { return agent.isValid(); } + const Agent& getAgent() const { return agent; } + + private: + SchemaId schemaId; + DataAddr dataAddr; + qpid::types::Variant::Map properties; + Agent agent; + }; + + struct DataImplAccess + { + static DataImpl& get(Data&); + static const DataImpl& get(const Data&); + }; +} + +#endif diff --git a/cpp/src/qmf/Hash.cpp b/cpp/src/qmf/Hash.cpp new file mode 100644 index 0000000000..86738dda2f --- /dev/null +++ b/cpp/src/qmf/Hash.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/Hash.h" + +using namespace qmf; + +Hash::Hash() +{ + data[0] = 0x5A5A5A5A5A5A5A5ALL; + data[1] = 0x5A5A5A5A5A5A5A5ALL; +} + +void Hash::update(const char* s, uint32_t len) +{ + uint64_t* first = &data[0]; + uint64_t* second = &data[1]; + + for (uint32_t idx = 0; idx < len; idx++) { + uint64_t recycle = ((*second & 0xff00000000000000LL) >> 56); + *second = *second << 8; + *second |= ((*first & 0xFF00000000000000LL) >> 56); + *first = *first << 8; + *first = *first + (uint64_t) s[idx] + recycle; + } +} + diff --git a/cpp/src/qmf/Hash.h b/cpp/src/qmf/Hash.h new file mode 100644 index 0000000000..e1eff84117 --- /dev/null +++ b/cpp/src/qmf/Hash.h @@ -0,0 +1,44 @@ +#ifndef QMF_HASH_H +#define QMF_HASH_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/types/Uuid.h" +#include <string> + +namespace qmf { + class Hash { + public: + Hash(); + qpid::types::Uuid asUuid() const { return qpid::types::Uuid((unsigned char*) data); } + void update(const char* s, uint32_t len); + void update(uint8_t v) { update((char*) &v, sizeof(v)); } + void update(uint32_t v) { update((char*) &v, sizeof(v)); } + void update(const std::string& v) { update(const_cast<char*>(v.c_str()), v.size()); } + void update(bool v) { update(uint8_t(v ? 1 : 0)); } + + private: + uint64_t data[2]; + }; +} + +#endif diff --git a/cpp/src/qmf/PrivateImplRef.h b/cpp/src/qmf/PrivateImplRef.h new file mode 100644 index 0000000000..8b698c4199 --- /dev/null +++ b/cpp/src/qmf/PrivateImplRef.h @@ -0,0 +1,93 @@ +#ifndef QMF_PRIVATEIMPL_H +#define QMF_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/ImportExport.h" +#include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" + +namespace qmf { + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> PrivateImplRef; + * class FooImpl; + * + * Foo : public Handle<FooImpl> { + * public: + * Foo(FooImpl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int fooDo(); // and other Foo functions... + * + * private: + * typedef FooImpl Impl; + * Impl* impl; + * friend class PrivateImplRef<Foo>; + * + * === Foo.cpp + * + * typedef PrivateImplRef<Foo> PI; + * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::fooDo() { return impl->fooDo(); } + * + */ +template <class T> class PrivateImplRef { + public: + typedef typename T::Impl Impl; + typedef boost::intrusive_ptr<Impl> intrusive_ptr; + + /** Get the implementation pointer from a handle */ + static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const intrusive_ptr& p) { + if (t.impl == p) return; + if (t.impl) boost::intrusive_ptr_release(t.impl); + t.impl = p.get(); + if (t.impl) boost::intrusive_ptr_add_ref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +} // namespace qmf + +#endif /*!QMF_PRIVATEIMPL_H*/ diff --git a/cpp/src/qmf/Query.cpp b/cpp/src/qmf/Query.cpp new file mode 100644 index 0000000000..9b67c7d8b3 --- /dev/null +++ b/cpp/src/qmf/Query.cpp @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/Query.h" +#include "qmf/DataAddr.h" +#include "qmf/SchemaId.h" + +using namespace std; +using qpid::types::Variant; + +namespace qmf { + class QueryImpl : public virtual qpid::RefCounted { + public: + // + // Methods from API handle + // + QueryImpl(const string& c, const string& p, const string&) : packageName(p), className(c) {} + QueryImpl(const SchemaId& s) : schemaId(s) {} + QueryImpl(const DataAddr& a) : dataAddr(a) {} + + const DataAddr& getDataAddr() const { return dataAddr; } + const SchemaId& getSchemaId() const { return schemaId; } + const string& getClassName() const { return className; } + const string& getPackageName() const { return packageName; } + void addPredicate(const string& k, const Variant& v) { predicate[k] = v; } + const Variant::Map& getPredicate() const { return predicate; } + + private: + string packageName; + string className; + SchemaId schemaId; + DataAddr dataAddr; + Variant::Map predicate; + }; + + typedef PrivateImplRef<Query> PI; + + Query::Query(QueryImpl* impl) { PI::ctor(*this, impl); } + Query::Query(const Query& s) : qmf::Handle<QueryImpl>() { PI::copy(*this, s); } + Query::~Query() { PI::dtor(*this); } + Query& Query::operator=(const Query& s) { return PI::assign(*this, s); } + + Query::Query(const string& c, const string& p, const string& pr) { PI::ctor(*this, new QueryImpl(c, p, pr)); } + Query::Query(const SchemaId& s) { PI::ctor(*this, new QueryImpl(s)); } + Query::Query(const DataAddr& a) { PI::ctor(*this, new QueryImpl(a)); } + + const DataAddr& Query::getDataAddr() const { return impl->getDataAddr(); } + const SchemaId& Query::getSchemaId() const { return impl->getSchemaId(); } + const string& Query::getClassName() const { return impl->getClassName(); } + const string& Query::getPackageName() const { return impl->getPackageName(); } + void Query::addPredicate(const string& k, const Variant& v) { impl->addPredicate(k, v); } + const Variant::Map& Query::getPredicate() const { return impl->getPredicate(); } +} + diff --git a/cpp/src/qmf/Schema.cpp b/cpp/src/qmf/Schema.cpp new file mode 100644 index 0000000000..5676125c22 --- /dev/null +++ b/cpp/src/qmf/Schema.cpp @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/SchemaImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/SchemaTypes.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/SchemaPropertyImpl.h" +#include "qmf/SchemaMethodImpl.h" +#include "qmf/Hash.h" +#include "qpid/log/Statement.h" +#include "qpid/management/Buffer.h" +#include <list> + +using namespace std; +using qpid::types::Variant; +using namespace qmf; + +typedef PrivateImplRef<Schema> PI; + +Schema::Schema(SchemaImpl* impl) { PI::ctor(*this, impl); } +Schema::Schema(const Schema& s) : qmf::Handle<SchemaImpl>() { PI::copy(*this, s); } +Schema::~Schema() { PI::dtor(*this); } +Schema& Schema::operator=(const Schema& s) { return PI::assign(*this, s); } + +Schema::Schema(int t, const string& p, const string& c) { PI::ctor(*this, new SchemaImpl(t, p, c)); } +const SchemaId& Schema::getSchemaId() const { return impl->getSchemaId(); } +void Schema::finalize() { impl->finalize(); } +bool Schema::isFinalized() const { return impl->isFinalized(); } +void Schema::addProperty(const SchemaProperty& p) { impl->addProperty(p); } +void Schema::addMethod(const SchemaMethod& m) { impl->addMethod(m); } +void Schema::setDesc(const string& d) { impl->setDesc(d); } +const string& Schema::getDesc() const { return impl->getDesc(); } +void Schema::setDefaultSeverity(int s) { impl->setDefaultSeverity(s); } +int Schema::getDefaultSeverity() const { return impl->getDefaultSeverity(); } +uint32_t Schema::getPropertyCount() const { return impl->getPropertyCount(); } +SchemaProperty Schema::getProperty(uint32_t i) const { return impl->getProperty(i); } +uint32_t Schema::getMethodCount() const { return impl->getMethodCount(); } +SchemaMethod Schema::getMethod(uint32_t i) const { return impl->getMethod(i); } + +//======================================================================================== +// Impl Method Bodies +//======================================================================================== + +SchemaImpl::SchemaImpl(const qpid::types::Variant::Map&) : finalized(true) +{ +} + + +SchemaImpl::SchemaImpl(qpid::management::Buffer& buffer) : finalized(false) +{ + int schemaType; + string packageName; + string className; + uint8_t hash[16]; + + schemaType = int(buffer.getOctet()); + buffer.getShortString(packageName); + buffer.getShortString(className); + buffer.getBin128(hash); + schemaId = SchemaId(schemaType, packageName, className); + schemaId.setHash(qpid::types::Uuid(hash)); + + if (schemaType == SCHEMA_TYPE_DATA) { + uint16_t propCount(buffer.getShort()); + uint16_t statCount(buffer.getShort()); + uint16_t methCount(buffer.getShort()); + for (uint16_t idx = 0; idx < propCount + statCount; idx++) + addProperty(new SchemaPropertyImpl(buffer)); + for (uint16_t idx = 0; idx < methCount; idx++) + addMethod(new SchemaMethodImpl(buffer)); + } + + finalized = true; +} + + +string SchemaImpl::asV1Content(uint32_t sequence) const +{ +#define RAW_BUF_SIZE 65536 + char rawBuf[RAW_BUF_SIZE]; + qpid::management::Buffer buffer(rawBuf, RAW_BUF_SIZE); + + // + // Encode the QMFv1 Header + // + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('2'); + buffer.putOctet('s'); + buffer.putLong(sequence); + + // + // Encode the common schema information + // + buffer.putOctet(uint8_t(schemaId.getType())); + buffer.putShortString(schemaId.getPackageName()); + buffer.putShortString(schemaId.getName()); + buffer.putBin128(schemaId.getHash().data()); + + if (schemaId.getType() == SCHEMA_TYPE_DATA) { + buffer.putShort(properties.size()); + buffer.putShort(0); + buffer.putShort(methods.size()); + for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++) + SchemaPropertyImplAccess::get(*pIter).encodeV1(buffer, false, false); + for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) + SchemaMethodImplAccess::get(*mIter).encodeV1(buffer); + } else { + buffer.putShort(properties.size()); + for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++) + SchemaPropertyImplAccess::get(*pIter).encodeV1(buffer, true, false); + } + + return string(rawBuf, buffer.getPosition()); +} + + +void SchemaImpl::finalize() +{ + Hash hash; + + hash.update((uint8_t) schemaId.getType()); + hash.update(schemaId.getPackageName()); + hash.update(schemaId.getName()); + + for (list<SchemaProperty>::const_iterator pIter = properties.begin(); pIter != properties.end(); pIter++) + SchemaPropertyImplAccess::get(*pIter).updateHash(hash); + for (list<SchemaMethod>::const_iterator mIter = methods.begin(); mIter != methods.end(); mIter++) + SchemaMethodImplAccess::get(*mIter).updateHash(hash); + + schemaId.setHash(hash.asUuid()); + QPID_LOG(debug, "Schema Finalized: " << schemaId.getPackageName() << ":" << schemaId.getName() << ":" << + schemaId.getHash()); + + finalized = true; +} + + +SchemaProperty SchemaImpl::getProperty(uint32_t i) const +{ + uint32_t count = 0; + for (list<SchemaProperty>::const_iterator iter = properties.begin(); iter != properties.end(); iter++) + if (count++ == i) + return *iter; + throw IndexOutOfRange(); +} + + +SchemaMethod SchemaImpl::getMethod(uint32_t i) const +{ + uint32_t count = 0; + for (list<SchemaMethod>::const_iterator iter = methods.begin(); iter != methods.end(); iter++) + if (count++ == i) + return *iter; + throw IndexOutOfRange(); +} + +void SchemaImpl::checkFinal() const +{ + if (finalized) + throw QmfException("Modification of a finalized schema is forbidden"); +} + + +void SchemaImpl::checkNotFinal() const +{ + if (!finalized) + throw QmfException("Schema is not yet finalized/registered"); +} + + +SchemaImpl& SchemaImplAccess::get(Schema& item) +{ + return *item.impl; +} + + +const SchemaImpl& SchemaImplAccess::get(const Schema& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/SchemaCache.cpp b/cpp/src/qmf/SchemaCache.cpp new file mode 100644 index 0000000000..74ca4044fd --- /dev/null +++ b/cpp/src/qmf/SchemaCache.cpp @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/SchemaCache.h" +#include "qmf/exceptions.h" + +using namespace std; +using namespace qmf; + +bool SchemaCache::declareSchemaId(const SchemaId& id) +{ + qpid::sys::Mutex::ScopedLock l(lock); + SchemaMap::const_iterator iter = schemata.find(id); + if (iter == schemata.end()) { + schemata[id] = Schema(); + return false; + } + return true; +} + + +void SchemaCache::declareSchema(const Schema& schema) +{ + qpid::sys::Mutex::ScopedLock l(lock); + SchemaMap::const_iterator iter = schemata.find(schema.getSchemaId()); + if (iter == schemata.end() || !iter->second.isValid()) { + schemata[schema.getSchemaId()] = schema; + + // + // If there are any threads blocking in SchemaCache::getSchema waiting for + // this schema, unblock them all now. + // + CondMap::iterator cIter = conditions.find(schema.getSchemaId()); + if (cIter != conditions.end()) + cIter->second->notifyAll(); + } +} + + +bool SchemaCache::haveSchema(const SchemaId& id) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + SchemaMap::const_iterator iter = schemata.find(id); + return iter != schemata.end() && iter->second.isValid(); +} + + +const Schema& SchemaCache::getSchema(const SchemaId& id, qpid::messaging::Duration timeout) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + SchemaMap::const_iterator iter = schemata.find(id); + if (iter != schemata.end() && iter->second.isValid()) + return iter->second; + + // + // The desired schema is not in the cache. Assume that the caller knows this and has + // sent a schema request to the remote agent and now wishes to wait until the schema + // information arrives. + // + CondMap::iterator cIter = conditions.find(id); + if (cIter == conditions.end()) + conditions[id] = boost::shared_ptr<qpid::sys::Condition>(new qpid::sys::Condition()); + + uint64_t milliseconds = timeout.getMilliseconds(); + conditions[id]->wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + iter = schemata.find(id); + if (iter != schemata.end() && iter->second.isValid()) + return iter->second; + + throw QmfException("Schema lookup timed out"); +} + diff --git a/cpp/src/qmf/SchemaCache.h b/cpp/src/qmf/SchemaCache.h new file mode 100644 index 0000000000..a1f104233f --- /dev/null +++ b/cpp/src/qmf/SchemaCache.h @@ -0,0 +1,56 @@ +#ifndef QMF_SCHEMA_CACHE_H +#define QMF_SCHEMA_CACHE_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/SchemaIdImpl.h" +#include "qmf/Schema.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/messaging/Duration.h" +#include <string> +#include <map> +#include <boost/shared_ptr.hpp> + +namespace qmf { + + class SchemaCache { + public: + SchemaCache() {} + ~SchemaCache() {} + + bool declareSchemaId(const SchemaId&); + void declareSchema(const Schema&); + bool haveSchema(const SchemaId&) const; + const Schema& getSchema(const SchemaId&, qpid::messaging::Duration) const; + + private: + mutable qpid::sys::Mutex lock; + typedef std::map<SchemaId, Schema, SchemaIdCompare> SchemaMap; + typedef std::map<SchemaId, boost::shared_ptr<qpid::sys::Condition>, SchemaIdCompare> CondMap; + SchemaMap schemata; + mutable CondMap conditions; + }; + +} + +#endif + diff --git a/cpp/src/qmf/SchemaId.cpp b/cpp/src/qmf/SchemaId.cpp new file mode 100644 index 0000000000..110a2553fd --- /dev/null +++ b/cpp/src/qmf/SchemaId.cpp @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/SchemaIdImpl.h" +#include "qmf/PrivateImplRef.h" + +using namespace std; +using namespace qmf; +using qpid::types::Variant; + +typedef PrivateImplRef<SchemaId> PI; + +SchemaId::SchemaId(SchemaIdImpl* impl) { PI::ctor(*this, impl); } +SchemaId::SchemaId(const SchemaId& s) : qmf::Handle<SchemaIdImpl>() { PI::copy(*this, s); } +SchemaId::~SchemaId() { PI::dtor(*this); } +SchemaId& SchemaId::operator=(const SchemaId& s) { return PI::assign(*this, s); } + +SchemaId::SchemaId(int t, const string& p, const string& n) { PI::ctor(*this, new SchemaIdImpl(t, p, n)); } +void SchemaId::setHash(const qpid::types::Uuid& h) { impl->setHash(h); } +int SchemaId::getType() const { return impl->getType(); } +const string& SchemaId::getPackageName() const { return impl->getPackageName(); } +const string& SchemaId::getName() const { return impl->getName(); } +const qpid::types::Uuid& SchemaId::getHash() const { return impl->getHash(); } + + +SchemaIdImpl::SchemaIdImpl(const Variant::Map& map) +{ + Variant::Map::const_iterator iter; + + iter = map.find("_package_name"); + if (iter != map.end()) + package = iter->second.asString(); + + iter = map.find("_class_name"); + if (iter != map.end()) + name = iter->second.asString(); + + iter = map.find("_type"); + if (iter != map.end()) { + const string& stype = iter->second.asString(); + if (stype == "_data") + sType = SCHEMA_TYPE_DATA; + else if (stype == "_event") + sType = SCHEMA_TYPE_EVENT; + } + + iter = map.find("_hash"); + if (iter != map.end()) + hash = iter->second.asUuid(); +} + + +Variant::Map SchemaIdImpl::asMap() const +{ + Variant::Map result; + + result["_package_name"] = package; + result["_class_name"] = name; + if (sType == SCHEMA_TYPE_DATA) + result["_type"] = "_data"; + else + result["_type"] = "_event"; + result["_hash"] = hash; + return result; +} + + +SchemaIdImpl& SchemaIdImplAccess::get(SchemaId& item) +{ + return *item.impl; +} + + +const SchemaIdImpl& SchemaIdImplAccess::get(const SchemaId& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/SchemaIdImpl.h b/cpp/src/qmf/SchemaIdImpl.h new file mode 100644 index 0000000000..df3cc076b9 --- /dev/null +++ b/cpp/src/qmf/SchemaIdImpl.h @@ -0,0 +1,74 @@ +#ifndef _QMF_SCHEMA_ID_IMPL_H_ +#define _QMF_SCHEMA_ID_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/SchemaId.h" +#include "qpid/types/Variant.h" +#include "qpid/types/Uuid.h" +#include <string> + +namespace qmf { + class SchemaIdImpl : public virtual qpid::RefCounted { + public: + // + // Public impl-only methods + // + SchemaIdImpl(const qpid::types::Variant::Map&); + qpid::types::Variant::Map asMap() const; + + // + // Methods from API handle + // + SchemaIdImpl(int t, const std::string& p, const std::string& n) : sType(t), package(p), name(n) {} + void setHash(const qpid::types::Uuid& h) { hash = h; } + int getType() const { return sType; } + const std::string& getPackageName() const { return package; } + const std::string& getName() const { return name; } + const qpid::types::Uuid& getHash() const { return hash; } + + private: + int sType; + std::string package; + std::string name; + qpid::types::Uuid hash; + }; + + struct SchemaIdImplAccess + { + static SchemaIdImpl& get(SchemaId&); + static const SchemaIdImpl& get(const SchemaId&); + }; + + struct SchemaIdCompare { + bool operator() (const SchemaId& lhs, const SchemaId& rhs) const + { + if (lhs.getName() != rhs.getName()) + return lhs.getName() < rhs.getName(); + if (lhs.getPackageName() != rhs.getPackageName()) + return lhs.getPackageName() < rhs.getPackageName(); + return lhs.getHash() < rhs.getHash(); + } + }; +} + +#endif diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h new file mode 100644 index 0000000000..eae3a3c37f --- /dev/null +++ b/cpp/src/qmf/SchemaImpl.h @@ -0,0 +1,90 @@ +#ifndef _QMF_SCHEMAIMPL_H_ +#define _QMF_SCHEMAIMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/SchemaTypes.h" +#include "qmf/SchemaId.h" +#include "qmf/Schema.h" +#include "qmf/SchemaProperty.h" +#include "qmf/SchemaMethod.h" +#include <list> + +namespace qpid { +namespace management { + class Buffer; +}} + +namespace qmf { + class SchemaImpl : public virtual qpid::RefCounted { + public: + // + // Impl-only public methods + // + SchemaImpl(const qpid::types::Variant::Map& m); + SchemaImpl(qpid::management::Buffer& v1Buffer); + std::string asV1Content(uint32_t sequence) const; + + // + // Methods from API handle + // + SchemaImpl(int t, const std::string& p, const std::string& c) : schemaId(t, p, c), finalized(false) {} + const SchemaId& getSchemaId() const { checkNotFinal(); return schemaId; } + + void finalize(); + bool isFinalized() const { return finalized; } + void addProperty(const SchemaProperty& p) { checkFinal(); properties.push_back(p); } + void addMethod(const SchemaMethod& m) { checkFinal(); methods.push_back(m); } + + void setDesc(const std::string& d) { description = d; } + const std::string& getDesc() const { return description; } + + void setDefaultSeverity(int s) { checkFinal(); defaultSeverity = s; } + int getDefaultSeverity() const { return defaultSeverity; } + + uint32_t getPropertyCount() const { return properties.size(); } + SchemaProperty getProperty(uint32_t i) const; + + uint32_t getMethodCount() const { return methods.size(); } + SchemaMethod getMethod(uint32_t i) const; + private: + SchemaId schemaId; + int defaultSeverity; + std::string description; + bool finalized; + std::list<SchemaProperty> properties; + std::list<SchemaMethod> methods; + + void checkFinal() const; + void checkNotFinal() const; + }; + + struct SchemaImplAccess + { + static SchemaImpl& get(Schema&); + static const SchemaImpl& get(const Schema&); + }; +} + +#endif diff --git a/cpp/src/qmf/SchemaMethod.cpp b/cpp/src/qmf/SchemaMethod.cpp new file mode 100644 index 0000000000..7ee6646ec4 --- /dev/null +++ b/cpp/src/qmf/SchemaMethod.cpp @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/SchemaMethodImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/Hash.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" + +using namespace std; +using qpid::types::Variant; +using namespace qmf; + +typedef PrivateImplRef<SchemaMethod> PI; + +SchemaMethod::SchemaMethod(SchemaMethodImpl* impl) { PI::ctor(*this, impl); } +SchemaMethod::SchemaMethod(const SchemaMethod& s) : qmf::Handle<SchemaMethodImpl>() { PI::copy(*this, s); } +SchemaMethod::~SchemaMethod() { PI::dtor(*this); } +SchemaMethod& SchemaMethod::operator=(const SchemaMethod& s) { return PI::assign(*this, s); } + +SchemaMethod::SchemaMethod(const string& n, const string& o) { PI::ctor(*this, new SchemaMethodImpl(n, o)); } +void SchemaMethod::setDesc(const string& d) { impl->setDesc(d); } +void SchemaMethod::addArgument(const SchemaProperty& p) { impl->addArgument(p); } +const string& SchemaMethod::getName() const { return impl->getName(); } +const string& SchemaMethod::getDesc() const { return impl->getDesc(); } +uint32_t SchemaMethod::getArgumentCount() const { return impl->getArgumentCount(); } +SchemaProperty SchemaMethod::getArgument(uint32_t i) const { return impl->getArgument(i); } + +//======================================================================================== +// Impl Method Bodies +//======================================================================================== + +SchemaMethodImpl::SchemaMethodImpl(const string& n, const string& options) : name(n) +{ + if (!options.empty()) { + qpid::messaging::AddressParser parser = qpid::messaging::AddressParser(options); + Variant::Map optMap; + Variant::Map::iterator iter; + + parser.parseMap(optMap); + iter = optMap.find("desc"); + if (iter != optMap.end()) { + desc = iter->second.asString(); + optMap.erase(iter); + } + + if (!optMap.empty()) + throw QmfException("Unrecognized option: " + optMap.begin()->first); + } +} + +SchemaMethodImpl::SchemaMethodImpl(const qpid::types::Variant::Map&) +{ +} + +SchemaMethodImpl::SchemaMethodImpl(qpid::management::Buffer& buffer) +{ + Variant::Map::const_iterator iter; + Variant::Map argMap; + + buffer.getMap(argMap); + + iter = argMap.find("name"); + if (iter == argMap.end()) + throw QmfException("Received V1 Method without a name"); + name = iter->second.asString(); + + iter = argMap.find("desc"); + if (iter != argMap.end()) + desc = iter->second.asString(); + + iter = argMap.find("argCount"); + if (iter == argMap.end()) + throw QmfException("Received V1 Method without argCount"); + + int64_t count = iter->second.asInt64(); + for (int idx = 0; idx < count; idx++) { + SchemaProperty arg(new SchemaPropertyImpl(buffer)); + addArgument(arg); + } +} + + +SchemaProperty SchemaMethodImpl::getArgument(uint32_t i) const +{ + uint32_t count = 0; + for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) + if (count++ == i) + return *iter; + + throw IndexOutOfRange(); +} + + +void SchemaMethodImpl::updateHash(Hash& hash) const +{ + hash.update(name); + hash.update(desc); + for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) + SchemaPropertyImplAccess::get(*iter).updateHash(hash); +} + + +void SchemaMethodImpl::encodeV1(qpid::management::Buffer& buffer) const +{ + Variant::Map map; + + map["name"] = name; + map["argCount"] = arguments.size(); + if (!desc.empty()) + map["desc"] = desc; + + buffer.putMap(map); + + for (list<SchemaProperty>::const_iterator iter = arguments.begin(); iter != arguments.end(); iter++) + SchemaPropertyImplAccess::get(*iter).encodeV1(buffer, true, true); +} + + +SchemaMethodImpl& SchemaMethodImplAccess::get(SchemaMethod& item) +{ + return *item.impl; +} + + +const SchemaMethodImpl& SchemaMethodImplAccess::get(const SchemaMethod& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/SchemaMethodImpl.h b/cpp/src/qmf/SchemaMethodImpl.h new file mode 100644 index 0000000000..4b0ff9134d --- /dev/null +++ b/cpp/src/qmf/SchemaMethodImpl.h @@ -0,0 +1,74 @@ +#ifndef _QMF_SCHEMA_METHOD_IMPL_H_ +#define _QMF_SCHEMA_METHOD_IMPL_H_ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/SchemaTypes.h" +#include "qmf/SchemaMethod.h" +#include "qmf/SchemaPropertyImpl.h" +#include "qpid/management/Buffer.h" +#include <list> +#include <string> + +namespace qpid { +namespace management { + class Buffer; +}} + +namespace qmf { + class Hash; + class SchemaMethodImpl : public virtual qpid::RefCounted { + public: + // + // Public impl-only methods + // + SchemaMethodImpl(const qpid::types::Variant::Map& m); + SchemaMethodImpl(qpid::management::Buffer& v1Buffer); + void updateHash(Hash&) const; + void encodeV1(qpid::management::Buffer&) const; + + // + // Methods from API handle + // + SchemaMethodImpl(const std::string& n, const std::string& options); + + void setDesc(const std::string& d) { desc = d; } + void addArgument(const SchemaProperty& p) { arguments.push_back(p); } + const std::string& getName() const { return name; } + const std::string& getDesc() const { return desc; } + uint32_t getArgumentCount() const { return arguments.size(); } + SchemaProperty getArgument(uint32_t i) const; + + private: + std::string name; + std::string desc; + std::list<SchemaProperty> arguments; + }; + + struct SchemaMethodImplAccess + { + static SchemaMethodImpl& get(SchemaMethod&); + static const SchemaMethodImpl& get(const SchemaMethod&); + }; +} + +#endif diff --git a/cpp/src/qmf/SchemaProperty.cpp b/cpp/src/qmf/SchemaProperty.cpp new file mode 100644 index 0000000000..244115b8a9 --- /dev/null +++ b/cpp/src/qmf/SchemaProperty.cpp @@ -0,0 +1,327 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/SchemaPropertyImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/SchemaTypes.h" +#include "qmf/SchemaProperty.h" +#include "qmf/Hash.h" +#include "qpid/messaging/AddressParser.h" +#include <list> +#include <iostream> + +using namespace std; +using qpid::types::Variant; +using namespace qmf; + +typedef PrivateImplRef<SchemaProperty> PI; + +SchemaProperty::SchemaProperty(SchemaPropertyImpl* impl) { PI::ctor(*this, impl); } +SchemaProperty::SchemaProperty(const SchemaProperty& s) : qmf::Handle<SchemaPropertyImpl>() { PI::copy(*this, s); } +SchemaProperty::~SchemaProperty() { PI::dtor(*this); } +SchemaProperty& SchemaProperty::operator=(const SchemaProperty& s) { return PI::assign(*this, s); } + +SchemaProperty::SchemaProperty(const string& n, int t, const string& o) { PI::ctor(*this, new SchemaPropertyImpl(n, t, o)); } + +void SchemaProperty::setAccess(int a) { impl->setAccess(a); } +void SchemaProperty::setIndex(bool i) { impl->setIndex(i); } +void SchemaProperty::setOptional(bool o) { impl->setOptional(o); } +void SchemaProperty::setUnit(const string& u) { impl->setUnit(u); } +void SchemaProperty::setDesc(const string& d) { impl->setDesc(d); } +void SchemaProperty::setSubtype(const string& s) { impl->setSubtype(s); } +void SchemaProperty::setDirection(int d) { impl->setDirection(d); } + +const string& SchemaProperty::getName() const { return impl->getName(); } +int SchemaProperty::getAccess() const { return impl->getAccess(); } +bool SchemaProperty::isIndex() const { return impl->isIndex(); } +bool SchemaProperty::isOptional() const { return impl->isOptional(); } +const string& SchemaProperty::getUnit() const { return impl->getUnit(); } +const string& SchemaProperty::getDesc() const { return impl->getDesc(); } +const string& SchemaProperty::getSubtype() const { return impl->getSubtype(); } +int SchemaProperty::getDirection() const { return impl->getDirection(); } + +//======================================================================================== +// Impl Method Bodies +//======================================================================================== + +SchemaPropertyImpl::SchemaPropertyImpl(const string& n, int t, const string options) : + name(n), dataType(t), access(ACCESS_READ_ONLY), index(false), + optional(false), direction(DIR_IN) +{ + if (!options.empty()) { + qpid::messaging::AddressParser parser = qpid::messaging::AddressParser(options); + Variant::Map optMap; + Variant::Map::iterator iter; + + parser.parseMap(optMap); + + iter = optMap.find("access"); + if (iter != optMap.end()) { + const string& v(iter->second.asString()); + if (v == "RC") access = ACCESS_READ_CREATE; + else if (v == "RO") access = ACCESS_READ_ONLY; + else if (v == "RW") access = ACCESS_READ_WRITE; + else + throw QmfException("Invalid value for 'access' option. Expected RC, RO, or RW"); + optMap.erase(iter); + } + + iter = optMap.find("index"); + if (iter != optMap.end()) { + index = iter->second.asBool(); + optMap.erase(iter); + } + + iter = optMap.find("optional"); + if (iter != optMap.end()) { + optional = iter->second.asBool(); + optMap.erase(iter); + } + + iter = optMap.find("unit"); + if (iter != optMap.end()) { + unit = iter->second.asString(); + optMap.erase(iter); + } + + iter = optMap.find("desc"); + if (iter != optMap.end()) { + desc = iter->second.asString(); + optMap.erase(iter); + } + + iter = optMap.find("subtype"); + if (iter != optMap.end()) { + subtype = iter->second.asString(); + optMap.erase(iter); + } + + iter = optMap.find("dir"); + if (iter != optMap.end()) { + const string& v(iter->second.asString()); + if (v == "IN") direction = DIR_IN; + else if (v == "OUT") direction = DIR_OUT; + else if (v == "INOUT") direction = DIR_IN_OUT; + else + throw QmfException("Invalid value for 'dir' option. Expected IN, OUT, or INOUT"); + optMap.erase(iter); + } + + if (!optMap.empty()) + throw QmfException("Unexpected option: " + optMap.begin()->first); + } +} + + +SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map&) : + access(ACCESS_READ_ONLY), index(false), optional(false), direction(DIR_IN) +{ +} + + +SchemaPropertyImpl::SchemaPropertyImpl(qpid::management::Buffer& buffer) : + access(ACCESS_READ_ONLY), index(false), optional(false), direction(DIR_IN) +{ + Variant::Map::const_iterator iter; + Variant::Map pmap; + + buffer.getMap(pmap); + iter = pmap.find("name"); + if (iter == pmap.end()) + throw QmfException("Received V1 Schema property without a name"); + name = iter->second.asString(); + + iter = pmap.find("type"); + if (iter == pmap.end()) + throw QmfException("Received V1 Schema property without a type"); + fromV1TypeCode(iter->second.asInt8()); + + iter = pmap.find("unit"); + if (iter != pmap.end()) + unit = iter->second.asString(); + + iter = pmap.find("desc"); + if (iter != pmap.end()) + desc = iter->second.asString(); + + iter = pmap.find("access"); + if (iter != pmap.end()) { + int8_t val = iter->second.asInt8(); + if (val < 1 || val > 3) + throw QmfException("Received V1 Schema property with invalid 'access' code"); + access = val; + } + + iter = pmap.find("index"); + if (iter != pmap.end()) + index = iter->second.asInt64() != 0; + + iter = pmap.find("optional"); + if (iter != pmap.end()) + optional = iter->second.asInt64() != 0; + + iter = pmap.find("dir"); + if (iter != pmap.end()) { + string dirStr(iter->second.asString()); + if (dirStr == "I") direction = DIR_IN; + else if (dirStr == "O") direction = DIR_OUT; + else if (dirStr == "IO") direction = DIR_IN_OUT; + else + throw QmfException("Received V1 Schema property with invalid 'dir' code"); + } +} + + +void SchemaPropertyImpl::updateHash(Hash& hash) const +{ + hash.update(name); + hash.update((uint8_t) dataType); + hash.update(subtype); + hash.update((uint8_t) access); + hash.update(index); + hash.update(optional); + hash.update(unit); + hash.update(desc); + hash.update((uint8_t) direction); +} + + +void SchemaPropertyImpl::encodeV1(qpid::management::Buffer& buffer, bool isArg, bool isMethodArg) const +{ + Variant::Map pmap; + + pmap["name"] = name; + pmap["type"] = v1TypeCode(); + if (!unit.empty()) + pmap["unit"] = unit; + if (!desc.empty()) + pmap["desc"] = desc; + if (!isArg) { + pmap["access"] = access; + pmap["index"] = index ? 1 : 0; + pmap["optional"] = optional ? 1 : 0; + } else { + if (isMethodArg) { + string dirStr; + switch (direction) { + case DIR_IN : dirStr = "I"; break; + case DIR_OUT : dirStr = "O"; break; + case DIR_IN_OUT : dirStr = "IO"; break; + } + pmap["dir"] = dirStr; + } + } + + buffer.putMap(pmap); +} + + +uint8_t SchemaPropertyImpl::v1TypeCode() const +{ + switch (dataType) { + case SCHEMA_DATA_VOID: return 1; + case SCHEMA_DATA_BOOL: return 11; + case SCHEMA_DATA_INT: + if (subtype == "timestamp") return 8; + if (subtype == "duration") return 9; + return 19; + case SCHEMA_DATA_FLOAT: return 13; + case SCHEMA_DATA_STRING: return 7; + case SCHEMA_DATA_LIST: return 21; + case SCHEMA_DATA_UUID: return 14; + case SCHEMA_DATA_MAP: + if (subtype == "reference") return 10; + if (subtype == "data") return 20; + return 15; + } + + return 1; +} + +void SchemaPropertyImpl::fromV1TypeCode(int8_t code) +{ + switch (code) { + case 1: // U8 + case 2: // U16 + case 3: // U32 + case 4: // U64 + dataType = SCHEMA_DATA_INT; + break; + case 6: // SSTR + case 7: // LSTR + dataType = SCHEMA_DATA_STRING; + break; + case 8: // ABSTIME + dataType = SCHEMA_DATA_INT; + subtype = "timestamp"; + break; + case 9: // DELTATIME + dataType = SCHEMA_DATA_INT; + subtype = "duration"; + break; + case 10: // REF + dataType = SCHEMA_DATA_MAP; + subtype = "reference"; + break; + case 11: // BOOL + dataType = SCHEMA_DATA_BOOL; + break; + case 12: // FLOAT + case 13: // DOUBLE + dataType = SCHEMA_DATA_FLOAT; + break; + case 14: // UUID + dataType = SCHEMA_DATA_UUID; + break; + case 15: // FTABLE + dataType = SCHEMA_DATA_MAP; + break; + case 16: // S8 + case 17: // S16 + case 18: // S32 + case 19: // S64 + dataType = SCHEMA_DATA_INT; + break; + case 20: // OBJECT + dataType = SCHEMA_DATA_MAP; + subtype = "data"; + break; + case 21: // LIST + case 22: // ARRAY + dataType = SCHEMA_DATA_LIST; + break; + default: + throw QmfException("Received V1 schema with an unknown data type"); + } +} + + +SchemaPropertyImpl& SchemaPropertyImplAccess::get(SchemaProperty& item) +{ + return *item.impl; +} + + +const SchemaPropertyImpl& SchemaPropertyImplAccess::get(const SchemaProperty& item) +{ + return *item.impl; +} diff --git a/cpp/src/qmf/SchemaPropertyImpl.h b/cpp/src/qmf/SchemaPropertyImpl.h new file mode 100644 index 0000000000..94994c722d --- /dev/null +++ b/cpp/src/qmf/SchemaPropertyImpl.h @@ -0,0 +1,90 @@ +#ifndef _QMF_SCHEMA_PROPERTY_IMPL_H_ +#define _QMF_SCHEMA_PROPERTY_IMPL_H_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/SchemaTypes.h" +#include "qmf/SchemaProperty.h" +#include "qpid/types/Variant.h" +#include "qpid/management/Buffer.h" + +namespace qpid { +namespace management { + class Buffer; +}} + +namespace qmf { + class Hash; + class SchemaPropertyImpl : public virtual qpid::RefCounted { + public: + // + // Public impl-only methods + // + SchemaPropertyImpl(const qpid::types::Variant::Map& m); + SchemaPropertyImpl(qpid::management::Buffer& v1Buffer); + void updateHash(Hash&) const; + void encodeV1(qpid::management::Buffer&, bool isArg, bool isMethodArg) const; + + // + // Methods from API handle + // + SchemaPropertyImpl(const std::string& n, int t, const std::string o); + void setAccess(int a) { access = a; } + void setIndex(bool i) { index = i; } + void setOptional(bool o) { optional = o; } + void setUnit(const std::string& u) { unit = u; } + void setDesc(const std::string& d) { desc = d; } + void setSubtype(const std::string& s) { subtype = s; } + void setDirection(int d) { direction = d; } + + const std::string& getName() const { return name; } + int getAccess() const { return access; } + bool isIndex() const { return index; } + bool isOptional() const { return optional; } + const std::string& getUnit() const { return unit; } + const std::string& getDesc() const { return desc; } + const std::string& getSubtype() const { return subtype; } + int getDirection() const { return direction; } + private: + std::string name; + int dataType; + std::string subtype; + int access; + bool index; + bool optional; + std::string unit; + std::string desc; + int direction; + + uint8_t v1TypeCode() const; + void fromV1TypeCode(int8_t); + }; + + struct SchemaPropertyImplAccess + { + static SchemaPropertyImpl& get(SchemaProperty&); + static const SchemaPropertyImpl& get(const SchemaProperty&); + }; +} + +#endif diff --git a/cpp/src/qmf/exceptions.cpp b/cpp/src/qmf/exceptions.cpp new file mode 100644 index 0000000000..be212f62f7 --- /dev/null +++ b/cpp/src/qmf/exceptions.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qmf/exceptions.h" + +namespace qmf { + + QmfException::QmfException(const std::string& msg) : qpid::types::Exception(msg) {} + QmfException::~QmfException() throw() {} + + KeyNotFound::KeyNotFound(const std::string& msg) : QmfException("Key Not Found: " + msg) {} + KeyNotFound::~KeyNotFound() throw() {} + + IndexOutOfRange::IndexOutOfRange() : QmfException("Index out-of-range") {} + IndexOutOfRange::~IndexOutOfRange() throw() {} + + OperationTimedOut::OperationTimedOut() : QmfException("Timeout Expired") {} + OperationTimedOut::~OperationTimedOut() throw() {} +} + |