diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-07-17 12:03:29 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-07-17 12:03:29 +0000 |
| commit | 9a95fdef8beee204e2a76b1895f748c1c29289de (patch) | |
| tree | e365c90119b5eb6458f56506f22280af7c0fb2c1 /python/qpid/tests/framing.py | |
| parent | 98c58cce294009c64ca1580d2969c582b19b215d (diff) | |
| download | qpid-python-9a95fdef8beee204e2a76b1895f748c1c29289de.tar.gz | |
added non blocking framing code
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@795059 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests/framing.py')
| -rw-r--r-- | python/qpid/tests/framing.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/python/qpid/tests/framing.py b/python/qpid/tests/framing.py new file mode 100644 index 0000000000..4cd596b583 --- /dev/null +++ b/python/qpid/tests/framing.py @@ -0,0 +1,173 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# setup, usage, teardown, errors(sync), errors(async), stress, soak, +# boundary-conditions, config + +from qpid.tests import Test +from qpid.framing import * + +class Base(Test): + + def cmp_frames(self, frm1, frm2): + assert frm1.flags == frm2.flags, "expected: %r, got %r" % (frm1, frm2) + assert frm1.type == frm2.type, "expected: %r, got %r" % (frm1, frm2) + assert frm1.track == frm2.track, "expected: %r, got %r" % (frm1, frm2) + assert frm1.channel == frm2.channel, "expected: %r, got %r" % (frm1, frm2) + assert frm1.payload == frm2.payload, "expected: %r, got %r" % (frm1, frm2) + + def cmp_segments(self, seg1, seg2): + assert seg1.first == seg2.first, "expected: %r, got %r" % (seg1, seg2) + assert seg1.last == seg2.last, "expected: %r, got %r" % (seg1, seg2) + assert seg1.type == seg2.type, "expected: %r, got %r" % (seg1, seg2) + assert seg1.track == seg2.track, "expected: %r, got %r" % (seg1, seg2) + assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2) + assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2) + +class FrameTest(Base): + + def enc_dec(self, frames, encoded=None): + enc = FrameEncoder() + dec = FrameDecoder() + + enc.write(*frames) + bytes = enc.read() + if encoded is not None: + assert bytes == encoded, "expected %r, got %r" % (encoded, bytes) + dec.write(bytes) + dframes = dec.read() + + assert len(frames) == len(dframes) + for f, df, in zip(frames, dframes): + self.cmp_frames(f, df) + + def testEmpty(self): + self.enc_dec([Frame(0, 0, 0, 0, "")], + "\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x00") + + def testSingle(self): + self.enc_dec([Frame(0, 0, 0, 1, "payload")], + "\x00\x00\x00\x13\x00\x00\x00\x01\x00\x00\x00\x00payload") + + def testMaxChannel(self): + self.enc_dec([Frame(0, 0, 0, 65535, "max-channel")], + "\x00\x00\x00\x17\x00\x00\xff\xff\x00\x00\x00\x00max-channel") + + def testMaxType(self): + self.enc_dec([Frame(0, 255, 0, 0, "max-type")], + "\x00\xff\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00max-type") + + def testMaxTrack(self): + self.enc_dec([Frame(0, 0, 15, 0, "max-track")], + "\x00\x00\x00\x15\x00\x0f\x00\x00\x00\x00\x00\x00max-track") + + def testSequence(self): + self.enc_dec([Frame(0, 0, 0, 0, "zero"), + Frame(0, 0, 0, 1, "one"), + Frame(0, 0, 1, 0, "two"), + Frame(0, 0, 1, 1, "three"), + Frame(0, 1, 0, 0, "four"), + Frame(0, 1, 0, 1, "five"), + Frame(0, 1, 1, 0, "six"), + Frame(0, 1, 1, 1, "seven"), + Frame(1, 0, 0, 0, "eight"), + Frame(1, 0, 0, 1, "nine"), + Frame(1, 0, 1, 0, "ten"), + Frame(1, 0, 1, 1, "eleven"), + Frame(1, 1, 0, 0, "twelve"), + Frame(1, 1, 0, 1, "thirteen"), + Frame(1, 1, 1, 0, "fourteen"), + Frame(1, 1, 1, 1, "fifteen")]) + +class SegmentTest(Base): + + def enc_dec(self, segments, frames=None, interleave=None, max_payload=Frame.MAX_PAYLOAD): + enc = SegmentEncoder(max_payload) + dec = SegmentDecoder() + + enc.write(*segments) + frms = enc.read() + if frames is not None: + assert len(frames) == len(frms), "expected %s, got %s" % (frames, frms) + for f1, f2 in zip(frames, frms): + self.cmp_frames(f1, f2) + if interleave is not None: + ilvd = [] + for f in frms: + ilvd.append(f) + if interleave: + ilvd.append(interleave.pop(0)) + ilvd.extend(interleave) + dec.write(*ilvd) + else: + dec.write(*frms) + segs = dec.read() + assert len(segments) == len(segs) + for s1, s2 in zip(segments, segs): + self.cmp_segments(s1, s2) + + def testEmpty(self): + self.enc_dec([Segment(True, True, 0, 0, 0, "")], + [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0, + "")]) + + def testSingle(self): + self.enc_dec([Segment(True, True, 0, 0, 0, "payload")], + [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0, + "payload")]) + + def testMaxChannel(self): + self.enc_dec([Segment(False, False, 0, 0, 65535, "max-channel")], + [Frame(FIRST_FRM | LAST_FRM, 0, 0, 65535, "max-channel")]) + + def testMaxType(self): + self.enc_dec([Segment(False, False, 255, 0, 0, "max-type")], + [Frame(FIRST_FRM | LAST_FRM, 255, 0, 0, "max-type")]) + + def testMaxTrack(self): + self.enc_dec([Segment(False, False, 0, 15, 0, "max-track")], + [Frame(FIRST_FRM | LAST_FRM, 0, 15, 0, "max-track")]) + + def testSequence(self): + self.enc_dec([Segment(True, False, 0, 0, 0, "one"), + Segment(False, False, 0, 0, 0, "two"), + Segment(False, True, 0, 0, 0, "three")], + [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG, 0, 0, 0, "one"), + Frame(FIRST_FRM | LAST_FRM, 0, 0, 0, "two"), + Frame(FIRST_FRM | LAST_FRM | LAST_SEG, 0, 0, 0, "three")]) + + def testInterleaveChannel(self): + frames = [Frame(0, 0, 0, 0, chr(ord("a") + i)) for i in range(7)] + frames[0].flags |= FIRST_FRM + frames[-1].flags |= LAST_FRM + + ilvd = [Frame(0, 0, 0, 1, chr(ord("a") + i)) for i in range(7)] + + self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefg")], frames, ilvd, max_payload=1) + + def testInterleaveTrack(self): + frames = [Frame(0, 0, 0, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1)) + for i in range(0, 8, 2)] + frames[0].flags |= FIRST_FRM + frames[-1].flags |= LAST_FRM + + ilvd = [Frame(0, 0, 1, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1)) + for i in range(0, 8, 2)] + + self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2) |
