publicstaticfinal Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in){ try { final ByteBuf buffer; //1.1 超出累加器的容量 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain() or if its read-only. // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 //扩容 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } // buffer.writeBytes(in); return buffer; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) in.release(); } } };
// Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; }
//mark一下当前数据的可读字节 int oldInputLength = in.readableBytes(); //调用子类实现的decode,将in的字节数据解码成对象到out里 decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; }
//经过解码后,没有新生成的对象 if (outSize == out.size()) { //而且in的字节流也没有被消耗,说明这一次无法 if (oldInputLength == in.readableBytes()) { break; } else { //可能被消耗了部分,需要后面几次继续解码才能形成一个对象 continue; } }
//没有消耗任何数据就解码出了对象,有问题 if (oldInputLength == in.readableBytes()) { thrownew DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); }
staticvoidfireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements){ for (int i = 0; i < numElements; i ++) { ctx.fireChannelRead(msgs.getUnsafe(i)); } }