summaryrefslogtreecommitdiff
path: root/cpp/test/streaming.cc
blob: 2d03976a5a94a3fe9afb8fff5f8ca0e5fcf085b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#include <msgpack.hpp>
#include <gtest/gtest.h>
#include <sstream>

// Packs the integers 1, 2 and 3 into an in-memory stream, then feeds the
// resulting bytes through msgpack::unpacker and checks that the decoded
// values come back in the order they were packed.
TEST(streaming, basic)
{
	std::ostringstream stream;
	msgpack::packer<std::ostream> pk(&stream);

	pk.pack(1);
	pk.pack(2);
	pk.pack(3);

	// The reader works on a fixed copy of the packed bytes.
	std::istringstream input(stream.str());

	msgpack::unpacker pac;

	int count = 0;
	while(count < 3) {
		pac.reserve_buffer(32*1024);

		// readsome() returns std::streamsize (signed); keep it signed
		// until it has been validated.
		std::streamsize len = input.readsome(pac.buffer(), pac.buffer_capacity());

		// `input` is a fixed string, so once it is drained no further
		// bytes can arrive. Without this check a short or undecodable
		// input would make this loop spin forever instead of failing.
		ASSERT_GT(len, 0) << "input exhausted after " << count << " of 3 objects";

		pac.buffer_consumed(static_cast<size_t>(len));

		while(pac.execute()) {
			// Hold the released zone for the rest of this iteration
			// (presumably `obj` refers to memory owned by it).
			std::auto_ptr<msgpack::zone> z(pac.release_zone());
			msgpack::object obj = pac.data();
			pac.reset();

			// The n-th decoded object (1-based) must equal n.
			++count;
			EXPECT_EQ(count, obj.as<int>());

			if(count == 3) {
				return;
			}
		}
	}
}


// Test helper that drains an input stream through a msgpack::unpacker and
// compares every decoded object against the public `expect` value.
class event_handler {
public:
	// Binds the handler to `input`. `expect` starts at 0 so that it never
	// holds an indeterminate value if a message is decoded before the test
	// assigns it.
	event_handler(std::istream& input) : expect(0), input(input) { }
	~event_handler() { }

	// Repeatedly pulls whatever input.readsome() currently yields into the
	// unpacker and dispatches each completed object to on_message().
	// Returns once readsome() yields no bytes.
	// Throws std::runtime_error when pac.message_size() exceeds
	// 10*1024*1024 after a read has been processed.
	void on_read()
	{
		while(true) {
			pac.reserve_buffer(32*1024);

			// readsome() returns std::streamsize (signed); keep it
			// signed until it has been validated.
			std::streamsize len = input.readsome(pac.buffer(), pac.buffer_capacity());

			if(len <= 0) {
				return;
			}

			pac.buffer_consumed(static_cast<size_t>(len));

			while(pac.execute()) {
				// Take ownership of the zone before reading the
				// object and resetting the unpacker.
				std::auto_ptr<msgpack::zone> z(pac.release_zone());
				msgpack::object obj = pac.data();
				pac.reset();
				on_message(obj, z);
			}

			if(pac.message_size() > 10*1024*1024) {
				throw std::runtime_error("message is too large");
			}
		}
	}

	// Checks that `obj` converts to the integer currently stored in
	// `expect`. `z` takes over ownership of the zone and frees it when this
	// call returns (presumably `obj` refers to memory owned by that zone).
	void on_message(msgpack::object obj, std::auto_ptr<msgpack::zone> z)
	{
		EXPECT_EQ(expect, obj.as<int>());
	}

	// Value the next decoded message is expected to carry; set by the test.
	int expect;

private:
	std::istream& input;
	msgpack::unpacker pac;
};

// Drives event_handler through three write-then-read rounds on a shared
// stringstream: each round packs one integer, tells the handler which value
// to expect, and lets it consume the newly written bytes.
TEST(streaming, event)
{
	std::stringstream stream;
	msgpack::packer<std::ostream> pk(&stream);

	event_handler handler(stream);

	for(int value = 1; value <= 3; ++value) {
		pk.pack(value);
		handler.expect = value;
		handler.on_read();
	}
}