summaryrefslogtreecommitdiff
path: root/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/NDDS_Latency_Sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/NDDS_Latency_Sender.cpp')
-rw-r--r--CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/NDDS_Latency_Sender.cpp715
1 files changed, 0 insertions, 715 deletions
diff --git a/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/NDDS_Latency_Sender.cpp b/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/NDDS_Latency_Sender.cpp
deleted file mode 100644
index 752f1971d50..00000000000
--- a/CIAO/connectors/dds4ccm/performance-tests/DDSLatency/DDS_Sender/NDDS_Latency_Sender.cpp
+++ /dev/null
@@ -1,715 +0,0 @@
-#include "ace/Get_Opt.h"
-#include "tao/ORB_Core.h"
-#include "ace/Env_Value_T.h"
-#include "Latency_Base.h"
-#include "Latency_BaseSupport.h"
-#include "Latency_BasePlugin.h"
-#include "ace/Tokenizer_T.h"
-#include "ace/OS_NS_math.h"
-
-#include <ndds/ndds_namespace_cpp.h>
-#include <ndds/ndds_cpp.h>
-#include <ndds/clock/clock_highResolution.h>
-
-// Global variables
-CORBA::UShort iterations_ = 1000;
-CORBA::UShort datalen_ = 100;
-CORBA::UShort datalen_idx_ = 0;
-CORBA::UShort nr_of_runs_ = 10;
-CORBA::UShort sleep_ = 2;
-ACE_UINT64 tv_total_ = 0;
-ACE_UINT64 tv_max_ = 0;
-ACE_UINT64 tv_min_ = 0;
-CORBA::UShort count_ = 0;
-CORBA::UShort number_of_msg_ = 0;
-bool received_ = false;
-CORBA::Long seq_num_ = 0;
-CORBA::Double sigma_duration_squared_;
-struct RTINtpTime start_time_;
-
-ACE_UINT64 * duration_times_ = 0;
-CORBA::Short * datalen_range_ = 0;
-ACE_UINT64 clock_overhead_;
-RTIClock *timer = 0;
-LatencyTest * instance_ = 0;
-
-ACE_UINT64 unexpected_count_ = 0;
-
-LatencyTestDataWriter * test_data_writer_ = 0;
-
-const char * lib_name_ = 0;
-const char * prof_name_ = 0;
-
-CORBA::UShort domain_id_ = 0;
-CORBA::Boolean both_read_write_ = false;
-
-/* The listener of events and data from the middleware */
-class HelloListener: public DDSDataReaderListener
-{
-public:
- void on_data_available(DDSDataReader *reader);
-};
-
-/* The dummy listener of events and data from the middleware */
-class DummyListener: public DDSDataReaderListener
-{
-};
-
-// since this one is also created in the dds4ccm wrapper, we need
-// to create one here as well.
-class DummyPublisherListener :
- public DDSPublisherListener
-{
-public:
- virtual void on_offered_deadline_missed (
- DDSDataWriter* writer,
- const DDS_OfferedDeadlineMissedStatus& status);
- virtual void on_liveliness_lost(
- DDSDataWriter* writer,
- const DDS_LivelinessLostStatus& status);
- virtual void on_offered_incompatible_qos(
- DDSDataWriter* writer,
- const DDS_OfferedIncompatibleQosStatus& status);
- virtual void on_publication_matched(
- DDSDataWriter* writer,
- const DDS_PublicationMatchedStatus& status);
- virtual void on_reliable_writer_cache_changed(
- DDSDataWriter* writer,
- const DDS_ReliableWriterCacheChangedStatus& status);
- virtual void on_reliable_reader_activity_changed (
- DDSDataWriter* writer,
- const DDS_ReliableReaderActivityChangedStatus& status);
-};
-
-void DummyPublisherListener::on_offered_deadline_missed (
- DDSDataWriter* /*writer*/,
- const DDS_OfferedDeadlineMissedStatus& /*status*/)
-{
- ACE_DEBUG ((LM_DEBUG, "on_offered_deadline_missed\n"));
- ++unexpected_count_;
-}
-
-void DummyPublisherListener::on_liveliness_lost(
- DDSDataWriter* /*writer*/,
- const DDS_LivelinessLostStatus& /*status*/)
-{
- ++unexpected_count_;
-}
-
-void DummyPublisherListener::on_offered_incompatible_qos(
- DDSDataWriter* /*writer*/,
- const DDS_OfferedIncompatibleQosStatus& /*status*/)
-{
- ++unexpected_count_;
-}
-
-void DummyPublisherListener::on_publication_matched(
- DDSDataWriter* /*writer*/,
- const DDS_PublicationMatchedStatus& /*status*/)
-{
- ++unexpected_count_;
-}
-
-void DummyPublisherListener::on_reliable_writer_cache_changed(
- DDSDataWriter* /*writer*/,
- const DDS_ReliableWriterCacheChangedStatus& /*status*/)
-{
- ++unexpected_count_;
-}
-
-void DummyPublisherListener::on_reliable_reader_activity_changed (
- DDSDataWriter* /*writer*/,
- const DDS_ReliableReaderActivityChangedStatus& /*status*/)
-{
- ++unexpected_count_;
-}
-
-void
-split_qos (const char * qos)
-{
- char* buf = ACE_OS::strdup (qos);
- ACE_Tokenizer_T<char> tok (buf);
- tok.delimiter_replace ('#', 0);
- for (char *p = tok.next (); p; p = tok.next ())
- {
- if (!lib_name_)
- {
- lib_name_ = ACE_OS::strdup (p);
- }
- else if (!prof_name_)
- {
- prof_name_ = ACE_OS::strdup (p);
- }
- }
- ACE_OS::free (buf);
- ACE_DEBUG ((LM_DEBUG, "Sender : Found QoS profile %C %C\n",
- lib_name_,
- prof_name_));
-}
-
-int
-parse_args (int argc, ACE_TCHAR *argv[])
-{
- ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("b:d:i:s:q:O"));
- int c;
-
- while ((c = get_opts ()) != -1)
- {
- switch (c)
- {
- case 'd':
- domain_id_ = ACE_OS::atoi (get_opts.opt_arg ());
- break;
- case 'i':
- iterations_ = ACE_OS::atoi (get_opts.opt_arg ());
- break;
- case 's':
- sleep_ = ACE_OS::atoi (get_opts.opt_arg ());
- break;
- case 'b':
- both_read_write_ = true;
- break;
- case 'q':
- {
- const char * qos = get_opts.opt_arg ();
- split_qos (qos);
- }
- break;
- case '?':
- default:
- ACE_ERROR_RETURN ((LM_ERROR,
- "usage:\n\n"
- " -d <domain_id >\n"
- " -i <iterations >\n"
- " -s <sleep>\n"
- " -q <QoS profile>\n"
- " -b "
- "\n"),
- -1);
- }
- }
- // Indicates successful parsing of the command line
- return 0;
-}
-
-void
-calculate_clock_overhead (void)
-{
- int num_of_loops_clock = 320;
- struct RTINtpTime begin_time = RTI_NTP_TIME_ZERO;
- struct RTINtpTime clock_roundtrip_time = RTI_NTP_TIME_ZERO;
-
- timer->getTime(timer, &begin_time);
- for (int i = 0; i < num_of_loops_clock; ++i) {
- timer->getTime(timer, &clock_roundtrip_time);
- }
- RTINtpTime_decrement(clock_roundtrip_time, begin_time);
- clock_overhead_ = (ACE_UINT64)(1E6 * RTINtpTime_toDouble(&clock_roundtrip_time) /
- (double)num_of_loops_clock);
-}
-
-void
-init_values (void)
-{
- delete [] duration_times_;
- duration_times_ = new ACE_UINT64[iterations_];
- datalen_range_ = new CORBA::Short[nr_of_runs_];
- int start = 16;
- for(int i = 0; i < nr_of_runs_; i++)
- {
- datalen_range_[i] = start;
- start = 2 * start;
- }
-
- datalen_ = datalen_range_[0];
-
- // make instances of Topic
- instance_->seq_num = 0;
- instance_->data.length (datalen_);
- calculate_clock_overhead ();
-}
-
-void
-record_time (struct RTINtpTime& receive_time)
-{
- ++count_;
- RTINtpTime roundtrip = {0,0};
- RTINtpTime_subtract(roundtrip, receive_time, start_time_);
- ACE_UINT64 duration =
- (ACE_UINT64)(1E6 * RTINtpTime_toDouble(&roundtrip));// - _clock_overhead;
-
- if (count_ > iterations_)
- {
- ACE_ERROR ((LM_ERROR, "ERROR: Internal error while getting more "
- "messages back as expected.\n"));
- }
- else
- {
- duration_times_[count_-1] = duration;
- sigma_duration_squared_ += (double)duration * (double)duration;
- tv_total_ += duration;
- if (duration > tv_max_ || (tv_max_ == 0L))
- {
- tv_max_ = duration;
- }
- if (duration < tv_min_ || (tv_min_ == 0L))
- {
- tv_min_ = duration;
- }
- }
-}
-
-void
-reset_results (void)
-{
- count_ = 0;
- delete [] duration_times_;
- duration_times_ = new ACE_UINT64[iterations_];
- tv_total_ = 0L;
- tv_max_ = 0L;
- tv_min_ = 0L;
- number_of_msg_ = 0;
- received_ = false;
- seq_num_ = 0;
- sigma_duration_squared_ = 0;
-}
-
-static int compare_two_longs (const void * long1, const void * long2)
-{
- return (int)((*(ACE_UINT64*)long1 - *(ACE_UINT64*)long2));
-}
-
-void
-calc_results()
-{
- // Sort all duration times.
- qsort(duration_times_,
- count_,
- sizeof(ACE_UINT64),
- compare_two_longs);
-
- // Show latency_50_percentile, latency_90_percentile,
- // latency_99_percentile and latency_99.99_percentile.
- // For example duration_times[per50] is the median i.e. 50% of the
- // samples have a latency time <= duration_times[per50]
- int per50 = count_/2;
- int per90 = (int)(count_ * 0.90);
- int per99 = (int)(count_ * 0.990);
- int per9999 = (int)(count_ * 0.9999);
-
- double avg = 0;
- double roundtrip_time_std = 0;
- if (count_ > 0)
- {
- avg = (double)(tv_total_ / count_);
- // Calculate standard deviation.
- roundtrip_time_std = sqrt(
- (sigma_duration_squared_ / (double)count_) -
- (avg * avg));
- }
-
- // Show values as float, in order to be comparable with RTI performance test.
- if (count_ > 0)
- {
- if (datalen_idx_ == 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "Collecting statistics on %d samples per message size.\n"
- "This is the roundtrip time, *not* the one-way-latency\n"
- "Clock overhead %d\n"
- "bytes ,stdev us,ave us, min us, 50%% us, 90%% us, 99%% us, 99.99%%,"
- " max us\n"
- "------,-------,-------,-------,-------,-------,-------,-------,"
- "-------\n", count_, clock_overhead_));
- }
- ACE_DEBUG ((LM_DEBUG,
- "%6d,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f,%7.1f\n",
- datalen_,
- roundtrip_time_std,
- avg,
- (double)tv_min_,
- (double)duration_times_[per50-1],
- (double)duration_times_[per90-1],
- (double)duration_times_[per99-1],
- (double)duration_times_[per9999-1],
- (double)tv_max_));
- }
- else
- {
- ACE_ERROR ((LM_ERROR, "SUMMARY SENDER latency time:\n "
- "No samples reveived back.\n"));
- }
-}
-
-
-bool
-write_one (void)
-{
- // First message sent always, next messages only as previous sent message
- // is received back.
- if ((number_of_msg_ == 0) || received_)
- {
- // All messages send, stop timer.
- if ((iterations_ != 0) &&
- (number_of_msg_ >= iterations_ ))
- {
- if (datalen_idx_ >= (nr_of_runs_ - 1))
- {
- calc_results();
- return true;
- }
- else
- {
- calc_results();
- reset_results();
- ++datalen_idx_;
- datalen_ = datalen_range_[datalen_idx_];
- instance_->data.length (datalen_);
- }
- }
- else
- {
- try
- {
- instance_->seq_num = number_of_msg_;
- // Keep last sent seq_num, to control if message is sent back.
- seq_num_ = number_of_msg_;
- received_ = false;
- RTINtpTime_setZero(&start_time_);
- timer->getTime(timer, &start_time_);
- test_data_writer_->write (*instance_, DDS_HANDLE_NIL);
- }
- catch (const CORBA::Exception& )
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Internal Error ")
- ACE_TEXT ("while writing sample with sequence_number <%u>.\n"),
- instance_->seq_num));
- }
- ++number_of_msg_;
- }
- }
- return false;
-}
-
-void start (void)
-{
- unsigned int sec = sleep_/1000;
- unsigned int usec = (sleep_ % 1000) * 1000;
-
- while (!write_one())
- {
- ACE_Time_Value sleeptime (sec, usec);
- ACE_OS::sleep (sleeptime);
- }
-}
-
-void
-read (LatencyTest & an_instance, struct RTINtpTime& receive_time)
-{
- if (an_instance.seq_num == seq_num_)
- {
- record_time (receive_time);
- received_ = true;
- }
-}
-
-int ACE_TMAIN(int argc, ACE_TCHAR* argv[])
-{
- timer = RTIHighResolutionClock_new();
- DDS_ReturnCode_t retcode;
- ::DDS::DataReader * data_reader = 0;
- ::DDS::DataReader * dum_data_reader = 0;
-
- HelloListener listener;
- DummyListener dum_listener;
- const char * type_name = 0;
- int main_result = 1; /* error by default */
-
- ::DDS::Topic * receive_topic = 0;
- ::DDS::Topic * send_topic = 0;
- ::DDS::DataWriter * data_writer = 0;
- ::DDS::DataWriter * dum_data_writer = 0;
- DummyPublisherListener * pub_listener = 0;
- ::DDS::Publisher * pub = 0;
-
- try
- {
- ACE_Env_Value<int> id (ACE_TEXT("DDS4CCM_DEFAULT_DOMAIN_ID"), domain_id_);
- domain_id_ = id;
-
- if (parse_args (argc, argv) != 0)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("Error arguments.\n")));
- return 1;
- }
-
- /* Create the domain participant */
- DDSDomainParticipant * participant =
- DDSDomainParticipantFactory::get_instance()->
- create_participant_with_profile(
- domain_id_,
- lib_name_,
- prof_name_,
- 0,
- DDS_STATUS_MASK_NONE);
- if (!participant)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("Sender : Unable to create domain participant.\n")));
- goto clean_exit;
- }
-
- /* Register type before creating topic */
- type_name = LatencyTestTypeSupport::get_type_name();
- retcode = LatencyTestTypeSupport::register_type (participant,
- type_name);
- if (retcode != DDS_RETCODE_OK)
- {
- goto clean_exit;
- }
-
- send_topic = participant->create_topic_with_profile (
- "send",
- type_name,
- lib_name_,
- prof_name_,
- 0,
- DDS_STATUS_MASK_NONE);
- if (!send_topic)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
- goto clean_exit;
- }
-
- receive_topic = participant->create_topic_with_profile (
- "receive",
- type_name,
- lib_name_,
- prof_name_,
- 0,
- DDS_STATUS_MASK_NONE);
- if (!receive_topic)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create topic.\n")));
- goto clean_exit;
- }
-
- pub_listener = new DummyPublisherListener ();
- pub = participant->create_publisher_with_profile (
- lib_name_,
- prof_name_,
- 0,
- DDS_STATUS_MASK_NONE);
-
- if (!pub)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create publisher.\n")));
- goto clean_exit;
- }
-
- /* Create the data writer using the publisher */
- data_writer = pub->create_datawriter_with_profile(
- send_topic,
- lib_name_,
- prof_name_,
- pub_listener,
- DDS_OFFERED_DEADLINE_MISSED_STATUS |
- DDS_OFFERED_INCOMPATIBLE_QOS_STATUS |
- DDS_RELIABLE_WRITER_CACHE_CHANGED_STATUS |
- DDS_RELIABLE_READER_ACTIVITY_CHANGED_STATUS |
- DDS_LIVELINESS_LOST_STATUS |
- DDS_PUBLICATION_MATCHED_STATUS);
- if (!data_writer)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data writer.\n")));
- goto clean_exit;
- }
-
- /* Create a data reader, which will not be used, but is there for
- * compatibility with DDS4CCM latency test, where there is always a
- * reader and a writer per connector.
- */
- if (both_read_write_)
- {
- dum_data_reader = participant->create_datareader_with_profile(
- send_topic,
- lib_name_,
- prof_name_,
- &dum_listener,
- DDS_DATA_AVAILABLE_STATUS);
-
- if (!dum_data_reader )
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create dummy data reader.\n")));
- goto clean_exit;
- }
- }
-
- data_reader = participant->create_datareader_with_profile(
- receive_topic,
- lib_name_,
- prof_name_,
- &listener,
- DDS_DATA_AVAILABLE_STATUS);
- if (!data_reader)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data reader.\n")));
- goto clean_exit;
- }
-
- /* Create a data writer, which will not be used, but is there for
- * compatibility with DDS4CCM latency test, where there is always a
- * reader and a writer per connector
- */
- if (both_read_write_)
- {
- dum_data_writer = participant->create_datawriter_with_profile(
- receive_topic,
- lib_name_,
- prof_name_,
- 0,
- DDS_STATUS_MASK_NONE);
- if (!dum_data_writer)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create dummy data writer.\n")));
- goto clean_exit;
- }
- }
-
- /* Create data sample for writing */
- instance_ = LatencyTestTypeSupport::create_data ();
- if (instance_ == 0)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Unable to create data sample.\n")));
- goto clean_exit;
- }
-
- init_values();
-
- test_data_writer_ = LatencyTestDataWriter::narrow (data_writer);
- if (!test_data_writer_)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("LatencyTestDataWriter_narrow failed.\n")));
- goto clean_exit;
- }
-
- // Sleep a couple seconds to allow discovery to happen
- ACE_OS::sleep (5);
-
- // handle writing of messages
- start();
-
- /* --- Clean Up --- */
- ACE_OS::sleep (5);
- main_result = 0;
-
- clean_exit:
- const char * read_write_str = 0;
- if (both_read_write_)
- {
- read_write_str = "Used a extra dummy reader and writer per topic.";
- }
- else
- {
- read_write_str = "Used a reader for one topic and a writer for other topic.";
- }
-
- if((nr_of_runs_ -1) != datalen_idx_)
- {
- ACE_DEBUG ((LM_DEBUG, "SUMMARY SENDER : %u of %u runs completed.\n"
- " Number of messages sent of last run (%u): %u\n"
- "%C\n\n",
- datalen_idx_,
- nr_of_runs_,
- datalen_idx_ + 1,
- number_of_msg_,
- read_write_str));
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG, "TEST successful, number of runs (%u) of "
- "%u messages.\n"
- "%C\n\n",
- nr_of_runs_,
- number_of_msg_,
- read_write_str));
- }
- ACE_DEBUG ((LM_DEBUG, "\tNumber of unexpected events : %u\n",
- unexpected_count_));
- if (participant)
- {
- retcode = participant->delete_contained_entities ();
- if (retcode != DDS_RETCODE_OK)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
- main_result = 1;
- }
- retcode = DDSDomainParticipantFactory::get_instance()->
- delete_participant (participant);
- if (retcode != DDS_RETCODE_OK)
- {
- ACE_ERROR ((LM_ERROR, ACE_TEXT ("Deletion failed.\n")));
- main_result = 1;
- }
- }
- }
- catch (const ::CORBA::Exception &ex)
- {
- ex._tao_print_exception("ERROR : Unexpected CORBA exception caught :");
- main_result = 1;
- }
-
- delete [] datalen_range_;
- delete [] duration_times_;
- delete pub_listener;
-
- return main_result;
-}
-
-void HelloListener::on_data_available(DDSDataReader *reader)
-{
- LatencyTestDataReader * test_reader =
- LatencyTestDataReader::narrow (reader);
- if (!test_reader)
- {
- /* In this specific case, this will never fail */
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("LatencyTestDataReader::narrow failed.\n")));
- return;
- }
-
- /* Loop until there are messages available in the queue */
- for(;;)
- {
- ::DDS::SampleInfoSeq info;
- ::LatencyTestRTISeq sample_req;
- ::DDS::ReturnCode_t const retcode = test_reader->take(sample_req, info);
- if (retcode == DDS_RETCODE_NO_DATA)
- {
- /* No more samples */
- break;
- }
- else if (retcode != DDS_RETCODE_OK)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("Unable to take data from data reader,"
- " error %d.\n"),
- retcode));
- return;
- }
- for (::DDS_Long i = 0; i < sample_req.length (); ++i)
- {
- if (info[i].valid_data)
- {
- struct RTINtpTime finish_time;
- RTINtpTime_setZero(&finish_time);
- timer->getTime(timer, &finish_time);
- read(sample_req[i], finish_time);
- }
- }
- (void) test_reader->return_loan (sample_req, info);
- }
-}
-