summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBryan Duxbury <bryanduxbury@apache.org>2010-11-05 17:14:52 +0000
committerBryan Duxbury <bryanduxbury@apache.org>2010-11-05 17:14:52 +0000
commitbbe36c5d35bca5177c2a5472b58d784d93769da1 (patch)
tree230375b52ebbd232cdfc2aaa32bbae13c0f2b6af
parent01c5cebfdd0f638f09378e0a735419efc69f3c08 (diff)
downloadthrift-bbe36c5d35bca5177c2a5472b58d784d93769da1.tar.gz
THRIFT-970. java: Under heavy load, THttpClient may fail with 'too many open files'
This patch updates our THttpClient to have two different modes of operation: its current functionality and a new mode that uses Apache's HttpClient library to provide higher throughput and better pooling functionality. Patch: Mathias Herberts git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1031668 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--lib/java/ivy.xml1
-rw-r--r--lib/java/src/org/apache/thrift/transport/THttpClient.java186
2 files changed, 184 insertions, 3 deletions
diff --git a/lib/java/ivy.xml b/lib/java/ivy.xml
index cff7136a8..99ef97293 100644
--- a/lib/java/ivy.xml
+++ b/lib/java/ivy.xml
@@ -33,5 +33,6 @@
<dependency org="commons-lang" name="commons-lang" rev="2.5" conf="* -> *,!sources,!javadoc"/>
<dependency org="junit" name="junit" rev="4.4" conf="test -> *,!sources,!javadoc"/>
<dependency org="javax.servlet" name="servlet-api" rev="2.5" conf="* -> *,!sources,!javadoc"/>
+ <dependency org="org.apache.httpcomponents" name="httpclient" rev="4.0.1" conf="* -> *,!sources,!javadoc"/>
</dependencies>
</ivy-module>
diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java
index 419235310..0d39ff049 100644
--- a/lib/java/src/org/apache/thrift/transport/THttpClient.java
+++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java
@@ -19,6 +19,7 @@
package org.apache.thrift.transport;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
@@ -28,17 +29,46 @@ import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.params.CoreConnectionPNames;
+
/**
* HTTP implementation of the TTransport interface. Used for working with a
- * Thrift web services implementation.
+ * Thrift web services implementation (using for example TServlet).
+ *
+ * This class offers two implementations of the HTTP transport.
+ * One uses HttpURLConnection instances, the other uses HttpClient from
+ * Apache HttpComponents.
+ * The chosen implementation depends on the constructor used to
+ * create the THttpClient instance.
+ * Using the THttpClient(String url) constructor or passing null as the
+ * HttpClient to THttpClient(String url, HttpClient client) will create an
+ * instance which will use HttpURLConnection.
+ *
+ * When using HttpClient, the following configuration leads to 5-15%
+ * better performance than the HttpURLConnection implementation:
+ *
+ * http.protocol.version=HttpVersion.HTTP_1_1
+ * http.protocol.content-charset=UTF-8
+ * http.protocol.expect-continue=false
+ * http.connection.stalecheck=false
*
+ * Also note that under high load, the HttpURLConnection implementation
+ * may exhaust the open file descriptor limit.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a>
*/
+
public class THttpClient extends TTransport {
private URL url_ = null;
- private final ByteArrayOutputStream requestBuffer_ =
- new ByteArrayOutputStream();
+ private final ByteArrayOutputStream requestBuffer_ = new ByteArrayOutputStream();
private InputStream inputStream_ = null;
@@ -48,9 +78,54 @@ public class THttpClient extends TTransport {
private Map<String,String> customHeaders_ = null;
+ private final HttpHost host;
+
+ private final HttpClient client;
+
+  /**
+   * Transport factory producing THttpClient instances for one fixed URL.
+   * If an HttpClient was handed to the factory, every created transport is
+   * built around it; otherwise the single-argument THttpClient constructor
+   * is used.
+   */
+  public static class Factory extends TTransportFactory {
+
+    private final String url;
+    private final HttpClient client;
+
+    /** Creates a factory whose transports are built without an HttpClient. */
+    public Factory(String url) {
+      this(url, null);
+    }
+
+    /** Creates a factory whose transports use the given HttpClient (may be null). */
+    public Factory(String url, HttpClient client) {
+      this.url = url;
+      this.client = client;
+    }
+
+    /**
+     * Builds a fresh THttpClient; the transport argument is not consulted.
+     *
+     * @return the new transport, or null when constructing the THttpClient
+     *         throws a TTransportException
+     */
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      try {
+        return (client == null) ? new THttpClient(url) : new THttpClient(url, client);
+      } catch (TTransportException tte) {
+        // The overridden method cannot propagate the checked exception,
+        // so a construction failure is reported to the caller as null.
+        return null;
+      }
+    }
+  }
+
public THttpClient(String url) throws TTransportException {
try {
url_ = new URL(url);
+ this.client = null; // no HttpClient: flush() skips flushUsingHttpClient() for this instance
+ this.host = null; // the HttpHost is only read by flushUsingHttpClient(), which needs a client
+ } catch (IOException iox) {
+ throw new TTransportException(iox); // a failure of new URL(url) is rethrown as a transport error
+ }
+ }
+
+ public THttpClient(String url, HttpClient client) throws TTransportException {
+ try {
+ url_ = new URL(url);
+ this.client = client;
+ this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol());
} catch (IOException iox) {
throw new TTransportException(iox);
}
@@ -58,10 +133,20 @@ public class THttpClient extends TTransport {
public void setConnectTimeout(int timeout) {
connectTimeout_ = timeout;
+ if (null != this.client) { // only when this transport was constructed with an HttpClient
+ // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
+ // same HttpClient is used for something else.
+ client.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout_); // mirror the stored value into the client's connection-timeout parameter
+ }
}
public void setReadTimeout(int timeout) {
readTimeout_ = timeout;
+ if (null != this.client) { // only when this transport was constructed with an HttpClient
+ // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
+ // same HttpClient is used for something else.
+ client.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout_); // mirror the stored value into the client's socket-timeout parameter
+ }
}
public void setCustomHeaders(Map<String,String> headers) {
@@ -111,7 +196,102 @@ public class THttpClient extends TTransport {
requestBuffer_.write(buf, off, len);
}
+  /**
+   * Sends the buffered request as an HTTP POST through the configured
+   * HttpClient and copies the complete response body into memory, so the
+   * connection can be released before the caller starts reading the reply.
+   * On success, inputStream_ is replaced by a stream over the buffered body.
+   *
+   * @throws TTransportException if no HttpClient is configured, the server
+   *         answers with a status other than 200, the response carries no
+   *         body, or an I/O error occurs while sending, reading or closing
+   */
+  private void flushUsingHttpClient() throws TTransportException {
+
+    if (null == this.client) {
+      throw new TTransportException("Null HttpClient, aborting.");
+    }
+
+    // Extract request and reset buffer
+    byte[] data = requestBuffer_.toByteArray();
+    requestBuffer_.reset();
+
+    HttpPost post = null;
+
+    InputStream is = null;
+
+    try {
+      // Set request to path + query string
+      post = new HttpPost(this.url_.getFile());
+
+      //
+      // Headers are added to the HttpPost instance, not
+      // to HttpClient.
+      //
+
+      post.setHeader("Content-Type", "application/x-thrift");
+      post.setHeader("Accept", "application/x-thrift");
+      post.setHeader("User-Agent", "Java/THttpClient/HC");
+
+      if (null != customHeaders_) {
+        for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
+          post.setHeader(header.getKey(), header.getValue());
+        }
+      }
+
+      post.setEntity(new ByteArrayEntity(data));
+
+      HttpResponse response = this.client.execute(this.host, post);
+      int responseCode = response.getStatusLine().getStatusCode();
+
+      //
+      // Retrieve the input stream BEFORE checking the status code, so that
+      // the finally clause closes it (and thereby releases the connection)
+      // for error responses too. Checking the status first would leave the
+      // connection checked out on every non-200 reply.
+      //
+
+      if (null != response.getEntity()) {
+        is = response.getEntity().getContent();
+      }
+
+      if (responseCode != HttpStatus.SC_OK) {
+        throw new TTransportException("HTTP Response code: " + responseCode);
+      }
+
+      if (null == is) {
+        // A 200 reply without a body cannot carry a Thrift message.
+        throw new TTransportException("HTTP response contains no entity");
+      }
+
+      // Read the response into a byte array so we can release the connection
+      // early. This implies that the whole content will have to be read in
+      // memory, and that momentarily we might use up twice the memory (while
+      // the thrift struct is being read up the chain).
+      // Proceeding differently might lead to exhaustion of connections and thus
+      // to app failure.
+
+      byte[] buf = new byte[1024];
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+      int len;
+      while (-1 != (len = is.read(buf))) {
+        baos.write(buf, 0, len);
+      }
+
+      try {
+        // Indicate we're done with the content.
+        response.getEntity().consumeContent();
+      } catch (IOException ioe) {
+        // We ignore this exception, it might only mean the server has no
+        // keep-alive capability.
+      }
+
+      inputStream_ = new ByteArrayInputStream(baos.toByteArray());
+    } catch (IOException ioe) {
+      // Abort method so the connection gets released back to the connection manager
+      if (null != post) {
+        post.abort();
+      }
+      throw new TTransportException(ioe);
+    } finally {
+      if (null != is) {
+        // Close the entity's input stream, this will release the underlying connection
+        try {
+          is.close();
+        } catch (IOException ioe) {
+          throw new TTransportException(ioe);
+        }
+      }
+    }
+  }
+
public void flush() throws TTransportException {
+
+ if (null != this.client) {
+ flushUsingHttpClient();
+ return;
+ }
+
// Extract request and reset buffer
byte[] data = requestBuffer_.toByteArray();
requestBuffer_.reset();