diff options
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/TBaseAsyncProcessor.java')
-rw-r--r-- | lib/java/src/main/java/org/apache/thrift/TBaseAsyncProcessor.java | 159 |
1 files changed, 79 insertions, 80 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/main/java/org/apache/thrift/TBaseAsyncProcessor.java index f13f068ef..266f0c0ce 100644 --- a/lib/java/src/main/java/org/apache/thrift/TBaseAsyncProcessor.java +++ b/lib/java/src/main/java/org/apache/thrift/TBaseAsyncProcessor.java @@ -18,97 +18,96 @@ */ package org.apache.thrift; -import org.apache.thrift.protocol.*; +import java.util.Collections; +import java.util.Map; import org.apache.thrift.async.AsyncMethodCallback; - +import org.apache.thrift.protocol.*; import org.apache.thrift.server.AbstractNonblockingServer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.Map; - public class TBaseAsyncProcessor<I> implements TAsyncProcessor, TProcessor { - protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); - - final I iface; - final Map<String,AsyncProcessFunction<I, ? extends TBase,?>> processMap; - - public TBaseAsyncProcessor(I iface, Map<String, AsyncProcessFunction<I, ? extends TBase,?>> processMap) { - this.iface = iface; - this.processMap = processMap; + protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); + + final I iface; + final Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap; + + public TBaseAsyncProcessor( + I iface, Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> processMap) { + this.iface = iface; + this.processMap = processMap; + } + + public Map<String, AsyncProcessFunction<I, ? extends TBase, ?>> getProcessMapView() { + return Collections.unmodifiableMap(processMap); + } + + public void process(final AsyncFrameBuffer fb) throws TException { + + final TProtocol in = fb.getInputProtocol(); + final TProtocol out = fb.getOutputProtocol(); + + // Find processing function + final TMessage msg = in.readMessageBegin(); + AsyncProcessFunction fn = processMap.get(msg.name); + if (fn == null) { + TProtocolUtil.skip(in, TType.STRUCT); + in.readMessageEnd(); + + TApplicationException x = + new TApplicationException( + TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" + msg.name + "'"); + LOGGER.debug("Invalid method name", x); + + // this means it is a two-way request, so we can send a reply + if (msg.type == TMessageType.CALL) { + out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); + x.write(out); + out.writeMessageEnd(); + out.getTransport().flush(); + } + fb.responseReady(); + return; } - public Map<String,AsyncProcessFunction<I, ? extends TBase,?>> getProcessMapView() { - return Collections.unmodifiableMap(processMap); + // Get Args + TBase args = fn.getEmptyArgsInstance(); + + try { + args.read(in); + } catch (TProtocolException e) { + in.readMessageEnd(); + + TApplicationException x = + new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); + LOGGER.debug("Could not retrieve function arguments", x); + + if (!fn.isOneway()) { + out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); + x.write(out); + out.writeMessageEnd(); + out.getTransport().flush(); + } + fb.responseReady(); + return; } + in.readMessageEnd(); - public void process(final AsyncFrameBuffer fb) throws TException { - - final TProtocol in = fb.getInputProtocol(); - final TProtocol out = fb.getOutputProtocol(); - - //Find processing function - final TMessage msg = in.readMessageBegin(); - AsyncProcessFunction fn = processMap.get(msg.name); - if (fn == null) { - TProtocolUtil.skip(in, TType.STRUCT); - in.readMessageEnd(); - - TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, - "Invalid method name: '" + msg.name + "'"); - LOGGER.debug("Invalid method name", x); - - // this means it is a two-way request, so we can send a reply - if (msg.type == TMessageType.CALL) { - out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); - x.write(out); - out.writeMessageEnd(); - out.getTransport().flush(); - } - fb.responseReady(); - return; - } - - //Get Args - TBase args = fn.getEmptyArgsInstance(); - - try { - args.read(in); - } catch (TProtocolException e) { - in.readMessageEnd(); - - TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, - e.getMessage()); - LOGGER.debug("Could not retrieve function arguments", x); - - if (!fn.isOneway()) { - out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); - x.write(out); - out.writeMessageEnd(); - out.getTransport().flush(); - } - fb.responseReady(); - return; - } - in.readMessageEnd(); - - if (fn.isOneway()) { - fb.responseReady(); - } - - //start off processing function - AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid); - try { - fn.start(iface, args, resultHandler); - } catch (Exception e) { - LOGGER.debug("Exception handling function", e); - resultHandler.onError(e); - } - return; + if (fn.isOneway()) { + fb.responseReady(); } - @Override - public void process(TProtocol in, TProtocol out) throws TException { + // start off processing function + AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid); + try { + fn.start(iface, args, resultHandler); + } catch (Exception e) { + LOGGER.debug("Exception handling function", e); + resultHandler.onError(e); } + return; + } + + @Override + public void process(TProtocol in, TProtocol out) throws TException {} } |