From c19ba3eb15620becdbc157005dcc5a465b3ac064 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 4 Mar 2008 20:03:09 +0000 Subject: import of in-process 0-10 final python client git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@633610 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/tests/framer.py | 92 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 qpid/python/tests/framer.py (limited to 'qpid/python/tests/framer.py') diff --git a/qpid/python/tests/framer.py b/qpid/python/tests/framer.py new file mode 100644 index 0000000000..ea2e04e954 --- /dev/null +++ b/qpid/python/tests/framer.py @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import * +from unittest import TestCase +from qpid.util import connect, listen +from qpid.framer import * + +PORT = 1234 + +class FramerTest(TestCase): + + def setUp(self): + self.running = True + started = Event() + def run(): + for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): + conn = Framer(s) + try: + conn.write_header(*conn.read_header()[-2:]) + while True: + frame = conn.read_frame() + conn.write_frame(frame) + except Closed: + pass + + self.server = Thread(target=run) + self.server.setDaemon(True) + self.server.start() + + started.wait(3) + + def tearDown(self): + self.running = False + self.server.join(3) + + def test(self): + c = Framer(connect("0.0.0.0", PORT)) + + c.write_header(0, 10) + assert c.read_header() == ("AMQP", 1, 1, 0, 10) + + c.write_frame(Frame(FIRST_FRM, 1, 2, 3, "THIS")) + c.write_frame(Frame(0, 1, 2, 3, "IS")) + c.write_frame(Frame(0, 1, 2, 3, "A")) + c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST")) + + f = c.read_frame() + assert f.flags & FIRST_FRM + assert not (f.flags & LAST_FRM) + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "THIS" + + f = c.read_frame() + assert f.flags == 0 + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "IS" + + f = c.read_frame() + assert f.flags == 0 + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "A" + + f = c.read_frame() + assert f.flags & LAST_FRM + assert not (f.flags & FIRST_FRM) + assert f.type == 1 + assert f.track == 2 + assert f.channel == 3 + assert f.payload == "TEST" -- cgit v1.2.1