HttpProcessor httpproc = HttpProcessorBuilder.create() // Use standard client-side protocol interceptors .add(new RequestContent()).add(new RequestTargetHost()).add(new RequestConnControl()) .add(new RequestUserAgent("Test/1.1")).add(new RequestExpectContinue(true)).build(); // Create client-side HTTP protocol handler HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor(); // Create client-side I/O event dispatch final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT); // Create client-side I/O reactor final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(); // Create HTTP connection pool BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT); // Limit total number of connections to just two pool.setDefaultMaxPerRoute(2); pool.setMaxTotal(2); // Run the I/O reactor in a separate thread Thread t = new Thread(new Runnable() {
}); // Start the client thread t.start(); // Create HTTP requester HttpAsyncRequester requester = new HttpAsyncRequester(httpproc); // Execute HTTP GETs to the following hosts and HttpHost[] targets = new HttpHost[] {new HttpHost("shuvigoss.win", 80, "http"), new HttpHost("shuvigoss.win", 80, "http"), new HttpHost("shuvigoss.win", 80, "http")}; final CountDownLatch latch = new CountDownLatch(targets.length); for (final HttpHost target : targets) { BasicHttpRequest request = new BasicHttpRequest("GET", "/"); HttpCoreContext coreContext = HttpCoreContext.create(); requester.execute(new BasicAsyncRequestProducer(target, request), new BasicAsyncResponseConsumer(), pool, coreContext, // Handle HTTP response from a callback new FutureCallback<HttpResponse>() {
if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) { //处理连接 processEvents(readyCount); }
// Verify I/O dispatchers for (int i = 0; i < this.workerCount; i++) { final Worker worker = this.workers[i]; final Exception ex = worker.getException(); if (ex != null) { thrownew IOReactorException( "I/O dispatch worker terminated abnormally", ex); } }
if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) { break; } }
for (final HttpHost target : targets) { BasicHttpRequest request = new BasicHttpRequest("GET", "/"); HttpCoreContext coreContext = HttpCoreContext.create(); requester.execute(new BasicAsyncRequestProducer(target, request), new BasicAsyncResponseConsumer(), pool, coreContext, // Handle HTTP response from a callback new FutureCallback<HttpResponse>() {
// New connection is needed finalint maxPerRoute = getMax(route); // Shrink the pool prior to allocating a new connection finalint excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); if (excess > 0) { for (int i = 0; i < excess; i++) { final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } }
if (pool.getAllocatedCount() < maxPerRoute) { finalint totalUsed = this.pending.size() + this.leased.size(); finalint freeCapacity = Math.max(this.maxTotal - totalUsed, 0); if (freeCapacity == 0) { returnfalse; } finalint totalAvailable = this.available.size(); if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } }
@Override public SessionRequest connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final Object attachment, final SessionRequestCallback callback){ Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0, "I/O reactor has been shut down"); final SessionRequestImpl sessionRequest = new SessionRequestImpl( remoteAddress, localAddress, attachment, callback); sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
} catch (final CancelledKeyException ex) { final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment(); key.attach(null); if (requestHandle != null) { final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest(); if (sessionRequest != null) { sessionRequest.cancel(); } } } }
尽量平均的分配到SubReactor
1 2 3 4 5
protectedvoidaddChannel(final ChannelEntry entry){ // Distribute new channels among the workers finalint i = Math.abs(this.currentWorker++ % this.workerCount); this.dispatchers[i].addChannel(entry); }
if (this.status == IOReactorStatus.SHUT_DOWN) { // Hard shut down. Exit select loop immediately break; }
if (this.status == IOReactorStatus.SHUTTING_DOWN) { // Graceful shutdown in process // Try to close things out nicely closeSessions(); closeNewChannels(); }
// Process selected I/O events if (readyCount > 0) { processEvents(this.selector.selectedKeys()); }
// Validate active channels validate(this.selector.keys());
// Process closed sessions processClosedSessions();
// If active process new channels if (this.status == IOReactorStatus.ACTIVE) { processNewChannels(); }
// Exit select loop if graceful shutdown has been completed if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0 && this.sessions.isEmpty()) { break; }
if (this.interestOpsQueueing) { // process all pending interestOps() operations processPendingInterestOps(); }
final IOSession session; try { session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback); int timeout = 0; try { timeout = channel.socket().getSoTimeout(); } catch (final IOException ex) { // Very unlikely to happen and is not fatal // as the protocol layer is expected to overwrite // this value anyways }
@Override publicvoidconnected( final NHttpClientConnection conn, final Object attachment)throws IOException, HttpException { final State state = new State(); final HttpContext context = conn.getContext(); context.setAttribute(HTTP_EXCHANGE_STATE, state); requestReady(conn); }