diff options
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/Stress-test.mk | 71 | ||||
-rw-r--r-- | test/cpp/Thrift-test.mk | 78 | ||||
-rw-r--r-- | test/cpp/realloc/Makefile | 40 | ||||
-rw-r--r-- | test/cpp/realloc/realloc_test.c | 107 | ||||
-rw-r--r-- | test/cpp/src/TestClient.cpp | 497 | ||||
-rw-r--r-- | test/cpp/src/TestServer.cpp | 424 | ||||
-rw-r--r-- | test/cpp/src/main.cpp | 509 | ||||
-rw-r--r-- | test/cpp/src/nb-main.cpp | 502 |
8 files changed, 2228 insertions, 0 deletions
diff --git a/test/cpp/Stress-test.mk b/test/cpp/Stress-test.mk new file mode 100644 index 000000000..1a753b3a1 --- /dev/null +++ b/test/cpp/Stress-test.mk @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +ifndef thrift_home +thrift_home=../.. +endif #thrift_home + +target: all + +ifndef boost_home +#boost_home=../../../../../thirdparty/boost_1_33_1 +boost_home=/usr/local/include/boost-1_33_1 +endif #boost_home +target: all + +include_paths = $(thrift_home)/lib/cpp/src \ + $(thrift_home)/lib/cpp \ + $(thrift_home)/ \ + $(boost_home) + +include_flags = $(patsubst %,-I%, $(include_paths)) + +# Tools +ifndef THRIFT +THRIFT = ../../compiler/cpp/thrift +endif # THRIFT + +CC = g++ +LD = g++ + +# Compiler flags +DCFL = -Wall -O3 -g -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent +CFL = -Wall -O3 -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent + +all: stress-test stress-test-nb + +debug: stress-test-debug stress-test-debug-nb + +stubs: ../StressTest.thrift + $(THRIFT) --gen cpp --gen php ../StressTest.thrift + +stress-test-debug-nb: stubs + g++ -o stress-test-nb $(DCFL) src/nb-main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp + +stress-test-nb: stubs + g++ -o stress-test-nb $(CFL) src/nb-main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp + +stress-test-debug: stubs + g++ -o stress-test $(DCFL) src/main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp + +stress-test: stubs + g++ -o stress-test $(CFL) src/main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp + +clean: + rm -fr stress-test stress-test-nb gen-cpp diff --git a/test/cpp/Thrift-test.mk b/test/cpp/Thrift-test.mk new file mode 100644 index 000000000..fb3e38bc6 --- /dev/null +++ b/test/cpp/Thrift-test.mk @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Makefile for Thrift test project. +# Default target is everything + +ifndef thrift_home +thrift_home=../.. +endif #thrift_home + +target: all + +ifndef boost_home +#boost_home=../../../../../thirdparty/boost_1_33_1 +boost_home=/usr/local/include/boost-1_33_1 +endif #boost_home +target: all + +include_paths = $(thrift_home)/lib/cpp/src \ + $(boost_home) + +include_flags = $(patsubst %,-I%, $(include_paths)) + +# Tools +ifndef THRIFT +THRIFT = ../../compiler/cpp/thrift +endif # THRIFT + +CC = g++ +LD = g++ + +# Compiler flags +DCFL = -Wall -O3 -g -I. -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent +LFL = -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent +CCFL = -Wall -O3 -I. -I./gen-cpp $(include_flags) +CFL = $(CCFL) $(LFL) + +all: server client + +debug: server-debug client-debug + +stubs: ../ThriftTest.thrift + $(THRIFT) --gen cpp ../ThriftTest.thrift + +server-debug: stubs + g++ -o TestServer $(DCFL) src/TestServer.cpp ./gen-cpp/ThriftTest.cpp ./gen-cpp/ThriftTest_types.cpp ../ThriftTest_extras.cpp + +client-debug: stubs + g++ -o TestClient $(DCFL) src/TestClient.cpp ./gen-cpp/ThriftTest.cpp ./gen-cpp/ThriftTest_types.cpp ../ThriftTest_extras.cpp + +server: stubs + g++ -o TestServer $(CFL) src/TestServer.cpp ./gen-cpp/ThriftTest.cpp ./gen-cpp/ThriftTest_types.cpp ../ThriftTest_extras.cpp + +client: stubs + g++ -o TestClient $(CFL) src/TestClient.cpp ./gen-cpp/ThriftTest.cpp ./gen-cpp/ThriftTest_types.cpp ../ThriftTest_extras.cpp + +small: + $(THRIFT) --gen cpp ../SmallTest.thrift + g++ -c $(CCFL) ./gen-cpp/SmallService.cpp ./gen-cpp/SmallTest_types.cpp + +clean: + rm -fr *.o TestServer TestClient gen-cpp diff --git a/test/cpp/realloc/Makefile b/test/cpp/realloc/Makefile new file mode 100644 index 000000000..f89bbb3c5 --- /dev/null +++ b/test/cpp/realloc/Makefile @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This probably should not go into "make check", because it is an experiment, +# not a test. Specifically, it is meant to determine how likely realloc is +# to avoid a copy. This is poorly documented. + +run: realloc_test + for it in 1 4 64 ; do \ + for nb in 1 8 64 512 ; do \ + for mins in 64 512 ; do \ + for maxs in 2048 262144 ; do \ + for db in 8 64 ; do \ + ./realloc_test $$nb $$mins $$maxs $$db $$it \ + ; done \ + ; done \ + ; done \ + ; done \ + ; done \ + > raw_stats + +CFLAGS = -Wall -g -std=c99 +LDLIBS = -ldl +realloc_test: realloc_test.c diff --git a/test/cpp/realloc/realloc_test.c b/test/cpp/realloc/realloc_test.c new file mode 100644 index 000000000..f9763adf4 --- /dev/null +++ b/test/cpp/realloc/realloc_test.c @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#define _GNU_SOURCE +#include <stdlib.h> +#include <stdio.h> +#include <time.h> +#include <dlfcn.h> + +int copies; +int non_copies; + +void *realloc(void *ptr, size_t size) { + static void *(*real_realloc)(void*, size_t) = NULL; + if (real_realloc == NULL) { + real_realloc = (void* (*) (void*, size_t)) dlsym(RTLD_NEXT, "realloc"); + } + + void *ret_ptr = (*real_realloc)(ptr, size); + + if (ret_ptr == ptr) { + non_copies++; + } else { + copies++; + } + + return ret_ptr; +} + + +struct TMemoryBuffer { + void* ptr; + int size; +}; + +int main(int argc, char *argv[]) { + int num_buffers; + int init_size; + int max_size; + int doublings; + int iterations; + + if (argc < 6 || + argc > 7 || + (num_buffers = atoi(argv[1])) == 0 || + (init_size = atoi(argv[2])) == 0 || + (max_size = atoi(argv[3])) == 0 || + init_size > max_size || + (iterations = atoi(argv[4])) == 0 || + (doublings = atoi(argv[5])) == 0 || + (argc == 7 && atoi(argv[6]) == 0)) { + fprintf(stderr, "usage: realloc_test <num_buffers> <init_size> <max_size> <doublings> <iterations> [seed]\n"); + exit(EXIT_FAILURE); + } + + for ( int i = 0 ; i < argc ; i++ ) { + printf("%s ", argv[i]); + } + printf("\n"); + + if (argc == 7) { + srand(atoi(argv[6])); + } else { + srand(time(NULL)); + } + + struct TMemoryBuffer* buffers = calloc(num_buffers, sizeof(*buffers)); + if (buffers == NULL) abort(); + + for ( int i = 0 ; i < num_buffers ; i++ ) { + buffers[i].size = max_size; + } + + while (iterations --> 0) { + for ( int i = 0 ; i < doublings * num_buffers ; i++ ) { + struct TMemoryBuffer* buf = &buffers[rand() % num_buffers]; + buf->size *= 2; + if (buf->size <= max_size) { + buf->ptr = realloc(buf->ptr, buf->size); + } else { + free(buf->ptr); + buf->size = init_size; + buf->ptr = malloc(buf->size); + } + if (buf->ptr == NULL) abort(); + } + } + + printf("Non-copied %d/%d (%.2f%%)\n", non_copies, copies + non_copies, 100.0 * non_copies / (copies + non_copies)); + return 0; +} diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp new file mode 100644 index 000000000..5ddfa0601 --- /dev/null +++ b/test/cpp/src/TestClient.cpp @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <unistd.h> +#include <sys/time.h> +#include <protocol/TBinaryProtocol.h> +#include <transport/TTransportUtils.h> +#include <transport/TSocket.h> + +#include <boost/shared_ptr.hpp> +#include "ThriftTest.h" + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +using namespace boost; +using namespace std; +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace thrift::test; + +//extern uint32_t g_socket_syscalls; + +// Current time, microseconds since the epoch +uint64_t now() +{ + long long ret; + struct timeval tv; + + gettimeofday(&tv, NULL); + ret = tv.tv_sec; + ret = ret*1000*1000 + tv.tv_usec; + return ret; +} + +int main(int argc, char** argv) { + string host = "localhost"; + int port = 9090; + int numTests = 1; + bool framed = false; + + for (int i = 0; i < argc; ++i) { + if (strcmp(argv[i], "-h") == 0) { + char* pch = strtok(argv[++i], ":"); + if (pch != NULL) { + host = string(pch); + } + pch = strtok(NULL, ":"); + if (pch != NULL) { + port = atoi(pch); + } + } else if (strcmp(argv[i], "-n") == 0) { + numTests = atoi(argv[++i]); + } else if (strcmp(argv[i], "-f") == 0) { + framed = true; + } + } + + + shared_ptr<TTransport> transport; + + shared_ptr<TSocket> socket(new TSocket(host, port)); + + if (framed) { + shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket)); + transport = framedSocket; + } else { + shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket)); + transport = bufferedSocket; + } + + shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport)); + ThriftTestClient testClient(protocol); + + uint64_t time_min = 0; + uint64_t time_max = 0; + uint64_t time_tot = 0; + + int test = 0; + for (test = 0; test < numTests; ++test) { + + try { + transport->open(); + } catch (TTransportException& ttx) { + printf("Connect failed: %s\n", ttx.what()); + continue; + } + + /** + * CONNECT TEST + */ + printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port); + + uint64_t start = now(); + + /** + * VOID TEST + */ + try { + printf("testVoid()"); + testClient.testVoid(); + printf(" = void\n"); + } catch (TApplicationException tax) { + printf("%s\n", tax.what()); + } + + /** + * STRING TEST + */ + printf("testString(\"Test\")"); + string s; + testClient.testString(s, "Test"); + printf(" = \"%s\"\n", s.c_str()); + + /** + * BYTE TEST + */ + printf("testByte(1)"); + uint8_t u8 = testClient.testByte(1); + printf(" = %d\n", (int)u8); + + /** + * I32 TEST + */ + printf("testI32(-1)"); + int32_t i32 = testClient.testI32(-1); + printf(" = %d\n", i32); + + /** + * I64 TEST + */ + printf("testI64(-34359738368)"); + int64_t i64 = testClient.testI64(-34359738368LL); + printf(" = %"PRId64"\n", i64); + + /** + * DOUBLE TEST + */ + printf("testDouble(-5.2098523)"); + double dub = testClient.testDouble(-5.2098523); + printf(" = %lf\n", dub); + + /** + * STRUCT TEST + */ + printf("testStruct({\"Zero\", 1, -3, -5})"); + Xtruct out; + out.string_thing = "Zero"; + out.byte_thing = 1; + out.i32_thing = -3; + out.i64_thing = -5; + Xtruct in; + testClient.testStruct(in, out); + printf(" = {\"%s\", %d, %d, %"PRId64"}\n", + in.string_thing.c_str(), + (int)in.byte_thing, + in.i32_thing, + in.i64_thing); + + /** + * NESTED STRUCT TEST + */ + printf("testNest({1, {\"Zero\", 1, -3, -5}), 5}"); + Xtruct2 out2; + out2.byte_thing = 1; + out2.struct_thing = out; + out2.i32_thing = 5; + Xtruct2 in2; + testClient.testNest(in2, out2); + in = in2.struct_thing; + printf(" = {%d, {\"%s\", %d, %d, %"PRId64"}, %d}\n", + in2.byte_thing, + in.string_thing.c_str(), + (int)in.byte_thing, + in.i32_thing, + in.i64_thing, + in2.i32_thing); + + /** + * MAP TEST + */ + map<int32_t,int32_t> mapout; + for (int32_t i = 0; i < 5; ++i) { + mapout.insert(make_pair(i, i-10)); + } + printf("testMap({"); + map<int32_t, int32_t>::const_iterator m_iter; + bool first = true; + for (m_iter = mapout.begin(); m_iter != mapout.end(); ++m_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d => %d", m_iter->first, m_iter->second); + } + printf("})"); + map<int32_t,int32_t> mapin; + testClient.testMap(mapin, mapout); + printf(" = {"); + first = true; + for (m_iter = mapin.begin(); m_iter != mapin.end(); ++m_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d => %d", m_iter->first, m_iter->second); + } + printf("}\n"); + + /** + * SET TEST + */ + set<int32_t> setout; + for (int32_t i = -2; i < 3; ++i) { + setout.insert(i); + } + printf("testSet({"); + set<int32_t>::const_iterator s_iter; + first = true; + for (s_iter = setout.begin(); s_iter != setout.end(); ++s_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *s_iter); + } + printf("})"); + set<int32_t> setin; + testClient.testSet(setin, setout); + printf(" = {"); + first = true; + for (s_iter = setin.begin(); s_iter != setin.end(); ++s_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *s_iter); + } + printf("}\n"); + + /** + * LIST TEST + */ + vector<int32_t> listout; + for (int32_t i = -2; i < 3; ++i) { + listout.push_back(i); + } + printf("testList({"); + vector<int32_t>::const_iterator l_iter; + first = true; + for (l_iter = listout.begin(); l_iter != listout.end(); ++l_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *l_iter); + } + printf("})"); + vector<int32_t> listin; + testClient.testList(listin, listout); + printf(" = {"); + first = true; + for (l_iter = listin.begin(); l_iter != listin.end(); ++l_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *l_iter); + } + printf("}\n"); + + /** + * ENUM TEST + */ + printf("testEnum(ONE)"); + Numberz ret = testClient.testEnum(ONE); + printf(" = %d\n", ret); + + printf("testEnum(TWO)"); + ret = testClient.testEnum(TWO); + printf(" = %d\n", ret); + + printf("testEnum(THREE)"); + ret = testClient.testEnum(THREE); + printf(" = %d\n", ret); + + printf("testEnum(FIVE)"); + ret = testClient.testEnum(FIVE); + printf(" = %d\n", ret); + + printf("testEnum(EIGHT)"); + ret = testClient.testEnum(EIGHT); + printf(" = %d\n", ret); + + /** + * TYPEDEF TEST + */ + printf("testTypedef(309858235082523)"); + UserId uid = testClient.testTypedef(309858235082523LL); + printf(" = %"PRId64"\n", uid); + + /** + * NESTED MAP TEST + */ + printf("testMapMap(1)"); + map<int32_t, map<int32_t, int32_t> > mm; + testClient.testMapMap(mm, 1); + printf(" = {"); + map<int32_t, map<int32_t, int32_t> >::const_iterator mi; + for (mi = mm.begin(); mi != mm.end(); ++mi) { + printf("%d => {", mi->first); + map<int32_t, int32_t>::const_iterator mi2; + for (mi2 = mi->second.begin(); mi2 != mi->second.end(); ++mi2) { + printf("%d => %d, ", mi2->first, mi2->second); + } + printf("}, "); + } + printf("}\n"); + + /** + * INSANITY TEST + */ + Insanity insane; + insane.userMap.insert(make_pair(FIVE, 5000)); + Xtruct truck; + truck.string_thing = "Truck"; + truck.byte_thing = 8; + truck.i32_thing = 8; + truck.i64_thing = 8; + insane.xtructs.push_back(truck); + printf("testInsanity()"); + map<UserId, map<Numberz,Insanity> > whoa; + testClient.testInsanity(whoa, insane); + printf(" = {"); + map<UserId, map<Numberz,Insanity> >::const_iterator i_iter; + for (i_iter = whoa.begin(); i_iter != whoa.end(); ++i_iter) { + printf("%"PRId64" => {", i_iter->first); + map<Numberz,Insanity>::const_iterator i2_iter; + for (i2_iter = i_iter->second.begin(); + i2_iter != i_iter->second.end(); + ++i2_iter) { + printf("%d => {", i2_iter->first); + map<Numberz, UserId> userMap = i2_iter->second.userMap; + map<Numberz, UserId>::const_iterator um; + printf("{"); + for (um = userMap.begin(); um != userMap.end(); ++um) { + printf("%d => %"PRId64", ", um->first, um->second); + } + printf("}, "); + + vector<Xtruct> xtructs = i2_iter->second.xtructs; + vector<Xtruct>::const_iterator x; + printf("{"); + for (x = xtructs.begin(); x != xtructs.end(); ++x) { + printf("{\"%s\", %d, %d, %"PRId64"}, ", + x->string_thing.c_str(), + (int)x->byte_thing, + x->i32_thing, + x->i64_thing); + } + printf("}"); + + printf("}, "); + } + printf("}, "); + } + printf("}\n"); + + /* test exception */ + + try { + printf("testClient.testException(\"Xception\") =>"); + testClient.testException("Xception"); + printf(" void\nFAILURE\n"); + + } catch(Xception& e) { + printf(" {%u, \"%s\"}\n", e.errorCode, e.message.c_str()); + } + + try { + printf("testClient.testException(\"success\") =>"); + testClient.testException("success"); + printf(" void\n"); + } catch(...) { + printf(" exception\nFAILURE\n"); + } + + /* test multi exception */ + + try { + printf("testClient.testMultiException(\"Xception\", \"test 1\") =>"); + Xtruct result; + testClient.testMultiException(result, "Xception", "test 1"); + printf(" result\nFAILURE\n"); + } catch(Xception& e) { + printf(" {%u, \"%s\"}\n", e.errorCode, e.message.c_str()); + } + + try { + printf("testClient.testMultiException(\"Xception2\", \"test 2\") =>"); + Xtruct result; + testClient.testMultiException(result, "Xception2", "test 2"); + printf(" result\nFAILURE\n"); + + } catch(Xception2& e) { + printf(" {%u, {\"%s\"}}\n", e.errorCode, e.struct_thing.string_thing.c_str()); + } + + try { + printf("testClient.testMultiException(\"success\", \"test 3\") =>"); + Xtruct result; + testClient.testMultiException(result, "success", "test 3"); + printf(" {{\"%s\"}}\n", result.string_thing.c_str()); + } catch(...) { + printf(" exception\nFAILURE\n"); + } + + /* test oneway void */ + { + printf("testClient.testOneway(3) =>"); + uint64_t startOneway = now(); + testClient.testOneway(3); + uint64_t elapsed = now() - startOneway; + if (elapsed > 200 * 1000) { // 0.2 seconds + printf(" FAILURE - took %.2f ms\n", (double)elapsed/1000.0); + } else { + printf(" success - took %.2f ms\n", (double)elapsed/1000.0); + } + } + + /** + * redo a simple test after the oneway to make sure we aren't "off by one" -- + * if the server treated oneway void like normal void, this next test will + * fail since it will get the void confirmation rather than the correct + * result. In this circumstance, the client will throw the exception: + * + * TApplicationException: Wrong method namea + */ + /** + * I32 TEST + */ + printf("re-test testI32(-1)"); + i32 = testClient.testI32(-1); + printf(" = %d\n", i32); + + + uint64_t stop = now(); + uint64_t tot = stop-start; + + printf("Total time: %"PRIu64" us\n", stop-start); + + time_tot += tot; + if (time_min == 0 || tot < time_min) { + time_min = tot; + } + if (tot > time_max) { + time_max = tot; + } + + transport->close(); + } + + // printf("\nSocket syscalls: %u", g_socket_syscalls); + printf("\nAll tests done.\n"); + + uint64_t time_avg = time_tot / numTests; + + printf("Min time: %"PRIu64" us\n", time_min); + printf("Max time: %"PRIu64" us\n", time_max); + printf("Avg time: %"PRIu64" us\n", time_avg); + + return 0; +} diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp new file mode 100644 index 000000000..83454dcaf --- /dev/null +++ b/test/cpp/src/TestServer.cpp @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <concurrency/ThreadManager.h> +#include <concurrency/PosixThreadFactory.h> +#include <protocol/TBinaryProtocol.h> +#include <server/TSimpleServer.h> +#include <server/TThreadedServer.h> +#include <server/TThreadPoolServer.h> +#include <server/TNonblockingServer.h> +#include <transport/TServerSocket.h> +#include <transport/TTransportUtils.h> +#include "ThriftTest.h" + +#include <iostream> +#include <stdexcept> +#include <sstream> + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +using namespace std; +using namespace boost; + +using namespace apache::thrift; +using namespace apache::thrift::concurrency; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; + +using namespace thrift::test; + +class TestHandler : public ThriftTestIf { + public: + TestHandler() {} + + void testVoid() { + printf("testVoid()\n"); + } + + void testString(string& out, const string &thing) { + printf("testString(\"%s\")\n", thing.c_str()); + out = thing; + } + + int8_t testByte(const int8_t thing) { + printf("testByte(%d)\n", (int)thing); + return thing; + } + + int32_t testI32(const int32_t thing) { + printf("testI32(%d)\n", thing); + return thing; + } + + int64_t testI64(const int64_t thing) { + printf("testI64(%"PRId64")\n", thing); + return thing; + } + + double testDouble(const double thing) { + printf("testDouble(%lf)\n", thing); + return thing; + } + + void testStruct(Xtruct& out, const Xtruct &thing) { + printf("testStruct({\"%s\", %d, %d, %"PRId64"})\n", thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing); + out = thing; + } + + void testNest(Xtruct2& out, const Xtruct2& nest) { + const Xtruct &thing = nest.struct_thing; + printf("testNest({%d, {\"%s\", %d, %d, %"PRId64"}, %d})\n", (int)nest.byte_thing, thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing, nest.i32_thing); + out = nest; + } + + void testMap(map<int32_t, int32_t> &out, const map<int32_t, int32_t> &thing) { + printf("testMap({"); + map<int32_t, int32_t>::const_iterator m_iter; + bool first = true; + for (m_iter = thing.begin(); m_iter != thing.end(); ++m_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d => %d", m_iter->first, m_iter->second); + } + printf("})\n"); + out = thing; + } + + void testSet(set<int32_t> &out, const set<int32_t> &thing) { + printf("testSet({"); + set<int32_t>::const_iterator s_iter; + bool first = true; + for (s_iter = thing.begin(); s_iter != thing.end(); ++s_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *s_iter); + } + printf("})\n"); + out = thing; + } + + void testList(vector<int32_t> &out, const vector<int32_t> &thing) { + printf("testList({"); + vector<int32_t>::const_iterator l_iter; + bool first = true; + for (l_iter = thing.begin(); l_iter != thing.end(); ++l_iter) { + if (first) { + first = false; + } else { + printf(", "); + } + printf("%d", *l_iter); + } + printf("})\n"); + out = thing; + } + + Numberz testEnum(const Numberz thing) { + printf("testEnum(%d)\n", thing); + return thing; + } + + UserId testTypedef(const UserId thing) { + printf("testTypedef(%"PRId64")\n", thing); + return thing; + } + + void testMapMap(map<int32_t, map<int32_t,int32_t> > &mapmap, const int32_t hello) { + printf("testMapMap(%d)\n", hello); + + map<int32_t,int32_t> pos; + map<int32_t,int32_t> neg; + for (int i = 1; i < 5; i++) { + pos.insert(make_pair(i,i)); + neg.insert(make_pair(-i,-i)); + } + + mapmap.insert(make_pair(4, pos)); + mapmap.insert(make_pair(-4, neg)); + + } + + void testInsanity(map<UserId, map<Numberz,Insanity> > &insane, const Insanity &argument) { + printf("testInsanity()\n"); + + Xtruct hello; + hello.string_thing = "Hello2"; + hello.byte_thing = 2; + hello.i32_thing = 2; + hello.i64_thing = 2; + + Xtruct goodbye; + goodbye.string_thing = "Goodbye4"; + goodbye.byte_thing = 4; + goodbye.i32_thing = 4; + goodbye.i64_thing = 4; + + Insanity crazy; + crazy.userMap.insert(make_pair(EIGHT, 8)); + crazy.xtructs.push_back(goodbye); + + Insanity looney; + crazy.userMap.insert(make_pair(FIVE, 5)); + crazy.xtructs.push_back(hello); + + map<Numberz, Insanity> first_map; + map<Numberz, Insanity> second_map; + + first_map.insert(make_pair(TWO, crazy)); + first_map.insert(make_pair(THREE, crazy)); + + second_map.insert(make_pair(SIX, looney)); + + insane.insert(make_pair(1, first_map)); + insane.insert(make_pair(2, second_map)); + + printf("return"); + printf(" = {"); + map<UserId, map<Numberz,Insanity> >::const_iterator i_iter; + for (i_iter = insane.begin(); i_iter != insane.end(); ++i_iter) { + printf("%"PRId64" => {", i_iter->first); + map<Numberz,Insanity>::const_iterator i2_iter; + for (i2_iter = i_iter->second.begin(); + i2_iter != i_iter->second.end(); + ++i2_iter) { + printf("%d => {", i2_iter->first); + map<Numberz, UserId> userMap = i2_iter->second.userMap; + map<Numberz, UserId>::const_iterator um; + printf("{"); + for (um = userMap.begin(); um != userMap.end(); ++um) { + printf("%d => %"PRId64", ", um->first, um->second); + } + printf("}, "); + + vector<Xtruct> xtructs = i2_iter->second.xtructs; + vector<Xtruct>::const_iterator x; + printf("{"); + for (x = xtructs.begin(); x != xtructs.end(); ++x) { + printf("{\"%s\", %d, %d, %"PRId64"}, ", x->string_thing.c_str(), (int)x->byte_thing, x->i32_thing, x->i64_thing); + } + printf("}"); + + printf("}, "); + } + printf("}, "); + } + printf("}\n"); + + + } + + void testMulti(Xtruct &hello, const int8_t arg0, const int32_t arg1, const int64_t arg2, const std::map<int16_t, std::string> &arg3, const Numberz arg4, const UserId arg5) { + printf("testMulti()\n"); + + hello.string_thing = "Hello2"; + hello.byte_thing = arg0; + hello.i32_thing = arg1; + hello.i64_thing = (int64_t)arg2; + } + + void testException(const std::string &arg) + throw(Xception, apache::thrift::TException) + { + printf("testException(%s)\n", arg.c_str()); + if (arg.compare("Xception") == 0) { + Xception e; + e.errorCode = 1001; + e.message = arg; + throw e; + } else if (arg.compare("ApplicationException") == 0) { + apache::thrift::TException e; + throw e; + } else { + Xtruct result; + result.string_thing = arg; + return; + } + } + + void testMultiException(Xtruct &result, const std::string &arg0, const std::string &arg1) throw(Xception, Xception2) { + + printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str()); + + if (arg0.compare("Xception") == 0) { + Xception e; + e.errorCode = 1001; + e.message = "This is an Xception"; + throw e; + } else if (arg0.compare("Xception2") == 0) { + Xception2 e; + e.errorCode = 2002; + e.struct_thing.string_thing = "This is an Xception2"; + throw e; + } else { + result.string_thing = arg1; + return; + } + } + + void testOneway(int sleepFor) { + printf("testOneway(%d): Sleeping...\n", sleepFor); + sleep(sleepFor); + printf("testOneway(%d): done sleeping!\n", sleepFor); + } +}; + +int main(int argc, char **argv) { + + int port = 9090; + string serverType = "simple"; + string protocolType = "binary"; + size_t workerCount = 4; + + ostringstream usage; + + usage << + argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>]" << endl << + + "\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\". Default is " << serverType << endl << + + "\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << + + "\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; + + map<string, string> args; + + for (int ix = 1; ix < argc; ix++) { + string arg(argv[ix]); + if (arg.compare(0,2, "--") == 0) { + size_t end = arg.find_first_of("=", 2); + if (end != string::npos) { + args[string(arg, 2, end - 2)] = string(arg, end + 1); + } else { + args[string(arg, 2)] = "true"; + } + } else { + throw invalid_argument("Unexcepted command line token: "+arg); + } + } + + try { + + if (!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if (!args["server-type"].empty()) { + serverType = args["server-type"]; + if (serverType == "simple") { + } else if (serverType == "thread-pool") { + } else if (serverType == "threaded") { + } else if (serverType == "nonblocking") { + } else { + throw invalid_argument("Unknown server type "+serverType); + } + } + + if (!args["protocol-type"].empty()) { + protocolType = args["protocol-type"]; + if (protocolType == "binary") { + } else if (protocolType == "ascii") { + throw invalid_argument("ASCII protocol not supported"); + } else if (protocolType == "xml") { + throw invalid_argument("XML protocol not supported"); + } else { + throw invalid_argument("Unknown protocol type "+protocolType); + } + } + + if (!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + } catch (exception& e) { + cerr << e.what() << endl; + cerr << usage; + } + + // Dispatcher + shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + shared_ptr<TestHandler> testHandler(new TestHandler()); + + shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler)); + + // Transport + shared_ptr<TServerSocket> serverSocket(new TServerSocket(port)); + + // Factory + shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); + + if (serverType == "simple") { + + // Server + TSimpleServer simpleServer(testProcessor, + serverSocket, + transportFactory, + protocolFactory); + + printf("Starting the server on port %d...\n", port); + simpleServer.serve(); + + } else if (serverType == "thread-pool") { + + shared_ptr<ThreadManager> threadManager = + ThreadManager::newSimpleThreadManager(workerCount); + + shared_ptr<PosixThreadFactory> threadFactory = + shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + TThreadPoolServer threadPoolServer(testProcessor, + serverSocket, + transportFactory, + protocolFactory, + threadManager); + + printf("Starting the server on port %d...\n", port); + threadPoolServer.serve(); + + } else if (serverType == "threaded") { + + TThreadedServer threadedServer(testProcessor, + serverSocket, + transportFactory, + protocolFactory); + + printf("Starting the server on port %d...\n", port); + threadedServer.serve(); + + } else if (serverType == "nonblocking") { + TNonblockingServer nonblockingServer(testProcessor, port); + printf("Starting the nonblocking server on port %d...\n", port); + nonblockingServer.serve(); + } + + printf("done.\n"); + return 0; +} diff --git a/test/cpp/src/main.cpp b/test/cpp/src/main.cpp new file mode 100644 index 000000000..46ee950d8 --- /dev/null +++ b/test/cpp/src/main.cpp @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <concurrency/ThreadManager.h> +#include <concurrency/PosixThreadFactory.h> +#include <concurrency/Monitor.h> +#include <concurrency/Util.h> +#include <concurrency/Mutex.h> +#include <protocol/TBinaryProtocol.h> +#include <server/TSimpleServer.h> +#include <server/TThreadPoolServer.h> +#include <server/TThreadedServer.h> +#include <transport/TServerSocket.h> +#include <transport/TSocket.h> +#include <transport/TTransportUtils.h> +#include <transport/TFileTransport.h> +#include <TLogging.h> + +#include "Service.h" + +#include <iostream> +#include <set> +#include <stdexcept> +#include <sstream> + +#include <map> +#include <ext/hash_map> +using __gnu_cxx::hash_map; +using __gnu_cxx::hash; + +using namespace std; +using namespace boost; + +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; +using namespace apache::thrift::concurrency; + +using namespace test::stress; + +struct eqstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) == 0; + } +}; + +struct ltstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) < 0; + } +}; + + +// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map; +typedef map<const char*, int, ltstr> count_map; + +class Server : public ServiceIf { + public: + Server() {} + + void count(const char* method) { + Guard m(lock_); + int ct = counts_[method]; + counts_[method] = ++ct; + } + + void echoVoid() { + count("echoVoid"); + return; + } + + count_map getCount() { + Guard m(lock_); + return counts_; + } + + int8_t echoByte(const int8_t arg) {return arg;} + int32_t echoI32(const int32_t arg) {return arg;} + int64_t echoI64(const int64_t arg) {return arg;} + void echoString(string& out, const string &arg) { + if (arg != "hello") { + T_ERROR_ABORT("WRONG STRING!!!!"); + } + out = arg; + } + void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; } + void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; } + void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; } + +private: + count_map counts_; + Mutex lock_; + +}; + +class ClientThread: public Runnable { +public: + + ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) : + _transport(transport), + _client(client), + _monitor(monitor), + _workerCount(workerCount), + _loopCount(loopCount), + _loopType(loopType) + {} + + void run() { + + // Wait for all worker threads to start + + {Synchronized s(_monitor); + while(_workerCount == 0) { + _monitor.wait(); + } + } + + _startTime = Util::currentTime(); + + _transport->open(); + + switch(_loopType) { + case T_VOID: loopEchoVoid(); break; + case T_BYTE: loopEchoByte(); break; + case T_I32: loopEchoI32(); break; + case T_I64: loopEchoI64(); break; + case T_STRING: loopEchoString(); break; + default: cerr << "Unexpected loop type" << _loopType << endl; break; + } + + _endTime = Util::currentTime(); + + _transport->close(); + + _done = true; + + {Synchronized s(_monitor); + + _workerCount--; + + if (_workerCount == 0) { + + _monitor.notify(); + } + } + } + + void loopEchoVoid() { + for (size_t ix = 0; ix < _loopCount; ix++) { + _client->echoVoid(); + } + } + + void loopEchoByte() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int8_t arg = 1; + int8_t result; + result =_client->echoByte(arg); + assert(result == arg); + } + } + + void loopEchoI32() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int32_t arg = 1; + int32_t result; + result =_client->echoI32(arg); + assert(result == arg); + } + } + + void loopEchoI64() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int64_t arg = 1; + int64_t result; + result =_client->echoI64(arg); + assert(result == arg); + } + } + + void loopEchoString() { + for (size_t ix = 0; ix < _loopCount; ix++) { + string arg = "hello"; + string result; + _client->echoString(result, arg); + assert(result == arg); + } + } + + shared_ptr<TTransport> _transport; + shared_ptr<ServiceClient> _client; + Monitor& _monitor; + size_t& _workerCount; + size_t _loopCount; + TType _loopType; + long long _startTime; + long long _endTime; + bool _done; + Monitor _sleep; +}; + + +int main(int argc, char **argv) { + + int port = 9091; + string serverType = "thread-pool"; + string protocolType = "binary"; + size_t workerCount = 4; + size_t clientCount = 20; + size_t loopCount = 50000; + TType loopType = T_VOID; + string callName = "echoVoid"; + bool runServer = true; + bool logRequests = false; + string requestLogPath = "./requestlog.tlog"; + bool replayRequests = false; + + ostringstream usage; + + usage << + argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl << + "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl << + "\thelp Prints this help text." << endl << + "\tcall Service method to call. Default is " << callName << endl << + "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl << + "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl << + "\tserver Run the Thrift server in this process. Default is " << runServer << endl << + "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl << + "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << + "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl << + "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl << + "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; + + + map<string, string> args; + + for (int ix = 1; ix < argc; ix++) { + + string arg(argv[ix]); + + if (arg.compare(0,2, "--") == 0) { + + size_t end = arg.find_first_of("=", 2); + + string key = string(arg, 2, end - 2); + + if (end != string::npos) { + args[key] = string(arg, end + 1); + } else { + args[key] = "true"; + } + } else { + throw invalid_argument("Unexcepted command line token: "+arg); + } + } + + try { + + if (!args["clients"].empty()) { + clientCount = atoi(args["clients"].c_str()); + } + + if (!args["help"].empty()) { + cerr << usage.str(); + return 0; + } + + if (!args["loop"].empty()) { + loopCount = atoi(args["loop"].c_str()); + } + + if (!args["call"].empty()) { + callName = args["call"]; + } + + if (!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if (!args["server"].empty()) { + runServer = args["server"] == "true"; + } + + if (!args["log-request"].empty()) { + logRequests = args["log-request"] == "true"; + } + + if (!args["replay-request"].empty()) { + replayRequests = args["replay-request"] == "true"; + } + + if (!args["server-type"].empty()) { + serverType = args["server-type"]; + + if (serverType == "simple") { + + } else if (serverType == "thread-pool") { + + } else if (serverType == "threaded") { + + } else { + + throw invalid_argument("Unknown server type "+serverType); + } + } + + if (!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + + } catch(exception& e) { + cerr << e.what() << endl; + cerr << usage; + } + + shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); + + // Dispatcher + shared_ptr<Server> serviceHandler(new Server()); + + if (replayRequests) { + shared_ptr<Server> serviceHandler(new Server()); + shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transports + shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + fileTransport->seekToEnd(); + + // Protocol Factory + shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + TFileProcessor fileProcessor(serviceProcessor, + protocolFactory, + fileTransport); + + fileProcessor.process(0, true); + exit(0); + } + + + if (runServer) { + + shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transport + shared_ptr<TServerSocket> serverSocket(new TServerSocket(port)); + + // Transport Factory + shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); + + // Protocol Factory + shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + if (logRequests) { + // initialize the log file + shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + + transportFactory = + shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport)); + } + + shared_ptr<Thread> serverThread; + + if (serverType == "simple") { + + serverThread = threadFactory->newThread(shared_ptr<TServer>(new TSimpleServer(serviceProcessor, serverSocket, transportFactory, protocolFactory))); + + } else if (serverType == "threaded") { + + serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadedServer(serviceProcessor, serverSocket, transportFactory, protocolFactory))); + + } else if (serverType == "thread-pool") { + + shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); + + threadManager->threadFactory(threadFactory); + threadManager->start(); + serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(serviceProcessor, serverSocket, transportFactory, protocolFactory, threadManager))); + } + + cerr << "Starting the server on port " << port << endl; + + serverThread->start(); + + // If we aren't running clients, just wait forever for external clients + + if (clientCount == 0) { + serverThread->join(); + } + } + + if (clientCount > 0) { + + Monitor monitor; + + size_t threadCount = 0; + + set<shared_ptr<Thread> > clientThreads; + + if (callName == "echoVoid") { loopType = T_VOID;} + else if (callName == "echoByte") { loopType = T_BYTE;} + else if (callName == "echoI32") { loopType = T_I32;} + else if (callName == "echoI64") { loopType = T_I64;} + else if (callName == "echoString") { loopType = T_STRING;} + else {throw invalid_argument("Unknown service call "+callName);} + + for (size_t ix = 0; ix < clientCount; ix++) { + + shared_ptr<TSocket> socket(new TSocket("127.0.01", port)); + shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048)); + shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket)); + shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol)); + + clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType)))); + } + + for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) { + (*thread)->start(); + } + + long long time00; + long long time01; + + {Synchronized s(monitor); + threadCount = clientCount; + + cerr << "Launch "<< clientCount << " client threads" << endl; + + time00 = Util::currentTime(); + + monitor.notifyAll(); + + while(threadCount > 0) { + monitor.wait(); + } + + time01 = Util::currentTime(); + } + + long long firstTime = 9223372036854775807LL; + long long lastTime = 0; + + double averageTime = 0; + long long minTime = 9223372036854775807LL; + long long maxTime = 0; + + for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) { + + shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable()); + + long long delta = client->_endTime - client->_startTime; + + assert(delta > 0); + + if (client->_startTime < firstTime) { + firstTime = client->_startTime; + } + + if (client->_endTime > lastTime) { + lastTime = client->_endTime; + } + + if (delta < minTime) { + minTime = delta; + } + + if (delta > maxTime) { + maxTime = delta; + } + + averageTime+= delta; + } + + averageTime /= clientCount; + + + cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl; + + count_map count = serviceHandler->getCount(); + count_map::iterator iter; + for (iter = count.begin(); iter != count.end(); ++iter) { + printf("%s => %d\n", iter->first, iter->second); + } + cerr << "done." << endl; + } + + return 0; +} diff --git a/test/cpp/src/nb-main.cpp b/test/cpp/src/nb-main.cpp new file mode 100644 index 000000000..8c74a815d --- /dev/null +++ b/test/cpp/src/nb-main.cpp @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <concurrency/ThreadManager.h> +#include <concurrency/PosixThreadFactory.h> +#include <concurrency/Monitor.h> +#include <concurrency/Util.h> +#include <concurrency/Mutex.h> +#include <protocol/TBinaryProtocol.h> +#include <server/TSimpleServer.h> +#include <server/TThreadPoolServer.h> +#include <server/TThreadedServer.h> +#include <server/TNonblockingServer.h> +#include <transport/TServerSocket.h> +#include <transport/TSocket.h> +#include <transport/TTransportUtils.h> +#include <transport/TFileTransport.h> +#include <TLogging.h> + +#include "Service.h" + +#include <unistd.h> +#include <boost/shared_ptr.hpp> + +#include <iostream> +#include <set> +#include <stdexcept> +#include <sstream> + +#include <map> +#include <ext/hash_map> +using __gnu_cxx::hash_map; +using __gnu_cxx::hash; + +using namespace std; +using namespace boost; + +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; +using namespace apache::thrift::concurrency; + +using namespace test::stress; + +struct eqstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) == 0; + } +}; + +struct ltstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) < 0; + } +}; + + +// typedef hash_map<const char*, int, hash<const char*>, eqstr> count_map; +typedef map<const char*, int, ltstr> count_map; + +class Server : public ServiceIf { + public: + Server() {} + + void count(const char* method) { + Guard m(lock_); + int ct = counts_[method]; + counts_[method] = ++ct; + } + + void echoVoid() { + count("echoVoid"); + // Sleep to simulate work + usleep(5000); + return; + } + + count_map getCount() { + Guard m(lock_); + return counts_; + } + + int8_t echoByte(const int8_t arg) {return arg;} + int32_t echoI32(const int32_t arg) {return arg;} + int64_t echoI64(const int64_t arg) {return arg;} + void echoString(string& out, const string &arg) { + if (arg != "hello") { + T_ERROR_ABORT("WRONG STRING!!!!"); + } + out = arg; + } + void echoList(vector<int8_t> &out, const vector<int8_t> &arg) { out = arg; } + void echoSet(set<int8_t> &out, const set<int8_t> &arg) { out = arg; } + void echoMap(map<int8_t, int8_t> &out, const map<int8_t, int8_t> &arg) { out = arg; } + +private: + count_map counts_; + Mutex lock_; + +}; + +class ClientThread: public Runnable { +public: + + ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) : + _transport(transport), + _client(client), + _monitor(monitor), + _workerCount(workerCount), + _loopCount(loopCount), + _loopType(loopType) + {} + + void run() { + + // Wait for all worker threads to start + + {Synchronized s(_monitor); + while(_workerCount == 0) { + _monitor.wait(); + } + } + + _startTime = Util::currentTime(); + + _transport->open(); + + switch(_loopType) { + case T_VOID: loopEchoVoid(); break; + case T_BYTE: loopEchoByte(); break; + case T_I32: loopEchoI32(); break; + case T_I64: loopEchoI64(); break; + case T_STRING: loopEchoString(); break; + default: cerr << "Unexpected loop type" << _loopType << endl; break; + } + + _endTime = Util::currentTime(); + + _transport->close(); + + _done = true; + + {Synchronized s(_monitor); + + _workerCount--; + + if (_workerCount == 0) { + + _monitor.notify(); + } + } + } + + void loopEchoVoid() { + for (size_t ix = 0; ix < _loopCount; ix++) { + _client->echoVoid(); + } + } + + void loopEchoByte() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int8_t arg = 1; + int8_t result; + result =_client->echoByte(arg); + assert(result == arg); + } + } + + void loopEchoI32() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int32_t arg = 1; + int32_t result; + result =_client->echoI32(arg); + assert(result == arg); + } + } + + void loopEchoI64() { + for (size_t ix = 0; ix < _loopCount; ix++) { + int64_t arg = 1; + int64_t result; + result =_client->echoI64(arg); + assert(result == arg); + } + } + + void loopEchoString() { + for (size_t ix = 0; ix < _loopCount; ix++) { + string arg = "hello"; + string result; + _client->echoString(result, arg); + assert(result == arg); + } + } + + shared_ptr<TTransport> _transport; + shared_ptr<ServiceClient> _client; + Monitor& _monitor; + size_t& _workerCount; + size_t _loopCount; + TType _loopType; + long long _startTime; + long long _endTime; + bool _done; + Monitor _sleep; +}; + + +int main(int argc, char **argv) { + + int port = 9091; + string serverType = "simple"; + string protocolType = "binary"; + size_t workerCount = 4; + size_t clientCount = 20; + size_t loopCount = 50000; + TType loopType = T_VOID; + string callName = "echoVoid"; + bool runServer = true; + bool logRequests = false; + string requestLogPath = "./requestlog.tlog"; + bool replayRequests = false; + + ostringstream usage; + + usage << + argv[0] << " [--port=<port number>] [--server] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--clients=<client-count>] [--loop=<loop-count>]" << endl << + "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl << + "\thelp Prints this help text." << endl << + "\tcall Service method to call. Default is " << callName << endl << + "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl << + "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl << + "\tserver Run the Thrift server in this process. Default is " << runServer << endl << + "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl << + "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << + "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl << + "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl << + "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; + + + map<string, string> args; + + for (int ix = 1; ix < argc; ix++) { + + string arg(argv[ix]); + + if (arg.compare(0,2, "--") == 0) { + + size_t end = arg.find_first_of("=", 2); + + string key = string(arg, 2, end - 2); + + if (end != string::npos) { + args[key] = string(arg, end + 1); + } else { + args[key] = "true"; + } + } else { + throw invalid_argument("Unexcepted command line token: "+arg); + } + } + + try { + + if (!args["clients"].empty()) { + clientCount = atoi(args["clients"].c_str()); + } + + if (!args["help"].empty()) { + cerr << usage.str(); + return 0; + } + + if (!args["loop"].empty()) { + loopCount = atoi(args["loop"].c_str()); + } + + if (!args["call"].empty()) { + callName = args["call"]; + } + + if (!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if (!args["server"].empty()) { + runServer = args["server"] == "true"; + } + + if (!args["log-request"].empty()) { + logRequests = args["log-request"] == "true"; + } + + if (!args["replay-request"].empty()) { + replayRequests = args["replay-request"] == "true"; + } + + if (!args["server-type"].empty()) { + serverType = args["server-type"]; + } + + if (!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + + } catch(exception& e) { + cerr << e.what() << endl; + cerr << usage; + } + + shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); + + // Dispatcher + shared_ptr<Server> serviceHandler(new Server()); + + if (replayRequests) { + shared_ptr<Server> serviceHandler(new Server()); + shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transports + shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + fileTransport->seekToEnd(); + + // Protocol Factory + shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + TFileProcessor fileProcessor(serviceProcessor, + protocolFactory, + fileTransport); + + fileProcessor.process(0, true); + exit(0); + } + + + if (runServer) { + + shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Protocol Factory + shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); + + // Transport Factory + shared_ptr<TTransportFactory> transportFactory; + + if (logRequests) { + // initialize the log file + shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + + transportFactory = + shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport)); + } + + shared_ptr<Thread> serverThread; + shared_ptr<Thread> serverThread2; + + if (serverType == "simple") { + + serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port))); + serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1))); + + } else if (serverType == "thread-pool") { + + shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); + + threadManager->threadFactory(threadFactory); + threadManager->start(); + serverThread = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager))); + serverThread2 = threadFactory->newThread(shared_ptr<TServer>(new TNonblockingServer(serviceProcessor, protocolFactory, port+1, threadManager))); + } + + cerr << "Starting the server on port " << port << " and " << (port + 1) << endl; + serverThread->start(); + serverThread2->start(); + + // If we aren't running clients, just wait forever for external clients + + if (clientCount == 0) { + serverThread->join(); + serverThread2->join(); + } + } + sleep(1); + + if (clientCount > 0) { + + Monitor monitor; + + size_t threadCount = 0; + + set<shared_ptr<Thread> > clientThreads; + + if (callName == "echoVoid") { loopType = T_VOID;} + else if (callName == "echoByte") { loopType = T_BYTE;} + else if (callName == "echoI32") { loopType = T_I32;} + else if (callName == "echoI64") { loopType = T_I64;} + else if (callName == "echoString") { loopType = T_STRING;} + else {throw invalid_argument("Unknown service call "+callName);} + + for (size_t ix = 0; ix < clientCount; ix++) { + + shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2))); + shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket)); + shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket)); + shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol)); + + clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType)))); + } + + for (std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) { + (*thread)->start(); + } + + long long time00; + long long time01; + + {Synchronized s(monitor); + threadCount = clientCount; + + cerr << "Launch "<< clientCount << " client threads" << endl; + + time00 = Util::currentTime(); + + monitor.notifyAll(); + + while(threadCount > 0) { + monitor.wait(); + } + + time01 = Util::currentTime(); + } + + long long firstTime = 9223372036854775807LL; + long long lastTime = 0; + + double averageTime = 0; + long long minTime = 9223372036854775807LL; + long long maxTime = 0; + + for (set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) { + + shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable()); + + long long delta = client->_endTime - client->_startTime; + + assert(delta > 0); + + if (client->_startTime < firstTime) { + firstTime = client->_startTime; + } + + if (client->_endTime > lastTime) { + lastTime = client->_endTime; + } + + if (delta < minTime) { + minTime = delta; + } + + if (delta > maxTime) { + maxTime = delta; + } + + averageTime+= delta; + } + + averageTime /= clientCount; + + + cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl; + + count_map count = serviceHandler->getCount(); + count_map::iterator iter; + for (iter = count.begin(); iter != count.end(); ++iter) { + printf("%s => %d\n", iter->first, iter->second); + } + cerr << "done." << endl; + } + + return 0; +} |