/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.thrift; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class uses a single thread to set up non-blocking sockets to a set of remote servers * (hostname and port pairs), and sends a same request to all these servers. It then fetches * responses from servers. * *
Parameters: int maxRecvBufBytesPerServer - an upper limit for receive buffer size per server * (in byte). If a response from a server exceeds this limit, the client will not allocate memory or * read response data for it. * *
int fetchTimeoutSeconds - time limit for fetching responses from all servers (in second). * After the timeout, the fetch job is stopped and available responses are returned. * *
ByteBuffer requestBuf - request message that is sent to all servers. * *
Output: Responses are stored in an array of ByteBuffers. Index of elements in this array * corresponds to index of servers in the server list. Content in a ByteBuffer may be in one of the * following forms: 1. First 4 bytes form an integer indicating length of following data, then * followed by the data. 2. First 4 bytes form an integer indicating length of following data, then * followed by nothing - this happens when the response data size exceeds maxRecvBufBytesPerServer, * and the client will not read any response data. 3. No data in the ByteBuffer - this happens when * the server does not return any response within fetchTimeoutSeconds. * *
In some special cases (no servers are given, fetchTimeoutSeconds less than or equal to 0, * requestBuf is null), the return is null. * *
Note: It assumes all remote servers are TNonblockingServers and use TFramedTransport.
*/
public class TNonblockingMultiFetchClient {
private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingMultiFetchClient.class);
// if the size of the response msg exceeds this limit (in byte), we will
// not read the msg
private int maxRecvBufBytesPerServer;
// time limit for fetching data from all servers (in second)
private int fetchTimeoutSeconds;
// store request that will be sent to servers
private ByteBuffer requestBuf;
private ByteBuffer requestBufDuplication;
// a list of remote servers
private List Server responses are stored in TNonblocingMultiFetchClient.recvBuf, and fetch statistics
* is in TNonblockingMultiFetchClient.stats.
*
* Sanity check for parameters has been done in TNonblockingMultiFetchClient before calling
* this function.
*/
public void run() {
long t1 = System.currentTimeMillis();
int numTotalServers = servers.size();
stats.setNumTotalServers(numTotalServers);
// buffer for receiving response from servers
recvBuf = new ByteBuffer[numTotalServers];
// buffer for sending request
ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers];
long[] numBytesRead = new long[numTotalServers];
int[] frameSize = new int[numTotalServers];
boolean[] hasReadFrameSize = new boolean[numTotalServers];
try {
selector = Selector.open();
} catch (IOException ioe) {
LOGGER.error("Selector opens error", ioe);
return;
}
for (int i = 0; i < numTotalServers; i++) {
// create buffer to send request to server.
sendBuf[i] = requestBuf.duplicate();
// create buffer to read response's frame size from server
recvBuf[i] = ByteBuffer.allocate(4);
stats.incTotalRecvBufBytes(4);
InetSocketAddress server = servers.get(i);
SocketChannel s = null;
SelectionKey key = null;
try {
s = SocketChannel.open();
s.configureBlocking(false);
// now this method is non-blocking
s.connect(server);
key = s.register(selector, s.validOps());
// attach index of the key
key.attach(i);
} catch (Exception e) {
stats.incNumConnectErrorServers();
LOGGER.error("Set up socket to server {} error", server, e);
// free resource
if (s != null) {
try {
s.close();
} catch (Exception ex) {
}
}
if (key != null) {
key.cancel();
}
}
}
// wait for events
while (stats.getNumReadCompletedServers() + stats.getNumConnectErrorServers()
< stats.getNumTotalServers()) {
// if the thread is interrupted (e.g., task is cancelled)
if (Thread.currentThread().isInterrupted()) {
return;
}
try {
selector.select();
} catch (Exception e) {
LOGGER.error("Selector selects error", e);
continue;
}
Iterator