diff options
author | Jiayu Liu <Jimexist@users.noreply.github.com> | 2022-05-06 12:56:42 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-06 00:56:42 -0400 |
commit | 53ec08228a14130909d4dd6fc2c98f47a09d76b0 (patch) | |
tree | 19fd3589502b5471be23efe4d1190ad26fcbc91e /lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java | |
parent | 23b86364ef3dbccc19f5f3828e6d115f7b015651 (diff) | |
download | thrift-53ec08228a14130909d4dd6fc2c98f47a09d76b0.tar.gz |
THRIFT-5568: enforce consistent Java formatting (#2581)
* use spotless plugin and google-java-format to enforce a consistent code format
* add a step of spotless check before building
* only run spotless on the src/ directory
Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java')
-rw-r--r-- | lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java | 157 |
1 files changed, 74 insertions, 83 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java b/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java index 13e8031b6..034cc8599 100644 --- a/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java +++ b/lib/java/src/main/java/org/apache/thrift/TNonblockingMultiFetchClient.java @@ -1,25 +1,23 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.thrift; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -35,49 +33,39 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This class uses a single thread to set up non-blocking sockets to a set - * of remote servers (hostname and port pairs), and sends a same request to - * all these servers. It then fetches responses from servers. + * This class uses a single thread to set up non-blocking sockets to a set of remote servers + * (hostname and port pairs), and sends a same request to all these servers. It then fetches + * responses from servers. * - * Parameters: - * int maxRecvBufBytesPerServer - an upper limit for receive buffer size - * per server (in byte). If a response from a server exceeds this limit, the - * client will not allocate memory or read response data for it. + * <p>Parameters: int maxRecvBufBytesPerServer - an upper limit for receive buffer size per server + * (in byte). If a response from a server exceeds this limit, the client will not allocate memory or + * read response data for it. * - * int fetchTimeoutSeconds - time limit for fetching responses from all - * servers (in second). After the timeout, the fetch job is stopped and - * available responses are returned. + * <p>int fetchTimeoutSeconds - time limit for fetching responses from all servers (in second). + * After the timeout, the fetch job is stopped and available responses are returned. * - * ByteBuffer requestBuf - request message that is sent to all servers. + * <p>ByteBuffer requestBuf - request message that is sent to all servers. * - * Output: - * Responses are stored in an array of ByteBuffers. Index of elements in - * this array corresponds to index of servers in the server list. Content in - * a ByteBuffer may be in one of the following forms: - * 1. First 4 bytes form an integer indicating length of following data, - * then followed by the data. - * 2. First 4 bytes form an integer indicating length of following data, - * then followed by nothing - this happens when the response data size - * exceeds maxRecvBufBytesPerServer, and the client will not read any - * response data. - * 3. No data in the ByteBuffer - this happens when the server does not - * return any response within fetchTimeoutSeconds. + * <p>Output: Responses are stored in an array of ByteBuffers. Index of elements in this array + * corresponds to index of servers in the server list. Content in a ByteBuffer may be in one of the + * following forms: 1. First 4 bytes form an integer indicating length of following data, then + * followed by the data. 2. First 4 bytes form an integer indicating length of following data, then + * followed by nothing - this happens when the response data size exceeds maxRecvBufBytesPerServer, + * and the client will not read any response data. 3. No data in the ByteBuffer - this happens when + * the server does not return any response within fetchTimeoutSeconds. * - * In some special cases (no servers are given, fetchTimeoutSeconds less - * than or equal to 0, requestBuf is null), the return is null. - * - * Note: - * It assumes all remote servers are TNonblockingServers and use - * TFramedTransport. + * <p>In some special cases (no servers are given, fetchTimeoutSeconds less than or equal to 0, + * requestBuf is null), the return is null. * + * <p>Note: It assumes all remote servers are TNonblockingServers and use TFramedTransport. */ public class TNonblockingMultiFetchClient { - private static final Logger LOGGER = LoggerFactory.getLogger( - TNonblockingMultiFetchClient.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingMultiFetchClient.class); // if the size of the response msg exceeds this limit (in byte), we will // not read the msg @@ -97,9 +85,11 @@ public class TNonblockingMultiFetchClient { private TNonblockingMultiFetchStats stats; private ByteBuffer[] recvBuf; - public TNonblockingMultiFetchClient(int maxRecvBufBytesPerServer, - int fetchTimeoutSeconds, ByteBuffer requestBuf, - List<InetSocketAddress> servers) { + public TNonblockingMultiFetchClient( + int maxRecvBufBytesPerServer, + int fetchTimeoutSeconds, + ByteBuffer requestBuf, + List<InetSocketAddress> servers) { this.maxRecvBufBytesPerServer = maxRecvBufBytesPerServer; this.fetchTimeoutSeconds = fetchTimeoutSeconds; this.requestBuf = requestBuf; @@ -154,8 +144,7 @@ public class TNonblockingMultiFetchClient { recvBuf = null; stats.clear(); - if (servers == null || servers.size() == 0 || - requestBuf == null || fetchTimeoutSeconds <= 0) { + if (servers == null || servers.size() == 0 || requestBuf == null || fetchTimeoutSeconds <= 0) { return recvBuf; } @@ -185,9 +174,8 @@ public class TNonblockingMultiFetchClient { } /** - * Private class that does real fetch job. - * Users are not allowed to directly use this class, as its run() - * function may run forever. + * Private class that does real fetch job. Users are not allowed to directly use this class, as + * its run() function may run forever. */ private class MultiFetch implements Runnable { private Selector selector; @@ -195,11 +183,11 @@ public class TNonblockingMultiFetchClient { /** * main entry function for fetching. * - * Server responses are stored in TNonblocingMultiFetchClient.recvBuf, - * and fetch statistics is in TNonblockingMultiFetchClient.stats. + * <p>Server responses are stored in TNonblocingMultiFetchClient.recvBuf, and fetch statistics + * is in TNonblockingMultiFetchClient.stats. * - * Sanity check for parameters has been done in - * TNonblockingMultiFetchClient before calling this function. + * <p>Sanity check for parameters has been done in TNonblockingMultiFetchClient before calling + * this function. */ public void run() { long t1 = System.currentTimeMillis(); @@ -208,7 +196,7 @@ public class TNonblockingMultiFetchClient { stats.setNumTotalServers(numTotalServers); // buffer for receiving response from servers - recvBuf = new ByteBuffer[numTotalServers]; + recvBuf = new ByteBuffer[numTotalServers]; // buffer for sending request ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers]; long[] numBytesRead = new long[numTotalServers]; @@ -246,23 +234,26 @@ public class TNonblockingMultiFetchClient { // free resource if (s != null) { - try {s.close();} catch (Exception ex) {} + try { + s.close(); + } catch (Exception ex) { + } } if (key != null) { - key.cancel(); + key.cancel(); } } } // wait for events - while (stats.getNumReadCompletedServers() + - stats.getNumConnectErrorServers() < stats.getNumTotalServers()) { + while (stats.getNumReadCompletedServers() + stats.getNumConnectErrorServers() + < stats.getNumTotalServers()) { // if the thread is interrupted (e.g., task is cancelled) if (Thread.currentThread().isInterrupted()) { return; } - try{ + try { selector.select(); } catch (Exception e) { LOGGER.error("Selector selects error", e); @@ -275,13 +266,13 @@ public class TNonblockingMultiFetchClient { it.remove(); // get previously attached index - int index = (Integer)selKey.attachment(); + int index = (Integer) selKey.attachment(); if (selKey.isValid() && selKey.isConnectable()) { // if this socket throws an exception (e.g., connection refused), // print error msg and skip it. try { - SocketChannel sChannel = (SocketChannel)selKey.channel(); + SocketChannel sChannel = (SocketChannel) selKey.channel(); sChannel.finishConnect(); } catch (Exception e) { stats.incNumConnectErrorServers(); @@ -294,7 +285,7 @@ public class TNonblockingMultiFetchClient { // if this socket throws an exception, print error msg and // skip it. try { - SocketChannel sChannel = (SocketChannel)selKey.channel(); + SocketChannel sChannel = (SocketChannel) selKey.channel(); sChannel.write(sendBuf[index]); } catch (Exception e) { LOGGER.error("Socket {} writes to server {} error", index, servers.get(index), e); @@ -306,34 +297,38 @@ public class TNonblockingMultiFetchClient { // if this socket throws an exception, print error msg and // skip it. try { - SocketChannel sChannel = (SocketChannel)selKey.channel(); + SocketChannel sChannel = (SocketChannel) selKey.channel(); int bytesRead = sChannel.read(recvBuf[index]); if (bytesRead > 0) { numBytesRead[index] += bytesRead; - if (!hasReadFrameSize[index] && - recvBuf[index].remaining()==0) { + if (!hasReadFrameSize[index] && recvBuf[index].remaining() == 0) { // if the frame size has been read completely, then prepare // to read the actual frame. frameSize[index] = recvBuf[index].getInt(0); if (frameSize[index] <= 0) { stats.incNumInvalidFrameSize(); - LOGGER.error("Read an invalid frame size {} from {}. Does the server use TFramedTransport?", - frameSize[index], servers.get(index)); + LOGGER.error( + "Read an invalid frame size {} from {}. Does the server use TFramedTransport?", + frameSize[index], + servers.get(index)); sChannel.close(); continue; } if (frameSize[index] + 4 > stats.getMaxResponseBytes()) { - stats.setMaxResponseBytes(frameSize[index]+4); + stats.setMaxResponseBytes(frameSize[index] + 4); } if (frameSize[index] + 4 > maxRecvBufBytesPerServer) { stats.incNumOverflowedRecvBuf(); - LOGGER.error("Read frame size {} from {}, total buffer size would exceed limit {}", - frameSize[index], servers.get(index), maxRecvBufBytesPerServer); + LOGGER.error( + "Read frame size {} from {}, total buffer size would exceed limit {}", + frameSize[index], + servers.get(index), + maxRecvBufBytesPerServer); sChannel.close(); continue; } @@ -346,34 +341,30 @@ public class TNonblockingMultiFetchClient { hasReadFrameSize[index] = true; } - if (hasReadFrameSize[index] && - numBytesRead[index] >= frameSize[index]+4) { + if (hasReadFrameSize[index] && numBytesRead[index] >= frameSize[index] + 4) { // has read all data sChannel.close(); stats.incNumReadCompletedServers(); long t2 = System.currentTimeMillis(); - stats.setReadTime(t2-t1); + stats.setReadTime(t2 - t1); } } } catch (Exception e) { - LOGGER.error("Socket {} reads from server {} error", - index, servers.get(index), e); + LOGGER.error("Socket {} reads from server {} error", index, servers.get(index), e); } } } } } - /** - * dispose any resource allocated - */ + /** dispose any resource allocated */ public void close() { try { if (selector.isOpen()) { Iterator<SelectionKey> it = selector.keys().iterator(); while (it.hasNext()) { SelectionKey selKey = it.next(); - SocketChannel sChannel = (SocketChannel)selKey.channel(); + SocketChannel sChannel = (SocketChannel) selKey.channel(); sChannel.close(); } |