summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLassi Marttala <Lassi.LM.Marttala@partner.bmw.de>2012-10-04 09:58:29 +0200
committerLassi Marttala <Lassi.LM.Marttala@partner.bmw.de>2012-10-09 13:54:19 +0200
commit20bcfb6c88769bfa38e5bc8389949b8563934882 (patch)
treed42730c90755d5e7a3244952d7f14ad3780741fe
parent2ebaa24eb8ead664e78395c9b552c2ca69c70f45 (diff)
downloadDLT-daemon-20bcfb6c88769bfa38e5bc8389949b8563934882.tar.gz
[GDLT-137]: Automated resending: Improve init. Use everywhere.
This is a combination of 2 commits. Rest of the commit messages below. [GDLT-137]: Add resending to all user library places which use buffer. [GDLT-137]: Delay mq opening. Make it thread safe.
-rw-r--r--src/lib/dlt_user.c208
-rwxr-xr-xsrc/tests/dlt-test-client.c1
2 files changed, 156 insertions, 53 deletions
diff --git a/src/lib/dlt_user.c b/src/lib/dlt_user.c
index 0edea8c..0681bd6 100644
--- a/src/lib/dlt_user.c
+++ b/src/lib/dlt_user.c
@@ -101,8 +101,13 @@ static pthread_attr_t dlt_receiverthread_attr;
/* Segmented Network Trace */
#define DLT_MAX_TRACE_SEGMENT_SIZE 1024
#define DLT_MESSAGE_QUEUE_NAME "/dlt_message_queue"
-#define DLT_DELAYED_RESEND_INDICATOR_PATTERN 0xFFFFFFFF
+#define DLT_DELAYED_RESEND_INDICATOR_PATTERN 0xFFFF
+/* Mutex to wait on while message queue is not initialized */
+pthread_mutex_t mq_mutex;
+pthread_cond_t mq_init_condition;
+
+/* Structure to pass data to segmented thread */
typedef struct {
DltContext *handle;
uint16_t id;
@@ -129,6 +134,7 @@ static int dlt_user_log_check_user_message(void);
static void dlt_user_log_reattach_to_daemon(void);
static int dlt_user_log_send_overflow(void);
static void dlt_user_trace_network_segmented_thread(void *unused);
+static int dlt_user_queue_resend(void);
int dlt_user_check_library_version(const char *user_major_version,const char *user_minor_version){
@@ -240,35 +246,15 @@ int dlt_init(void)
dlt_log(LOG_WARNING, "Can't destroy thread attributes!\n");
}
- /* Generate per process name for queue */
- char queue_name[NAME_MAX];
- sprintf(queue_name, "%s.%d", DLT_MESSAGE_QUEUE_NAME, getpid());
-
- /* Maximum queue size is 10, limit to size of pointers */
- struct mq_attr mqatr;
- mqatr.mq_flags = 0;
- mqatr.mq_maxmsg = 10;
- mqatr.mq_msgsize = sizeof(s_segmented_data *);
- mqatr.mq_curmsgs = 0;
-
- /* Separate handles for reading and writing */
- dlt_user.dlt_segmented_queue_read_handle = mq_open(queue_name, O_CREAT| O_RDONLY,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, &mqatr);
- if(dlt_user.dlt_segmented_queue_read_handle < 0)
- {
- dlt_log(LOG_CRIT, "Can't create message queue read handle!\n");
- dlt_log(LOG_CRIT, strerror(errno));
- return -1;
- }
+ /* These will be lazy initialized only when needed */
+ dlt_user.dlt_segmented_queue_read_handle = -1;
+ dlt_user.dlt_segmented_queue_write_handle = -1;
- dlt_user.dlt_segmented_queue_write_handle = mq_open(queue_name, O_WRONLY);
- if(dlt_user.dlt_segmented_queue_write_handle < 0)
- {
- dlt_log(LOG_CRIT, "Can't open message queue write handle!\n");
- dlt_log(LOG_CRIT, strerror(errno));
- return -1;
- }
+ /* Wait mutext for segmented thread */
+ pthread_mutex_init(&mq_mutex, NULL);
+ pthread_cond_init(&mq_init_condition, NULL);
+ /* Start the segmented thread */
if(pthread_create(&(dlt_user.dlt_segmented_nwt_handle), NULL,
(void *)dlt_user_trace_network_segmented_thread, NULL))
{
@@ -304,6 +290,73 @@ int dlt_init_file(const char *name)
return 0;
}
+int dlt_init_message_queue(void)
+{
+ pthread_mutex_lock(&mq_mutex);
+ if(dlt_user.dlt_segmented_queue_read_handle >= 0 &&
+ dlt_user.dlt_segmented_queue_write_handle >= 0)
+ {
+ // Already intialized
+ pthread_mutex_unlock(&mq_mutex);
+ return 0;
+ }
+
+ /* Generate per process name for queue */
+ char queue_name[NAME_MAX];
+ sprintf(queue_name, "%s.%d", DLT_MESSAGE_QUEUE_NAME, getpid());
+
+ /* Maximum queue size is 10, limit to size of pointers */
+ struct mq_attr mqatr;
+ mqatr.mq_flags = 0;
+ mqatr.mq_maxmsg = 10;
+ mqatr.mq_msgsize = sizeof(s_segmented_data *);
+ mqatr.mq_curmsgs = 0;
+
+ /**
+ * Create the message queue. It must be newly created
+ * if old one was left by a crashing process.
+ * */
+ dlt_user.dlt_segmented_queue_read_handle = mq_open(queue_name, O_CREAT| O_RDONLY | O_EXCL,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, &mqatr);
+ if(dlt_user.dlt_segmented_queue_read_handle < 0)
+ {
+ if(errno == EEXIST)
+ {
+ dlt_log(LOG_WARNING, "Old message queue exists, trying to delete.\n");
+ if(mq_unlink(queue_name) < 0)
+ {
+ dlt_log(LOG_CRIT, "Could not delete existing message queue!\n");
+ dlt_log(LOG_CRIT, strerror(errno));
+ }
+ else // Retry
+ {
+ dlt_user.dlt_segmented_queue_read_handle = mq_open(queue_name, O_CREAT| O_RDONLY | O_EXCL,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH, &mqatr);
+ }
+ }
+ if(dlt_user.dlt_segmented_queue_read_handle < 0)
+ {
+ dlt_log(LOG_CRIT, "Can't create message queue read handle!\n");
+ dlt_log(LOG_CRIT, strerror(errno));
+ pthread_mutex_unlock(&mq_mutex);
+ return -1;
+ }
+ }
+
+ dlt_user.dlt_segmented_queue_write_handle = mq_open(queue_name, O_WRONLY);
+ if(dlt_user.dlt_segmented_queue_write_handle < 0)
+ {
+ dlt_log(LOG_CRIT, "Can't open message queue write handle!\n");
+ dlt_log(LOG_CRIT, strerror(errno));
+ pthread_mutex_unlock(&mq_mutex);
+ return -1;
+ }
+
+ pthread_cond_signal(&mq_init_condition);
+ pthread_mutex_unlock(&mq_mutex);
+ return 0;
+}
+
int dlt_init_common(void)
{
char *env_local_print;
@@ -491,18 +544,13 @@ int dlt_free(void)
char queue_name[NAME_MAX];
sprintf(queue_name, "%s.%d", DLT_MESSAGE_QUEUE_NAME, getpid());
- if(mq_close(dlt_user.dlt_segmented_queue_write_handle) < 0)
- {
- dlt_log(LOG_ERR, "Failed to unlink message queue write handle!\n");
- dlt_log(LOG_ERR, strerror(errno));
- }
-
- if(mq_close(dlt_user.dlt_segmented_queue_read_handle) < 0 ||
- mq_unlink(queue_name))
- {
- dlt_log(LOG_ERR, "Failed to unlink message queue read handle!\n");
- dlt_log(LOG_ERR, strerror(errno));
- }
+ /**
+ * Ignore errors from these, to not to spam user if dlt_free
+ * is accidentally called multiple times.
+ */
+ mq_close(dlt_user.dlt_segmented_queue_write_handle);
+ mq_close(dlt_user.dlt_segmented_queue_read_handle);
+ mq_unlink(queue_name);
dlt_user_initialised = 0;
@@ -1049,6 +1097,11 @@ int dlt_forward_msg(void *msgdata,size_t size)
}
DLT_SEM_FREE();
+
+ if(dlt_user_queue_resend() < 0)
+ {
+ dlt_log(LOG_WARNING, "Failed to queue resending.\n");
+ }
}
switch (ret)
@@ -2061,8 +2114,17 @@ void dlt_user_trace_network_segmented_thread(void *unused)
while(1)
{
+ // Wait untill message queue is initialized
+ pthread_mutex_lock(&mq_mutex);
+ if(dlt_user.dlt_segmented_queue_read_handle < 0)
+ {
+ pthread_cond_wait(&mq_init_condition, &mq_mutex);
+ }
+ pthread_mutex_unlock(&mq_mutex);
+
ssize_t read = mq_receive(dlt_user.dlt_segmented_queue_read_handle, (char *)&data,
sizeof(s_segmented_data * ), NULL);
+
if(read != sizeof(s_segmented_data *))
{
dlt_log(LOG_ERR, "NWTSegmented: Error while reading queue.\n");
@@ -2071,9 +2133,15 @@ void dlt_user_trace_network_segmented_thread(void *unused)
}
/* Indicator just to try to flush the buffer */
- if(data->payload == (void *)DLT_DELAYED_RESEND_INDICATOR_PATTERN)
+ if(data->payload_len == DLT_DELAYED_RESEND_INDICATOR_PATTERN)
{
- dlt_user_log_resend_buffer();
+ // Sleep 100ms, to allow other process to read FIFO
+ usleep(100*1000);
+ if(dlt_user_log_resend_buffer() < 0)
+ {
+ // Requeue if still not empty
+ dlt_user_queue_resend();
+ }
free(data);
continue;
}
@@ -2172,6 +2240,13 @@ int dlt_user_trace_network_segmented(DltContext *handle, DltNetworkTraceType nw_
return -1;
}
+ /* Open queue if it is not open */
+ if(dlt_init_message_queue() < 0)
+ {
+ dlt_log(LOG_ERR, "NWTSegmented: Could not open queue.\n");
+ return -1;
+ }
+
/* Add to queue */
if(mq_send(dlt_user.dlt_segmented_queue_write_handle,
(char *)&thread_data, sizeof(s_segmented_data *), 1) < 0)
@@ -2584,6 +2659,33 @@ int dlt_user_log_init(DltContext *handle, DltContextData *log)
return 0;
}
+int dlt_user_queue_resend(void)
+{
+ /**
+ * Ask segmented thread to try emptying the buffer soon.
+ * This will be freed in dlt_user_trace_network_segmented_thread
+ * */
+ s_segmented_data *resend_data = malloc(sizeof(s_segmented_data));
+ resend_data->payload_len = DLT_DELAYED_RESEND_INDICATOR_PATTERN;
+
+ /* Open queue if it is not open */
+ if(dlt_init_message_queue() < 0)
+ {
+ dlt_log(LOG_ERR, "NWTSegmented: Could not open queue.\n");
+ return -1;
+ }
+
+ if(mq_send(dlt_user.dlt_segmented_queue_write_handle, (char *)&resend_data, sizeof(s_segmented_data *), 1) < 0)
+ {
+ dlt_log(LOG_ERR,"Could not request resending.\n");
+ dlt_log(LOG_ERR, strerror(errno));
+ free(resend_data);
+ DLT_SEM_FREE();
+ return -1;
+ }
+ return 0;
+}
+
DltReturnValue dlt_user_log_send_log(DltContextData *log, int mtype)
{
DltMessage msg;
@@ -2809,17 +2911,9 @@ DltReturnValue dlt_user_log_send_log(DltContextData *log, int mtype)
DLT_SEM_FREE();
- /**
- * Ask segmented thread to try emptying the buffer soon.
- * This will be freed in dlt_user_trace_network_segmented_thread
- * */
- s_segmented_data *resend_data = malloc(sizeof(s_segmented_data));
- resend_data->payload = (void *)DLT_DELAYED_RESEND_INDICATOR_PATTERN;
- if(mq_send(dlt_user.dlt_segmented_queue_write_handle, (char *)&resend_data, sizeof(s_segmented_data *), 1) < 0)
+ if(dlt_user_queue_resend() < 0)
{
- dlt_log(LOG_ERR,"Could not request resending.\n");
- dlt_log(LOG_ERR, strerror(errno));
- free(resend_data);
+ dlt_log(LOG_WARNING, "Failed to queue resending.\n");
}
}
@@ -2929,6 +3023,11 @@ int dlt_user_log_send_register_application(void)
}
DLT_SEM_FREE();
+
+ if(dlt_user_queue_resend() < 0)
+ {
+ dlt_log(LOG_WARNING, "Failed to queue resending.\n");
+ }
}
return 0;
@@ -3035,6 +3134,11 @@ int dlt_user_log_send_register_context(DltContextData *log)
}
DLT_SEM_FREE();
+
+ if(dlt_user_queue_resend() < 0)
+ {
+ dlt_log(LOG_WARNING, "Failed to queue resending.\n");
+ }
}
return 0;
diff --git a/src/tests/dlt-test-client.c b/src/tests/dlt-test-client.c
index 6f1e972..f99da3e 100755
--- a/src/tests/dlt-test-client.c
+++ b/src/tests/dlt-test-client.c
@@ -2398,7 +2398,6 @@ int dlt_testclient_message_callback(DltMessage *message, void *data)
/* If the payload is correct, the counter is increased by 1 */
if (message->extendedheader->noar==4)
{
- //TODO: CHECK ACTUAL CONTENT
type_info=0;
type_info_tmp=0;
length=0,length_tmp=0; /* the macro can set this variable to -1 */