summaryrefslogtreecommitdiff
path: root/lib/cpp/src/transport/TFileTransport.h
blob: fbaf2cd0d268134ea83ff2d86e9b28c1c63ff6da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1

#include "TTransport.h"
#include "Thrift.h"
#include "TProcessor.h"

#include <string>
#include <stdio.h>

#include <boost/shared_ptr.hpp>

namespace apache { namespace thrift { namespace transport {

using apache::thrift::TProcessor;
using apache::thrift::protocol::TProtocolFactory;

// Data pertaining to a single event
typedef struct eventInfo {
  uint8_t* eventBuff_;
  uint32_t eventSize_;
  uint32_t eventBuffPos_;

  eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
  ~eventInfo() {
    if (eventBuff_) {
      delete[] eventBuff_;
    }
  }
} eventInfo;

// information about current read state
typedef struct readState {
  eventInfo* event_;

  // keep track of event size
  uint8_t   eventSizeBuff_[4];
  uint8_t   eventSizeBuffPos_;
  bool      readingSize_;

  // read buffer variables
  int32_t  bufferPtr_;
  int32_t  bufferLen_;

  // last successful dispatch point
  int32_t lastDispatchPtr_;

  void resetState(uint32_t lastDispatchPtr) {
    readingSize_ = true;
    eventSizeBuffPos_ = 0;
    lastDispatchPtr_ = lastDispatchPtr;
  }

  void resetAllValues() {
    resetState(0);
    bufferPtr_ = 0;
    bufferLen_ = 0;
    if (event_) {
      delete(event_);
    }
    event_ = 0;
  }

  readState() {
    event_ = 0;
   resetAllValues();
  }

  ~readState() {
    if (event_) {
      delete(event_);
    }
  }

} readState;

/**
 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
 * to be written to disk.  Should be used in the following way:
 *  1) Buffer created
 *  2) Buffer written to (addEvent)
 *  3) Buffer read from (getNext)
 *  4) Buffer reset (reset)
 *  5) Go back to 2, or destroy buffer
 *
 * The buffer should never be written to after it is read from, unless it is reset first.
 * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
 *       which uses the buffer in this way.
 *
 */
class TFileTransportBuffer {
  public:
    TFileTransportBuffer(uint32_t size);
    ~TFileTransportBuffer();

    bool addEvent(eventInfo *event);
    eventInfo* getNext();
    void reset();
    bool isFull();
    bool isEmpty();

  private:
    TFileTransportBuffer(); // should not be used

    enum mode {
      WRITE,
      READ
    };
    mode bufferMode_;

    uint32_t writePoint_;
    uint32_t readPoint_;
    uint32_t size_;
    eventInfo** buffer_;
};

/**
 * Abstract interface for transports used to read files
 */
class TFileReaderTransport : virtual public TTransport {
 public:
  virtual int32_t getReadTimeout() = 0;
  virtual void setReadTimeout(int32_t readTimeout) = 0;

  virtual uint32_t getNumChunks() = 0;
  virtual uint32_t getCurChunk() = 0;
  virtual void seekToChunk(int32_t chunk) = 0;
  virtual void seekToEnd() = 0;
};

/**
 * Abstract interface for transports used to write files
 */
class TFileWriterTransport : virtual public TTransport {
 public:
  virtual uint32_t getChunkSize() = 0;
  virtual void setChunkSize(uint32_t chunkSize) = 0;
};

/**
 * File implementation of a transport. Reads and writes are done to a
 * file on disk.
 *
 */
class TFileTransport : public TFileReaderTransport,
                       public TFileWriterTransport {
 public:
  TFileTransport(std::string path, bool readOnly=false);
  ~TFileTransport();

  // TODO: what is the correct behaviour for this?
  // the log file is generally always open
  bool isOpen() {
    return true;
  }

  void write(const uint8_t* buf, uint32_t len);
  void flush();

  uint32_t readAll(uint8_t* buf, uint32_t len);
  uint32_t read(uint8_t* buf, uint32_t len);

  // log-file specific functions
  void seekToChunk(int32_t chunk);
  void seekToEnd();
  uint32_t getNumChunks();
  uint32_t getCurChunk();

  // for changing the output file
  void resetOutputFile(int fd, std::string filename, int64_t offset);

  // Setter/Getter functions for user-controllable options
  void setReadBuffSize(uint32_t readBuffSize) {
    if (readBuffSize) {
      readBuffSize_ = readBuffSize;
    }
  }
  uint32_t getReadBuffSize() {
    return readBuffSize_;
  }

  static const int32_t TAIL_READ_TIMEOUT = -1;
  static const int32_t NO_TAIL_READ_TIMEOUT = 0;
  void setReadTimeout(int32_t readTimeout) {
    readTimeout_ = readTimeout;
  }
  int32_t getReadTimeout() {
    return readTimeout_;
  }

  void setChunkSize(uint32_t chunkSize) {
    if (chunkSize) {
      chunkSize_ = chunkSize;
    }
  }
  uint32_t getChunkSize() {
    return chunkSize_;
  }

  void setEventBufferSize(uint32_t bufferSize) {
    if (bufferAndThreadInitialized_) {
      GlobalOutput("Cannot change the buffer size after writer thread started");
      return;
    }
    eventBufferSize_ = bufferSize;
  }

  uint32_t getEventBufferSize() {
    return eventBufferSize_;
  }

  void setFlushMaxUs(uint32_t flushMaxUs) {
    if (flushMaxUs) {
      flushMaxUs_ = flushMaxUs;
    }
  }
  uint32_t getFlushMaxUs() {
    return flushMaxUs_;
  }

  void setFlushMaxBytes(uint32_t flushMaxBytes) {
    if (flushMaxBytes) {
      flushMaxBytes_ = flushMaxBytes;
    }
  }
  uint32_t getFlushMaxBytes() {
    return flushMaxBytes_;
  }

  void setMaxEventSize(uint32_t maxEventSize) {
    maxEventSize_ = maxEventSize;
  }
  uint32_t getMaxEventSize() {
    return maxEventSize_;
  }

  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
    maxCorruptedEvents_ = maxCorruptedEvents;
  }
  uint32_t getMaxCorruptedEvents() {
    return maxCorruptedEvents_;
  }

