privatevoidwrite(Object msg, boolean flush, ChannelPromise promise){ AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //2.1 调用invokeWriteAndFlush()方法,promise代入 next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } //2.2 将包装好的task交付给线程池 if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 //2.1.2 channel已经close了,这里设置失败状态 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; }
protectedfinalvoidsafeSetFailure(ChannelPromise promise, Throwable cause){ //2.1.2.1 尝试设置失败状态 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }
publicbooleanremove(){ Entry e = flushedEntry; if (e == null) { clearNioBuffers(); returnfalse; } Object msg = e.msg;
ChannelPromise promise = e.promise; int size = e.pendingSize;
removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); //3.1 设置promise为成功 safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); }
// recycle the entry e.recycle();
returntrue; }
3.1 设置promise为成功
1 2 3 4 5
privatestaticvoidsafeSuccess(ChannelPromise promise){ // Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return // false. PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger); }
1 2 3 4 5 6 7 8 9 10 11 12
publicstatic <V> voidtrySuccess(Promise<? super V> p, V result, InternalLogger logger){ if (!p.trySuccess(result) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p); } else { logger.warn( "Failed to mark a promise as success because it has failed already: {}, unnotified cause:", p, err); } } }
safeExecute(executor, new Runnable() { @Override publicvoidrun(){ notifyListenersNow(); } }); }
DefaultPromise#notifyListeners0()
1 2 3 4 5 6 7
privatevoidnotifyListeners0(DefaultFutureListeners listeners){ GenericFutureListener<?>[] a = listeners.listeners(); int size = listeners.size(); for (int i = 0; i < size; i ++) { notifyListener0(this, a[i]); } }
DefaultPromise#notifyListener0()
1 2 3 4 5 6 7 8 9 10
privatestaticvoidnotifyListener0(Future future, GenericFutureListener l){ try { //调用监听器的回调函数 l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } }
注册观察者
DefaultPromise#addListener()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener){ checkNotNull(listener, "listener");