summaryrefslogtreecommitdiff
path: root/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java
diff options
context:
space:
mode:
authorJiayu Liu <Jimexist@users.noreply.github.com>2022-05-06 12:56:42 +0800
committerGitHub <noreply@github.com>2022-05-06 00:56:42 -0400
commit53ec08228a14130909d4dd6fc2c98f47a09d76b0 (patch)
tree19fd3589502b5471be23efe4d1190ad26fcbc91e /lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java
parent23b86364ef3dbccc19f5f3828e6d115f7b015651 (diff)
downloadthrift-53ec08228a14130909d4dd6fc2c98f47a09d76b0.tar.gz
THRIFT-5568: enforce consistent Java formatting (#2581)
* use spotless plugin and google-java-format to enforce a consistent code format * add a step of spotless check before building * only run spotless on the src/ directory Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java')
-rw-r--r--lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java157
1 files changed, 74 insertions, 83 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java b/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java
index 13e8031b6..034cc8599 100644
--- a/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java
+++ b/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java
@@ -1,25 +1,23 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.thrift;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -35,49 +33,39 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This class uses a single thread to set up non-blocking sockets to a set
- * of remote servers (hostname and port pairs), and sends a same request to
- * all these servers. It then fetches responses from servers.
+ * This class uses a single thread to set up non-blocking sockets to a set of remote servers
+ * (hostname and port pairs), and sends a same request to all these servers. It then fetches
+ * responses from servers.
*
- * Parameters:
- * int maxRecvBufBytesPerServer - an upper limit for receive buffer size
- * per server (in byte). If a response from a server exceeds this limit, the
- * client will not allocate memory or read response data for it.
+ * <p>Parameters: int maxRecvBufBytesPerServer - an upper limit for receive buffer size per server
+ * (in byte). If a response from a server exceeds this limit, the client will not allocate memory or
+ * read response data for it.
*
- * int fetchTimeoutSeconds - time limit for fetching responses from all
- * servers (in second). After the timeout, the fetch job is stopped and
- * available responses are returned.
+ * <p>int fetchTimeoutSeconds - time limit for fetching responses from all servers (in second).
+ * After the timeout, the fetch job is stopped and available responses are returned.
*
- * ByteBuffer requestBuf - request message that is sent to all servers.
+ * <p>ByteBuffer requestBuf - request message that is sent to all servers.
*
- * Output:
- * Responses are stored in an array of ByteBuffers. Index of elements in
- * this array corresponds to index of servers in the server list. Content in
- * a ByteBuffer may be in one of the following forms:
- * 1. First 4 bytes form an integer indicating length of following data,
- * then followed by the data.
- * 2. First 4 bytes form an integer indicating length of following data,
- * then followed by nothing - this happens when the response data size
- * exceeds maxRecvBufBytesPerServer, and the client will not read any
- * response data.
- * 3. No data in the ByteBuffer - this happens when the server does not
- * return any response within fetchTimeoutSeconds.
+ * <p>Output: Responses are stored in an array of ByteBuffers. Index of elements in this array
+ * corresponds to index of servers in the server list. Content in a ByteBuffer may be in one of the
+ * following forms: 1. First 4 bytes form an integer indicating length of following data, then
+ * followed by the data. 2. First 4 bytes form an integer indicating length of following data, then
+ * followed by nothing - this happens when the response data size exceeds maxRecvBufBytesPerServer,
+ * and the client will not read any response data. 3. No data in the ByteBuffer - this happens when
+ * the server does not return any response within fetchTimeoutSeconds.
*
- * In some special cases (no servers are given, fetchTimeoutSeconds less
- * than or equal to 0, requestBuf is null), the return is null.
- *
- * Note:
- * It assumes all remote servers are TNonblockingServers and use
- * TFramedTransport.
+ * <p>In some special cases (no servers are given, fetchTimeoutSeconds less than or equal to 0,
+ * requestBuf is null), the return is null.
*
+ * <p>Note: It assumes all remote servers are TNonblockingServers and use TFramedTransport.
*/
public class TNonblockingMultiFetchClient {
- private static final Logger LOGGER = LoggerFactory.getLogger(
- TNonblockingMultiFetchClient.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingMultiFetchClient.class);
// if the size of the response msg exceeds this limit (in byte), we will
// not read the msg
@@ -97,9 +85,11 @@ public class TNonblockingMultiFetchClient {
private TNonblockingMultiFetchStats stats;
private ByteBuffer[] recvBuf;
- public TNonblockingMultiFetchClient(int maxRecvBufBytesPerServer,
- int fetchTimeoutSeconds, ByteBuffer requestBuf,
- List<InetSocketAddress> servers) {
+ public TNonblockingMultiFetchClient(
+ int maxRecvBufBytesPerServer,
+ int fetchTimeoutSeconds,
+ ByteBuffer requestBuf,
+ List<InetSocketAddress> servers) {
this.maxRecvBufBytesPerServer = maxRecvBufBytesPerServer;
this.fetchTimeoutSeconds = fetchTimeoutSeconds;
this.requestBuf = requestBuf;
@@ -154,8 +144,7 @@ public class TNonblockingMultiFetchClient {
recvBuf = null;
stats.clear();
- if (servers == null || servers.size() == 0 ||
- requestBuf == null || fetchTimeoutSeconds <= 0) {
+ if (servers == null || servers.size() == 0 || requestBuf == null || fetchTimeoutSeconds <= 0) {
return recvBuf;
}
@@ -185,9 +174,8 @@ public class TNonblockingMultiFetchClient {
}
/**
- * Private class that does real fetch job.
- * Users are not allowed to directly use this class, as its run()
- * function may run forever.
+ * Private class that does real fetch job. Users are not allowed to directly use this class, as
+ * its run() function may run forever.
*/
private class MultiFetch implements Runnable {
private Selector selector;
@@ -195,11 +183,11 @@ public class TNonblockingMultiFetchClient {
/**
* main entry function for fetching.
*
- * Server responses are stored in TNonblocingMultiFetchClient.recvBuf,
- * and fetch statistics is in TNonblockingMultiFetchClient.stats.
+ * <p>Server responses are stored in TNonblocingMultiFetchClient.recvBuf, and fetch statistics
+ * is in TNonblockingMultiFetchClient.stats.
*
- * Sanity check for parameters has been done in
- * TNonblockingMultiFetchClient before calling this function.
+ * <p>Sanity check for parameters has been done in TNonblockingMultiFetchClient before calling
+ * this function.
*/
public void run() {
long t1 = System.currentTimeMillis();
@@ -208,7 +196,7 @@ public class TNonblockingMultiFetchClient {
stats.setNumTotalServers(numTotalServers);
// buffer for receiving response from servers
- recvBuf = new ByteBuffer[numTotalServers];
+ recvBuf = new ByteBuffer[numTotalServers];
// buffer for sending request
ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers];
long[] numBytesRead = new long[numTotalServers];
@@ -246,23 +234,26 @@ public class TNonblockingMultiFetchClient {
// free resource
if (s != null) {
- try {s.close();} catch (Exception ex) {}
+ try {
+ s.close();
+ } catch (Exception ex) {
+ }
}
if (key != null) {
- key.cancel();
+ key.cancel();
}
}
}
// wait for events
- while (stats.getNumReadCompletedServers() +
- stats.getNumConnectErrorServers() < stats.getNumTotalServers()) {
+ while (stats.getNumReadCompletedServers() + stats.getNumConnectErrorServers()
+ < stats.getNumTotalServers()) {
// if the thread is interrupted (e.g., task is cancelled)
if (Thread.currentThread().isInterrupted()) {
return;
}
- try{
+ try {
selector.select();
} catch (Exception e) {
LOGGER.error("Selector selects error", e);
@@ -275,13 +266,13 @@ public class TNonblockingMultiFetchClient {
it.remove();
// get previously attached index
- int index = (Integer)selKey.attachment();
+ int index = (Integer) selKey.attachment();
if (selKey.isValid() && selKey.isConnectable()) {
// if this socket throws an exception (e.g., connection refused),
// print error msg and skip it.
try {
- SocketChannel sChannel = (SocketChannel)selKey.channel();
+ SocketChannel sChannel = (SocketChannel) selKey.channel();
sChannel.finishConnect();
} catch (Exception e) {
stats.incNumConnectErrorServers();
@@ -294,7 +285,7 @@ public class TNonblockingMultiFetchClient {
// if this socket throws an exception, print error msg and
// skip it.
try {
- SocketChannel sChannel = (SocketChannel)selKey.channel();
+ SocketChannel sChannel = (SocketChannel) selKey.channel();
sChannel.write(sendBuf[index]);
} catch (Exception e) {
LOGGER.error("Socket {} writes to server {} error", index, servers.get(index), e);
@@ -306,34 +297,38 @@ public class TNonblockingMultiFetchClient {
// if this socket throws an exception, print error msg and
// skip it.
try {
- SocketChannel sChannel = (SocketChannel)selKey.channel();
+ SocketChannel sChannel = (SocketChannel) selKey.channel();
int bytesRead = sChannel.read(recvBuf[index]);
if (bytesRead > 0) {
numBytesRead[index] += bytesRead;
- if (!hasReadFrameSize[index] &&
- recvBuf[index].remaining()==0) {
+ if (!hasReadFrameSize[index] && recvBuf[index].remaining() == 0) {
// if the frame size has been read completely, then prepare
// to read the actual frame.
frameSize[index] = recvBuf[index].getInt(0);
if (frameSize[index] <= 0) {
stats.incNumInvalidFrameSize();
- LOGGER.error("Read an invalid frame size {} from {}. Does the server use TFramedTransport?",
- frameSize[index], servers.get(index));
+ LOGGER.error(
+ "Read an invalid frame size {} from {}. Does the server use TFramedTransport?",
+ frameSize[index],
+ servers.get(index));
sChannel.close();
continue;
}
if (frameSize[index] + 4 > stats.getMaxResponseBytes()) {
- stats.setMaxResponseBytes(frameSize[index]+4);
+ stats.setMaxResponseBytes(frameSize[index] + 4);
}
if (frameSize[index] + 4 > maxRecvBufBytesPerServer) {
stats.incNumOverflowedRecvBuf();
- LOGGER.error("Read frame size {} from {}, total buffer size would exceed limit {}",
- frameSize[index], servers.get(index), maxRecvBufBytesPerServer);
+ LOGGER.error(
+ "Read frame size {} from {}, total buffer size would exceed limit {}",
+ frameSize[index],
+ servers.get(index),
+ maxRecvBufBytesPerServer);
sChannel.close();
continue;
}
@@ -346,34 +341,30 @@ public class TNonblockingMultiFetchClient {
hasReadFrameSize[index] = true;
}
- if (hasReadFrameSize[index] &&
- numBytesRead[index] >= frameSize[index]+4) {
+ if (hasReadFrameSize[index] && numBytesRead[index] >= frameSize[index] + 4) {
// has read all data
sChannel.close();
stats.incNumReadCompletedServers();
long t2 = System.currentTimeMillis();
- stats.setReadTime(t2-t1);
+ stats.setReadTime(t2 - t1);
}
}
} catch (Exception e) {
- LOGGER.error("Socket {} reads from server {} error",
- index, servers.get(index), e);
+ LOGGER.error("Socket {} reads from server {} error", index, servers.get(index), e);
}
}
}
}
}
- /**
- * dispose any resource allocated
- */
+ /** dispose any resource allocated */
public void close() {
try {
if (selector.isOpen()) {
Iterator<SelectionKey> it = selector.keys().iterator();
while (it.hasNext()) {
SelectionKey selKey = it.next();
- SocketChannel sChannel = (SocketChannel)selKey.channel();
+ SocketChannel sChannel = (SocketChannel) selKey.channel();
sChannel.close();
}