/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "gen-cpp/OneWayService.h" BOOST_AUTO_TEST_SUITE(OneWayHTTPTest) using namespace apache::thrift; using apache::thrift::protocol::TProtocol; using apache::thrift::protocol::TBinaryProtocol; using apache::thrift::protocol::TBinaryProtocolFactory; using apache::thrift::protocol::TJSONProtocol; using apache::thrift::protocol::TJSONProtocolFactory; using apache::thrift::server::TThreadedServer; using apache::thrift::server::TServerEventHandler; using apache::thrift::transport::TTransport; using apache::thrift::transport::THttpServer; using apache::thrift::transport::THttpServerTransportFactory; using apache::thrift::transport::THttpClient; using apache::thrift::transport::TBufferedTransport; using apache::thrift::transport::TBufferedTransportFactory; using apache::thrift::transport::TMemoryBuffer; using apache::thrift::transport::TServerSocket; using apache::thrift::transport::TSocket; using apache::thrift::transport::TTransportException; using std::shared_ptr; using std::cout; using std::cerr; using std::endl; using std::string; namespace utf = 
boost::unit_test;

// NOTE(review): in this copy of the file the angle-bracket contents (the
// #include targets and the template arguments of shared_ptr / make_shared /
// static_cast) appear to be missing. Confirm against the upstream file before
// building; the code below is kept exactly as found.

// ENABLE_STDERR_LOGGING is a preprocessor macro (not an environment variable).
// Change the #undef below to a #define to enable some stderr logging (in case
// you need to debug).
#undef ENABLE_STDERR_LOGGING

// Service handler used by the test. Both RPC methods do nothing except
// (when ENABLE_STDERR_LOGGING is defined) print their own name to stderr.
class OneWayServiceHandler : public onewaytest::OneWayServiceIf {
public:
  OneWayServiceHandler() = default;

  void roundTripRPC() override {
#ifdef ENABLE_STDERR_LOGGING
    cerr << "roundTripRPC()" << endl;
#endif
  }
  void oneWayRPC() override {
#ifdef ENABLE_STDERR_LOGGING
    cerr << "oneWayRPC()" << std::endl ;
#endif
  }
};

// Handler factory: getHandler() allocates a fresh OneWayServiceHandler with
// `new` on every call and releaseHandler() destroys it with `delete`.
class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory {
public:
  ~OneWayServiceCloneFactory() override = default;

  // connInfo is intentionally unused; the (void) cast silences the warning.
  onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override {
    (void)connInfo ;
    return new OneWayServiceHandler;
  }
  // Counterpart of getHandler(): frees a handler that getHandler() allocated.
  void releaseHandler( onewaytest::OneWayServiceIf* handler) override {
    delete handler;
  }
};

// Small wrapper whose Run() calls serve() on the referenced server. The test
// below passes &RPC0ThreadClass::Run to boost::thread as the thread entry.
// Only a reference is stored, so the server must outlive this object.
// NOTE(review): the single-argument constructor is not `explicit`.
class RPC0ThreadClass {
public:
  RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor
  ~RPC0ThreadClass() = default; // Destructor
  void Run() { server_.serve() ; }
  TThreadedServer& server_ ;
} ;

using apache::thrift::concurrency::Monitor;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Synchronized;

// copied from IntegrationTest
//
// Event handler that is also a Monitor, so other threads can wait() on it.
// preServe() records that it has been called and createContext() counts its
// calls; both take the monitor lock and notify() a waiter.
// Presumably the server calls preServe() once it is ready to accept and
// createContext() once per accepted connection -- confirm against the
// TServerEventHandler documentation.
class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
public:
  TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
  ~TServerReadyEventHandler() override = default;

  // Sets isListening_ under the monitor lock and wakes a waiter.
  void preServe() override {
    Synchronized sync(*this);
    isListening_ = true;
    notify();
  }
  // Increments accepted_ under the monitor lock and wakes a waiter.
  // input/output are unused; always returns nullptr.
  void* createContext(shared_ptr input, shared_ptr output) override {
    Synchronized sync(*this);
    ++accepted_;
    notify();

    (void)input;
    (void)output;

    return nullptr;
  }
  // The two accessors below do not take the lock themselves; the test case
  // calls isListening() while holding a Synchronized guard on this object.
  bool isListening() const { return isListening_; }
  uint64_t acceptedCount() const { return accepted_; }

private:
  bool isListening_;   // set to true by preServe()
  uint64_t accepted_;  // number of createContext() calls so far
};

