privatevoidwrite(Object msg, boolean flush, ChannelPromise promise){ //5.1 找到下一个outbound的ctx AbstractChannelHandlerContext next = findContextOutbound(); //5.2 touch:记录一下内存的写入位置 final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); //5.3 最终调用下一个ctx重写的write if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; }
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) // // The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry; // The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry; // The Entry which represents the tail of the buffer private Entry tailEntry;
// increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 //5.3.2.3 增加待刷新的数据大小 incrementPendingOutboundBytes(entry.pendingSize, false); }
publicvoidaddFlush(){ // There is no need to process all entries if there was already a flush before and no new messages // where added in the meantime. // // See https://github.com/netty/netty/issues/2577 //entry获取还未进行flush操作的第一个元素 Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry //指向第一个 flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); //每flush一个对象,需要把对象的大小从总的pendsize decrementPendingOutboundBytes(pending, false, true); } //指向下一个entry entry = entry.next; } while (entry != null);
// All flushed so reset unflushedEntry unflushedEntry = null; } }
protectedvoidflush0(){ if (inFlush0) { //已经flush // Avoid re-entrance return; }
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; }
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; }
try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } } finally { inFlush0 = false; } }
publicbooleanremove(){ Entry e = flushedEntry; if (e == null) { clearNioBuffers(); returnfalse; } Object msg = e.msg;
ChannelPromise promise = e.promise; int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); }
// recycle the entry e.recycle();
returntrue; }
// ChannelOutboundBuffer#removeEntry()
privatevoidremoveEntry(Entry e){ //flushed:The number of flushed entries that are not written yet if (-- flushed == 0) { //已经没有待flush操作的entry,清空指针 // processed everything flushedEntry = null; if (e == tailEntry) { tailEntry = null; unflushedEntry = null; } } else { flushedEntry = e.next; } }
protectedfinalvoidincompleteWrite(boolean setOpWrite){ // Did not write completely. //3.3.1 说明还没有写完,在selector上注册写标识 if (setOpWrite) { setOpWrite(); } else { // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then // use our write quantum. In this case we no longer want to set the write OP because the socket is still // writable (as far as we know). We will find out next time we attempt to write if the socket is writable // and set the write OP if necessary. //3.3.2 写完了,清理Selector上注册的写标识。稍后再执行刷新计划 clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime eventLoop().execute(flushTask); } }
protectedfinalvoidsetOpWrite(){ final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 //判断key是否还有效 if (!key.isValid()) { return; } finalint interestOps = key.interestOps(); //判断Selector上是否注册了OP_WRITE标识,如果没有则注册上。 if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } }