summaryrefslogtreecommitdiff
path: root/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
blob: 01b15e841d5d3ab7098036f2d1aad87c3e8adf1f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.thrift.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Provides common methods and classes used by nonblocking TServer implementations. */
public abstract class AbstractNonblockingServer extends TServer {
  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());

  public abstract static class AbstractNonblockingServerArgs<
          T extends AbstractNonblockingServerArgs<T>>
      extends AbstractServerArgs<T> {
    public long maxReadBufferBytes = 256 * 1024 * 1024;

    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
      super(transport);
      transportFactory(new TFramedTransport.Factory());
    }
  }

  /**
   * The maximum amount of memory we will allocate to client IO buffers at a time. Without this
   * limit, the server will gladly allocate client buffers right into an out of memory exception,
   * rather than waiting.
   */
  final long MAX_READ_BUFFER_BYTES;

  /** How many bytes are currently allocated to read buffers. */
  final AtomicLong readBufferBytesAllocated = new AtomicLong(0);

  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
  }

  /** Begin accepting connections and processing invocations. */
  public void serve() {
    // start any IO threads
    if (!startThreads()) {
      return;
    }

    // start listening, or exit
    if (!startListening()) {
      return;
    }

    setServing(true);

    // this will block while we serve
    waitForShutdown();

    setServing(false);

    // do a little cleanup
    stopListening();
  }

  /**
   * Starts any threads required for serving.
   *
   * @return true if everything went ok, false if threads could not be started.
   */
  protected abstract boolean startThreads();

  /** A method that will block until when threads handling the serving have been shut down. */
  protected abstract void waitForShutdown();

  /**
   * Have the server transport start accepting connections.
   *
   * @return true if we started listening successfully, false if something went wrong.
   */
  protected boolean startListening() {
    try {
      serverTransport_.listen();
      return true;
    } catch (TTransportException ttx) {
      LOGGER.error("Failed to start listening on server socket!", ttx);
      return false;
    }
  }

  /** Stop listening for connections. */
  protected void stopListening() {
    serverTransport_.close();
  }

  /**
   * Perform an invocation. This method could behave several different ways - invoke immediately
   * inline, queue for separate execution, etc.
   *
   * @return true if invocation was successfully requested, which is not a guarantee that invocation
   *     has completed. False if the request failed.
   */
  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);

  /**
   * An abstract thread that handles selecting on a set of transports and {@link FrameBuffer
   * FrameBuffers} associated with selected keys corresponding to requests.
   */
  protected abstract class AbstractSelectThread extends Thread {
    protected Selector selector;

    // List of FrameBuffers that want to change their selection interests.
    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();

    public AbstractSelectThread() throws IOException {
      this.selector = SelectorProvider.provider().openSelector();
    }

    /** If the selector is blocked, wake it up. */
    public void wakeupSelector() {
      selector.wakeup();
    }

    /**
     * Add FrameBuffer to the list of select interest changes and wake up the selector if it's
     * blocked. When the select() call exits, it'll give the FrameBuffer a chance to change its
     * interests.
     */
    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
      synchronized (selectInterestChanges) {
        selectInterestChanges.add(frameBuffer);
      }
      // wakeup the selector, if it's currently blocked.
      selector.wakeup();
    }

    /**
     * Check to see if there are any FrameBuffers that have switched their interest type from read
     * to write or vice versa.
     */
    protected void processInterestChanges() {
      synchronized (selectInterestChanges) {
        for (FrameBuffer fb : selectInterestChanges) {
          fb.changeSelectInterests();
        }
        selectInterestChanges.clear();
      }
    }

    /**
     * Do the work required to read from a readable client. If the frame is fully read, then invoke
     * the method call.
     */
    protected void handleRead(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.read()) {
        cleanupSelectionKey(key);
        return;
      }

      // if the buffer's frame read is complete, invoke the method.
      if (buffer.isFrameFullyRead() && !requestInvoke(buffer)) {
        cleanupSelectionKey(key);
      }
    }

    /** Let a writable client get written, if there's data to be written. */
    protected void handleWrite(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.write()) {
        cleanupSelectionKey(key);
      }
    }

    /** Do connection-close cleanup on a given SelectionKey. */
    protected void cleanupSelectionKey(SelectionKey key) {
      // remove the records from the two maps
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (buffer != null) {
        // close the buffer
        buffer.close();
      }
      // cancel the selection key
      key.cancel();
    }
  } // SelectThread

  /** Possible states for the FrameBuffer state machine. */
  private enum FrameBufferState {
    // in the midst of reading the frame size off the wire
    READING_FRAME_SIZE,
    // reading the actual frame data now, but not all the way done yet
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    READ_FRAME_COMPLETE,
    // waiting to get switched to listening for write events
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    WRITING,
    // another thread wants this framebuffer to go back to reading
    AWAITING_REGISTER_READ,
    // we want our transport and selection key invalidated in the selector
    // thread
    AWAITING_CLOSE
  }

  /**
   * Class that implements a sort of state machine around the interaction with a client and an
   * invoker. It manages reading the frame size and frame data, getting it handed off as wrapped
   * transports, and then the writing of response data back to the client. In the process it manages
   * flipping the read and write bits on the selection key for its client.
   */
  public class FrameBuffer {
    private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());

    // the actual transport hooked up to the client.
    protected final TNonblockingTransport trans_;

    // the SelectionKey that corresponds to our transport
    protected final SelectionKey selectionKey_;

    // the SelectThread that owns the registration of our transport
    protected final AbstractSelectThread selectThread_;

    // where in the process of reading/writing are we?
    protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;

    // the ByteBuffer we'll be using to write and read, depending on the state
    protected ByteBuffer buffer_;

    protected final TByteArrayOutputStream response_;

    // the frame that the TTransport should wrap.
    protected final TMemoryInputTransport frameTrans_;

    // the transport that should be used to connect to clients
    protected final TTransport inTrans_;

    protected final TTransport outTrans_;

    // the input protocol to use on frames
    protected final TProtocol inProt_;

    // the output protocol to use on frames
    protected final TProtocol outProt_;

    // context associated with this connection
    protected final ServerContext context_;

    public FrameBuffer(
        final TNonblockingTransport trans,
        final SelectionKey selectionKey,
        final AbstractSelectThread selectThread)
        throws TTransportException {
      trans_ = trans;
      selectionKey_ = selectionKey;
      selectThread_ = selectThread;
      buffer_ = ByteBuffer.allocate(4);

      frameTrans_ = new TMemoryInputTransport();
      response_ = new TByteArrayOutputStream();
      inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
      outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
      inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
      outProt_ = outputProtocolFactory_.getProtocol(outTrans_);

      if (eventHandler_ != null) {
        context_ = eventHandler_.createContext(inProt_, outProt_);
      } else {
        context_ = null;
      }
    }

    /**
     * Give this FrameBuffer a chance to read. The selector loop should have received a read event
     * for this FrameBuffer.
     *
     * @return true if the connection should live on, false if it should be closed
     */
    public boolean read() {
      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
        // try to read the frame size completely
        if (!internalRead()) {
          return false;
        }

        // if the frame size has been read completely, then prepare to read the
        // actual frame.
        if (buffer_.remaining() == 0) {
          // pull out the frame size as an integer.
          int frameSize = buffer_.getInt(0);
          if (frameSize <= 0) {
            LOGGER.error(
                "Read an invalid frame size of "
                    + frameSize
                    + ". Are you using TFramedTransport on the client side?");
            return false;
          }

          // if this frame will always be too large for this server, log the
          // error and close the connection.
          if (frameSize > trans_.getMaxFrameSize()) {
            LOGGER.error(
                "Read a frame size of "
                    + frameSize
                    + ", which is bigger than the maximum allowable frame size "
                    + trans_.getMaxFrameSize()
                    + " for ALL connections.");
            return false;
          }

          // if this frame will push us over the memory limit, then return.
          // with luck, more memory will free up the next time around.
          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
            return true;
          }

          // increment the amount of memory allocated to read buffers
          readBufferBytesAllocated.addAndGet(frameSize + 4);

          // reallocate the readbuffer as a frame-sized buffer
          buffer_ = ByteBuffer.allocate(frameSize + 4);
          buffer_.putInt(frameSize);

          state_ = FrameBufferState.READING_FRAME;
        } else {
          // this skips the check of READING_FRAME state below, since we can't
          // possibly go on to that state if there's data left to be read at
          // this one.
          return true;
        }
      }

      // it is possible to fall through from the READING_FRAME_SIZE section
      // to READING_FRAME if there's already some frame data available once
      // READING_FRAME_SIZE is complete.

      if (state_ == FrameBufferState.READING_FRAME) {
        if (!internalRead()) {
          return false;
        }

        // since we're already in the select loop here for sure, we can just
        // modify our selection key directly.
        if (buffer_.remaining() == 0) {
          // get rid of the read select interests
          selectionKey_.interestOps(0);
          state_ = FrameBufferState.READ_FRAME_COMPLETE;
        }

        return true;
      }

      // if we fall through to this point, then the state must be invalid.
      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
      return false;
    }

    /** Give this FrameBuffer a chance to write its output to the final client. */
    public boolean write() {
      if (state_ == FrameBufferState.WRITING) {
        try {
          if (trans_.write(buffer_) < 0) {
            return false;
          }
        } catch (TTransportException e) {
          LOGGER.warn("Got an Exception during write", e);
          return false;
        }

        // we're done writing. now we need to switch back to reading.
        if (buffer_.remaining() == 0) {
          prepareRead();
        }
        return true;
      }

      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
      return false;
    }

    /** Give this FrameBuffer a chance to set its interest to write, once data has come in. */
    public void changeSelectInterests() {
      switch (state_) {
        case AWAITING_REGISTER_WRITE:
          // set the OP_WRITE interest
          selectionKey_.interestOps(SelectionKey.OP_WRITE);
          state_ = FrameBufferState.WRITING;
          break;
        case AWAITING_REGISTER_READ:
          prepareRead();
          break;
        case AWAITING_CLOSE:
          close();
          selectionKey_.cancel();
          break;
        default:
          LOGGER.error("changeSelectInterest was called, but state is invalid ({})", state_);
      }
    }

    /** Shut the connection down. */
    public void close() {
      // if we're being closed due to an error, we might have allocated a
      // buffer that we need to subtract for our memory accounting.
      if (state_ == FrameBufferState.READING_FRAME
          || state_ == FrameBufferState.READ_FRAME_COMPLETE
          || state_ == FrameBufferState.AWAITING_CLOSE) {
        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
      }
      trans_.close();
      if (eventHandler_ != null) {
        eventHandler_.deleteContext(context_, inProt_, outProt_);
      }
    }

    /** Check if this FrameBuffer has a full frame read. */
    public boolean isFrameFullyRead() {
      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
    }

    /**
     * After the processor has processed the invocation, whatever thread is managing invocations
     * should call this method on this FrameBuffer so we know it's time to start trying to write
     * again. Also, if it turns out that there actually isn't any data in the response buffer, we'll
     * skip trying to write and instead go back to reading.
     */
    public void responseReady() {
      // the read buffer is definitely no longer in use, so we will decrement
      // our read buffer count. we do this here as well as in close because
      // we'd like to free this read memory up as quickly as possible for other
      // clients.
      readBufferBytesAllocated.addAndGet(-buffer_.array().length);

      if (response_.len() == 0) {
        // go straight to reading again. this was probably an oneway method
        state_ = FrameBufferState.AWAITING_REGISTER_READ;
        buffer_ = null;
      } else {
        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());

        // set state that we're waiting to be switched to write. we do this
        // asynchronously through requestSelectInterestChange() because there is
        // a possibility that we're not in the main thread, and thus currently
        // blocked in select(). (this functionality is in place for the sake of
        // the HsHa server.)
        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
      }
      requestSelectInterestChange();
    }

    /** Actually invoke the method signified by this FrameBuffer. */
    public void invoke() {
      frameTrans_.reset(buffer_.array());
      response_.reset();

      try {
        if (eventHandler_ != null) {
          eventHandler_.processContext(context_, inTrans_, outTrans_);
        }
        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

    /**
     * Perform a read into buffer.
     *
     * @return true if the read succeeded, false if there was an error or the connection closed.
     */
    private boolean internalRead() {
      try {
        return trans_.read(buffer_) >= 0;
      } catch (TTransportException e) {
        LOGGER.warn("Got an Exception in internalRead", e);
        return false;
      }
    }

    /** We're done writing, so reset our interest ops and change state accordingly. */
    private void prepareRead() {
      // we can set our interest directly without using the queue because
      // we're in the select thread.
      selectionKey_.interestOps(SelectionKey.OP_READ);
      // get ready for another go-around
      buffer_ = ByteBuffer.allocate(4);
      state_ = FrameBufferState.READING_FRAME_SIZE;
    }

    /**
     * When this FrameBuffer needs to change its select interests and execution might not be in its
     * select thread, then this method will make sure the interest change gets done when the select
     * thread wakes back up. When the current thread is this FrameBuffer's select thread, then it
     * just does the interest change immediately.
     */
    protected void requestSelectInterestChange() {
      if (Thread.currentThread() == this.selectThread_) {
        changeSelectInterests();
      } else {
        this.selectThread_.requestSelectInterestChange(this);
      }
    }
  } // FrameBuffer

  public class AsyncFrameBuffer extends FrameBuffer {
    public AsyncFrameBuffer(
        TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread)
        throws TTransportException {
      super(trans, selectionKey, selectThread);
    }

    public TProtocol getInputProtocol() {
      return inProt_;
    }

    public TProtocol getOutputProtocol() {
      return outProt_;
    }

    public void invoke() {
      frameTrans_.reset(buffer_.array());
      response_.reset();

      try {
        if (eventHandler_ != null) {
          eventHandler_.processContext(context_, inTrans_, outTrans_);
        }
        ((TAsyncProcessor) processorFactory_.getProcessor(inTrans_)).process(this);
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }
  }
}