// TBufferedTransport whose flush() can be switched off. While blocked_ is
// true, flush() returns without calling the base-class flush(), which lets the
// test inspect what has accumulated in the write buffer.
// The base class is constructed with 10240 as its second argument
// (presumably the buffer size -- confirm).
// NOTE(review): the single-argument constructor is not `explicit`.
class TBlockableBufferedTransport : public TBufferedTransport {
public:
  TBlockableBufferedTransport(std::shared_ptr transport)
    : TBufferedTransport(transport, 10240),
    blocked_(false) {
  }

  // Returns the offset of wBase_ from the start of wBuf_ (both are inherited
  // members); presumably the number of bytes written but not yet flushed.
  uint32_t write_buffer_length() {
    auto have_bytes = static_cast(wBase_ - wBuf_.get());
    return have_bytes ;
  }

  // Make subsequent flush() calls no-ops.
  void block() {
    blocked_ = true ;
#ifdef ENABLE_STDERR_LOGGING
    cerr << "block flushing\n" ;
#endif
  }
  // Re-enable flush(). This does not flush by itself; with logging enabled it
  // dumps the current write-buffer contents to stderr.
  void unblock() {
    blocked_ = false ;
#ifdef ENABLE_STDERR_LOGGING
    cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ;
#endif
  }

  // Forwards to TBufferedTransport::flush() unless flushing is blocked.
  void flush() override {
    if (blocked_) {
#ifdef ENABLE_STDERR_LOGGING
      cerr << "flush was blocked\n" ;
#endif
      return ;
    }
    TBufferedTransport::flush() ;
  }

  bool blocked_ ;  // true while flush() is suppressed
} ;

// Starts a TThreadedServer on a background thread, connects to it with a JSON
// protocol over THttpClient on top of a TBlockableBufferedTransport, and
// checks that two consecutive one-way sends each add the same number of bytes
// to the (blocked) write buffer, and that a following round-trip call
// completes without a TTransportException.
BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP )
{
  // 0 is presumably "let the OS choose a port"; the port actually in use is
  // read back through getPort() once the server is listening.
  std::shared_ptr ss = std::make_shared(0) ;
  TThreadedServer server(
    std::make_shared(std::make_shared()),
    ss, // server socket; its port is queried below
    std::make_shared(),
    std::make_shared());

  std::shared_ptr pEventHandler(new TServerReadyEventHandler) ;
  server.setServerEventHandler(pEventHandler);

#ifdef ENABLE_STDERR_LOGGING
  cerr << "Starting the server...\n";
#endif
  // Run server.serve() on its own thread; `t` and `server` must stay alive
  // until the thread is joined at the end of the test.
  RPC0ThreadClass t(server) ;
  boost::thread thread(&RPC0ThreadClass::Run, &t);

  {
    // Wait on the handler's monitor until preServe() has set the flag.
    Synchronized sync(*(pEventHandler.get()));
    while (!pEventHandler->isListening()) {
      pEventHandler->wait();
    }
  }

  int port = ss->getPort() ;
#ifdef ENABLE_STDERR_LOGGING
  cerr << "port " << port << endl ;
#endif

  {
    std::shared_ptr socket(new TSocket("localhost", port));
    socket->setRecvTimeout(10000) ; // 10000 msec should be enough
    std::shared_ptr blockable_transport(new TBlockableBufferedTransport(socket));
    std::shared_ptr transport(new THttpClient(blockable_transport, "localhost", "/service"));
    std::shared_ptr protocol(new TJSONProtocol(transport));
    onewaytest::OneWayServiceClient client(protocol);

    transport->open();
    // Plain round trip first, with flushing enabled.
    client.roundTripRPC();

    // From here on flush() is a no-op, so whatever the client sends stays in
    // the write buffer and its length can be sampled between calls.
    blockable_transport->block() ;
    uint32_t size0 = blockable_transport->write_buffer_length() ;
    client.send_oneWayRPC() ;
    uint32_t size1 = blockable_transport->write_buffer_length() ;
    client.send_oneWayRPC() ;
    uint32_t size2 = blockable_transport->write_buffer_length() ;
    // Both one-way sends must have grown the buffer by the same amount.
    BOOST_CHECK((size1 - size0) == (size2 - size1)) ;

    // Re-enable flushing, then push everything out together with a
    // round-trip request and wait for its reply.
    blockable_transport->unblock() ;
    client.send_roundTripRPC() ;
    blockable_transport->flush() ;
    try {
      client.recv_roundTripRPC() ;
    } catch (const TTransportException &e) {
      BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ;
    }
    transport->close();
  }
  // NOTE(review): only TTransportException from recv_roundTripRPC() is caught
  // above; any other exception thrown inside the block would skip the
  // stop()/join() below.
  server.stop();
  thread.join() ;
#ifdef ENABLE_STDERR_LOGGING
  cerr << "finished.\n";
#endif
}

BOOST_AUTO_TEST_SUITE_END()