summaryrefslogtreecommitdiff
path: root/cpp/src/tests/verify_cluster_objects
blob: be6d67d81a78141ef9d80708afbb7f23b47e3b64 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import getopt
import sys
import locale
import socket
import re
from qmf.console import Session, SchemaClass

_host = "localhost"
_connTimeout = 10
_verbose = 0
_del_test = False;
pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
_debug_recursion = 0

def Usage ():
    print "Usage:  verify_cluster_objects [OPTIONS] [broker-addr]"
    print
    print "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]"
    print "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
    print
    print "    This program contacts every node of a cluster, loads all manageable objects from"
    print "    those nodes and verifies that the management data is identical across the clusters."
    print
    print "Options:"
    print "    --timeout seconds (10)  Maximum time to wait for broker connection"
    print "    --verbose level (0)     Show details of objects and their IDs"
    print "    --delete                Delete some objects after creation, to test synchup"
    print
    sys.exit (1)

class IpAddr:
    def __init__(self, text):
        if text.find("@") != -1:
            tokens = text.split("@")
            text = tokens[1]
        if text.find(":") != -1:
            tokens = text.split(":")
            text = tokens[0]
            self.port = int(tokens[1])
        else:
            self.port = 5672
        self.dottedQuad = socket.gethostbyname(text)
        nums = self.dottedQuad.split(".")
        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])

    def bestAddr(self, addrPortList):
        bestDiff = 0xFFFFFFFFL
        bestAddr = None
        for addrPort in addrPortList:
            diff = IpAddr(addrPort[0]).addr ^ self.addr
            if diff < bestDiff:
                bestDiff = diff
                bestAddr = addrPort
        return bestAddr

class ObjectId:
    """Object identity, use for dictionaries by object id"""
    def __init__(self, object): self.object = object
    def __eq__(self, other): return self.object is other.object
    def __hash__(self): return hash(id(self.object))

class Broker(object):
    def __init__(self, qmf, broker):
        self.broker = broker
        self.qmf = qmf

        agents = qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

        bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
                              _agent=self.brokerAgent)[0]
        self.currentTime = bobj.getTimestamps()[0]
        try:
            self.uptime = bobj.uptime
        except:
            self.uptime = 0
        self.tablesByName = {}
        self.package = "org.apache.qpid.broker"
        self.id_cache = {}              # Cache for getAbstractId

    def getUrl(self):
        return self.broker.getUrl()

    def getData(self):
        if _verbose > 1:
            print "Broker:", self.broker

        classList = self.qmf.getClasses(self.package)
        for cls in classList:
            if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
                self.loadTable(cls)


    #
    # this should be a method on an object, but is kept here for now, until
    # we finish sorting out the treatment of names in qmfv2
    #
    def getAbstractId(self, object):
      """ return a string the of the hierarchical name """
      if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
      global _debug_recursion
      result = u""
      valstr = u""
      _debug_recursion += 1
      debug_prefix = _debug_recursion
      if (_verbose > 9):
          print debug_prefix, "  enter gai: props ", object._properties
      for property, value in object._properties:

          # we want to recurse on things which are refs.  we tell by
          # asking each property if it's an index.  I think...
          if (_verbose > 9):
              print debug_prefix, "  prop ", property, " val " , value, " idx ", 
              property.index, " type ", property.type

          # property is an instance, you can ask its type, name, etc.

          # special case system refs, as they will never be the same on
          # distinct cluster nodes.  later we probably want a different
          # way of representing these objects, like for instance don't
          # include the system ref in the hierarchy.

          if property.name == "systemRef":
              _debug_recursion -= 1
              self.id_cache[ObjectId(object)] = ""
              return ""

          if property.index:
              if result != u"":
                  result += u":"
              if property.type == 10:
                  try:
                      recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker)
                      if (_verbose > 9):
                          print debug_prefix, "   r ", recursive_objects[0]
                          for rp, rv in recursive_objects[0]._properties:
                              print debug_prefix, "   rrr ", rp, " idx-p ", rp.index, " v ", rv
                          print debug_prefix, "    recursing on ", recursive_objects[0]
                      valstr = self.getAbstractId(recursive_objects[0])
                      if (_verbose > 9):
                          print debug_prefix,  "    recursing on ", recursive_objects[0],
                          " -> ", valstr
                  except Exception, e:
                      if (_verbose > 9):
                          print debug_prefix, "          except ", e
                      valstr = u"<undecodable>"
              else:
                  # this yields UUID-blah.  not good.  try something else
                  # valstr = value.__repr__()
                  # print debug_prefix, " val ", value
          
                  # yetch.  this needs to be abstracted someplace?  I don't
                  # think we have the infrastructure we need to make these id
                  # strings be sensible in the general case
                  if property.name == "systemId":
                      # special case.  try to do something sensible about systemref objects
                      valstr = object.nodeName
                  else:
                      valstr = value.__repr__() # I think...
          result += valstr
          if (_verbose > 9):
              print debug_prefix, "    id ", self, " -> ", result
      _debug_recursion -= 1
      self.id_cache[ObjectId(object)] = result
      return result

    def loadTable(self, cls):
        if _verbose > 1:
            print "  Class:", cls.getClassName()
        list = self.qmf.getObjects(_class=cls.getClassName(),
                                   _package=cls.getPackageName(),
                                   _agent=self.brokerAgent)

        # tables-by-name maps class name to a table by object-name of
        # objects.  ie use the class name ("broker", "queue", etc) to
        # index tables-by-name, returning a second table, use the
        # object name to index that to get an object.

        self.tablesByName[cls.getClassName()] = {}
        for obj in list:
            # make sure we aren't colliding on name.  it's an internal
            # error (ie, the name-generation code is busted) if we do
            key = self.getAbstractId(obj)
            if key in self.tablesByName[cls.getClassName()]:
                raise Exception("internal error: collision for %s on key %s\n"
                                % (obj, key))
                
            self.tablesByName[cls.getClassName()][key] = obj
            if _verbose > 1:
                print "   ", obj.getObjectId(), " ", obj.getIndex(), " ", key


