@Override protectedvoidonRemoval(Stack<T> value){ // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } };
每个线程都有一个 FastThreadLocal 线程变量,其中存储的数据是该线程对应的 Stack。
Stack
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask, int maxDelayedQueues) { this.parent = parent; threadRef = new WeakReference<Thread>(thread); //element的最大容量 this.maxCapacity = maxCapacity; //该字段表示该线程创建的对象能在其他线程最大缓存对象的数量 availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); //存储handle elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; //并不是每次都能把对象回收,回收的比率 this.ratioMask = ratioMask; //通过get获取对象,这个对象是由当前thread管理,但是最终在其他线程进行释放,会将该对象放在其他线程里数据结构里(WeakOrderQueue),该字段表示该线程最大缓存对象的数量 this.maxDelayedQueues = maxDelayedQueues; }
boolean success = false; do { //2.1.1.1 将该weakQueue的link对象转到该线程stack上 if (cursor.transfer(this)) { success = true; break; } WeakOrderQueue next = cursor.next; if (cursor.owner.get() == null) { // If the thread associated with the queue is gone, unlink it, after // performing a volatile read to confirm there is no data left to collect. // We never unlink the first queue, as we don't want to synchronize on updating the head. if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } }
if (srcStart != srcEnd) { final DefaultHandle[] srcElems = head.elements; final DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { DefaultHandle element = srcElems[i]; if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; } elseif (element.recycleId != element.lastRecycledId) { thrownew IllegalStateException("recycled already"); } srcElems[i] = null;
if (dst.dropHandle(element)) { // Drop the object. continue; } element.stack = dst; dstElems[newDstSize ++] = element; }
if (srcEnd == LINK_CAPACITY && head.next != null) { // Add capacity back as the Link is GCed. this.head.reclaimSpace(LINK_CAPACITY); this.head.link = head.next; }
head.readIndex = srcEnd; if (dst.size == newDstSize) { returnfalse; } dst.size = newDstSize; returntrue; } else { // The destination stack is full already. returnfalse; } } }
voidpush(DefaultHandle<?> item){ Thread currentThread = Thread.currentThread(); //判断即将回收的对象是否是当前进行回收的线程创建的 if (threadRef.get() == currentThread) { // The current Thread is the thread that belongs to the Stack, we can try to push the object now. //1. 同线程回收入口 pushNow(item); } else { // The current Thread is not the one that belongs to the Stack // (or the Thread that belonged to the Stack was collected already), we need to signal that the push // happens later. //2. 回收非本线程的对象 pushLater(item, currentThread); } }
int size = this.size; //1.1 大于最大容量或者达到需要drop的比例,并不是回收调所有对象 if (size >= maxCapacity || dropHandle(item)) { // Hit the maximum capacity or should drop - drop the possibly youngest object. return; } if (size == elements.length) { elements = Arrays.copyOf(elements, min(size << 1, maxCapacity)); }
elements[size] = item; this.size = size + 1; }
1.1 判断是否drop掉,不进行回收:Stack#dropHandle()
1 2 3 4 5 6 7 8 9 10
booleandropHandle(DefaultHandle<?> handle){ if (!handle.hasBeenRecycled) { if ((++handleRecycleCount & ratioMask) != 0) { // Drop the object. returntrue; } handle.hasBeenRecycled = true; } returnfalse; }
privatevoidpushLater(DefaultHandle<?> item, Thread thread){ // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) //1.1 获得本线程的Map<Stack<?>, WeakOrderQueue> Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); //1.2 通过回收对象的stack来获取本线程的WeakOrderQueue WeakOrderQueue queue = delayedRecycled.get(this); //2.在找不到queue的情况下,创建weakOrderQueue if (queue == null) { //2.1 如果可以延迟回收的数量已经达到上限,则塞进去一个无用的WeakOrderQueue
if (delayedRecycled.size() >= maxDelayedQueues) { // Add a dummy queue so we know we should drop the object delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // Check if we already reached the maximum number of delayed queues and if we can allocate at all. //2.2 判断该stack是否满足条件,满足则创建新的queue对象 if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { // drop object return; } //2.3 将新创建的queue插入Map<Stack<?>, WeakOrderQueue>线程变量里 //this delayedRecycled.put(this, queue); //发现是无用的WeakOrderQueue,则当前stack不进行回收,直接drop } elseif (queue == WeakOrderQueue.DUMMY) { // drop object return; }
static WeakOrderQueue allocate(Stack<?> stack, Thread thread){ // We allocated a Link so reserve the space return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY) ? newQueue(stack, thread) : null; }
//2.2.1 判断该stack能否还能缓存space大小的对象 staticbooleanreserveSpace(AtomicInteger availableSharedCapacity, int space){ assert space >= 0; //cas操作更新剩余空间 for (;;) { int available = availableSharedCapacity.get(); //不够满足的话则返回false if (available < space) { returnfalse; } //最终更新量为available - space if (availableSharedCapacity.compareAndSet(available, available - space)) { returntrue; } } }
/**
 * 2.2.2 Constructs a {@code WeakOrderQueue} for the given stack and thread and
 * registers it with the stack through {@code setHead}.
 *
 * @param stack  the stack the queue is registered with
 * @param thread the thread handed to the queue's constructor
 * @return the newly created queue
 */
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
    final WeakOrderQueue created = new WeakOrderQueue(stack, thread);
    // Registration happens here rather than inside the constructor so that the queue
    // reference does not escape, and cannot be accessed, while it is still being constructed.
    stack.setHead(created);

    return created;
}
2.2.2 创建WeakOrderQueue对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/**
 * Creates a queue with a single empty Link and a head that shares the stack's
 * capacity counter.
 *
 * @param stack  the stack whose {@code availableSharedCapacity} counter the head uses
 * @param thread the thread recorded (weakly) as this queue's owner
 */
private WeakOrderQueue(Stack<?> stack, Thread thread) {
    // 2.2.2.1 Create the first Link node; it starts out as the tail.
    tail = new Link();

    // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
    // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
    // Stack itself GCed.
    // 2.2.2.2 Create the head node.
    head = new Head(stack.availableSharedCapacity);
    // 2.2.2.3 Hang the Link node off the head, so head.link and tail refer to the same Link.
    head.link = tail;
    // Weak reference to the owning thread. Along the visible call chain
    // (push -> pushLater -> allocate -> newQueue) this is the thread performing the
    // recycling, not the thread associated with the stack.
    owner = new WeakReference<Thread>(thread);
}