  void setEofSleepTimeUs(uint32_t eofSleepTime) {
    if (eofSleepTime) {
      eofSleepTime_ = eofSleepTime;
    }
  }
  uint32_t getEofSleepTimeUs() {
    return eofSleepTime_;
  }

 private:
  // helper functions for writing to a file
  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
  bool swapEventBuffers(struct timespec* deadline);
  bool initBufferAndWriteThread();

  // control for writer thread
  static void* startWriterThread(void* ptr) {
    (((TFileTransport*)ptr)->writerThread());
    return 0;
  }
  void writerThread();

  // helper functions for reading from a file
  eventInfo* readEvent();

  // event corruption-related functions
  bool isEventCorrupted();
  void performRecovery();

  // Utility functions
  void openLogFile();
  void getNextFlushTime(struct timespec* ts_next_flush);

  // Class variables
  readState readState_;
  uint8_t* readBuff_;
  eventInfo* currentEvent_;

  uint32_t readBuffSize_;
  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;

  int32_t readTimeout_;
  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;

  // size of chunks that file will be split up into
  uint32_t chunkSize_;
  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;

  // size of event buffers
  uint32_t eventBufferSize_;
  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;

  // max number of microseconds that can pass without flushing
  uint32_t flushMaxUs_;
  static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;

  // max number of bytes that can be written without flushing
  uint32_t flushMaxBytes_;
  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;

  // max event size
  uint32_t maxEventSize_;
  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;

  // max number of corrupted events per chunk
  uint32_t maxCorruptedEvents_;
  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;

  // sleep duration when EOF is hit
  uint32_t eofSleepTime_;
  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;

  // sleep duration when a corrupted event is encountered
  uint32_t corruptedEventSleepTime_;
  static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;

  // writer thread id
  pthread_t writerThreadId_;

  // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
  // needs to be written to the file.  The buffers are swapped by the writer thread.
  TFileTransportBuffer *dequeueBuffer_;
  TFileTransportBuffer *enqueueBuffer_;

  // conditions used to block when the buffer is full or empty
  pthread_cond_t notFull_, notEmpty_;
  volatile bool closing_;

  // To keep track of whether the buffer has been flushed
  pthread_cond_t flushed_;
  volatile bool forceFlush_;

  // Mutex that is grabbed when enqueueing and swapping the read/write buffers
  pthread_mutex_t mutex_;

  // File information
  std::string filename_;
  int fd_;

  // Whether the writer thread and buffers have been initialized
  bool bufferAndThreadInitialized_;

  // Offset within the file
  off_t offset_;

  // event corruption information
  uint32_t lastBadChunk_;
  uint32_t numCorruptedEventsInChunk_;

  bool readOnly_;
};

// Exception thrown when EOF is hit
class TEOFException : public TTransportException {
 public:
  TEOFException():
    TTransportException(TTransportException::END_OF_FILE) {};
};


// wrapper class to process events from a file containing thrift events
class TFileProcessor {
 public:
  /**
   * Constructor that defaults output transport to null transport
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport file transport
   */
  TFileProcessor(boost::shared_ptr<TProcessor> processor,
                 boost::shared_ptr<TProtocolFactory> protocolFactory,
                 boost::shared_ptr<TFileReaderTransport> inputTransport);

  TFileProcessor(boost::shared_ptr<TProcessor> processor,
                 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
                 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
                 boost::shared_ptr<TFileReaderTransport> inputTransport);

  /**
   * Constructor
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport input file transport
   * @param output output transport
   */
  TFileProcessor(boost::shared_ptr<TProcessor> processor,
                 boost::shared_ptr<TProtocolFactory> protocolFactory,
                 boost::shared_ptr<TFileReaderTransport> inputTransport,
                 boost::shared_ptr<TTransport> outputTransport);

  /**
   * processes events from the file
   *
   * @param numEvents number of events to process (0 for unlimited)
   * @param tail tails the file if true
   */
  void process(uint32_t numEvents, bool tail);

  /**
   * process events until the end of the chunk
   *
   */
  void processChunk();

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
  boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
  boost::shared_ptr<TFileReaderTransport> inputTransport_;
  boost::shared_ptr<TTransport> outputTransport_;
};


}}} // apache::thrift::transport

#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_