class BrokerManager:
    def __init__(self):
        self.brokerName = None
        self.qmf        = None
        self.broker     = None
        self.brokers    = []
        self.cluster    = None

    def SetBroker(self, brokerUrl):
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def Disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)

    def _getCluster(self):
        packages = self.qmf.getPackages()
        if "org.apache.qpid.cluster" not in packages:
            return None

        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
        if len(clusters) == 0:
            print "Clustering is installed but not enabled on the broker."
            return None

        self.cluster = clusters[0]

    def _getHostList(self, urlList):
        hosts = []
        hostAddr = IpAddr(_host)
        for url in urlList:
            if url.find("amqp:") != 0:
                raise Exception("Invalid URL 1")
            url = url[5:]
            addrs = str(url).split(",")
            addrList = []
            for addr in addrs:
                tokens = addr.split(":")
                if len(tokens) != 3:
                    raise Exception("Invalid URL 2")
                addrList.append((tokens[1], tokens[2]))

            # Find the address in the list that is most likely to be
            # in the same subnet as the address with which we made the
            # original QMF connection.  This increases the probability
            # that we will be able to reach the cluster member.

            best = hostAddr.bestAddr(addrList)
            bestUrl = best[0] + ":" + best[1]
            hosts.append(bestUrl)
        return hosts


    # the main fun which tests for broker state "identity".  now that
    # we're using qmf2 style object names across the board, that test
    # means that we are ensuring that for all objects of a given
    # class, an object of that class with the same object name exists
    # on the peer broker.

    def verify(self):
        if _verbose > 0:
            print "Connecting to the cluster..."
        self._getCluster()
        if self.cluster:
            memberList = self.cluster.members.split(";")
            hostList = self._getHostList(memberList)
            self.qmf.delBroker(self.broker)
            self.broker = None
            for host in hostList:
                b = self.qmf.addBroker(host, _connTimeout)
                self.brokers.append(Broker(self.qmf, b))
                if _verbose > 0:
                    print "   ", b
        else:
            raise Exception("Failed - Not a cluster")

        failures = []

        # Wait until connections to all nodes are established before
        # loading the management data.  This will ensure that the
        # objects are all stable and the same.
        if _verbose > 0:
            print "Loading management data from nodes..."
        for broker in self.brokers:
            broker.getData()

        # If we're testing delete-some-objects functionality, create a
        # few widgets here and then delete them.
        if _del_test:
            if _verbose > 0:
                print "Running delete test"
            # just stick 'em in the first broker
            b = self.brokers[0]
            session = b.qmf.brokers[0].getAmqpSession()
            session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
            session.exchange_bind(exchange="amq.direct",
                                                 queue="foo", binding_key="foo")
            session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
            session.exchange_bind(exchange="amq.direct",
                                                 queue="bar", binding_key="bar")
            # now delete 'em
            session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
            session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
            session.queue_delete("bar")
            session.queue_delete("foo")

        # Verify that each node has the same set of objects (based on
        # object name).
        if _verbose > 0:
            print "Verifying objects based on object name..."
        base = self.brokers[0]
        for broker in self.brokers[1:]:

            # walk over the class names, for each class (with some
            # exceptions) walk over the objects of that class, making
            # sure they match between broker A and broker B

            for className in base.tablesByName:
                if className in ["broker", "system", "connection"]:
                    continue

                tab1 = base.tablesByName[className]
                tab2 = broker.tablesByName[className]

                for key in tab1:
                    if key not in tab2:
                        failures.append("%s key %s not found on node %s" %
                                        (className, key, broker.getUrl()))
                for key in tab2:
                    if key not in tab1:
                        failures.append("%s key %s not found on node %s" %
                                        (className, key, base.getUrl()))

        if len(failures) > 0:
            print "Failures:"
            for failure in failures:
                print "  %s" % failure
            raise Exception("Failures")

        if _verbose > 0:
            print "Success"

##
## Main Program
##

try:
    longOpts = ("verbose=", "timeout=", "delete")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
except:
    Usage()

try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except:
    cargs = encArgs

for opt in optlist:
    if opt[0] == "--timeout":
        _connTimeout = int(opt[1])
        if _connTimeout == 0:
            _connTimeout = None
    elif opt[0] == "--verbose":
        _verbose = int(opt[1])
    elif opt[0] == "--delete":
        _del_test = True;
    else:
        Usage()

nargs = len(cargs)
bm    = BrokerManager()

if nargs == 1:
    _host = cargs[0]

try:
    bm.SetBroker(_host)
    bm.verify()
except KeyboardInterrupt:
    print
except Exception,e:
    print "Failed: %s - %s" % (e.__class__.__name__, e)
    sys.exit(1)

bm.Disconnect()