guavacache源碼閱讀筆記
- 2021 年 6 月 18 日
- 筆記
- guavacache源碼閱讀
guavacache源碼閱讀筆記
官方文檔:
//github.com/google/guava/wiki/CachesExplained
中文版:
//www.jianshu.com/p/88ec858cc021?from=singlemessage
參考文檔:
美團技術團隊:《緩存那些事兒》
//tech.meituan.com/2017/03/17/cache-about.html
緩存在很多數情況下都非常有用,例如,如果計算一個值或者獲取一個值時,代價十分昂貴的話,你就可以考慮使用緩存,並且針對某個特定的輸入,你可能不止一次需要它的值。
緩存Cache和ConcurrentMap很相像,但又不完全一樣。最根本的區別是,ConcurrentMap會保存所有添加到其中的元素直到它們被明確的移除。在另一方面,Cache通常可以配置一個自動化的回收策略去限制它的內存空間。在某些情況下,LoadingCache還是非常有用的,即使它不清除條目。
Guava Cache是Google開源的Java重用工具集庫Guava里的一款緩存工具,其主要實現的緩存功能有:
- 自動將entry節點加載進緩存結構中;
- 當緩存的數據超過設置的最大值時,使用LRU算法移除;
- 具備根據entry節點上次被訪問或者寫入時間計算它的過期機制;
- 緩存的key被封裝在WeakReference引用內;
- 緩存的Value被封裝在WeakReference或SoftReference引用內;
- 統計緩存使用過程中命中率、異常率、未命中率等統計數據。
Guava Cache的架構設計靈感來源於ConcurrentHashMap,我們前面也提到過,簡單場景下可以自行編碼通過hashmap來做少量數據的緩存,但是,如果結果可能隨時間改變或者是希望存儲的數據空間可控的話,自己實現這種數據結構還是有必要的。
核心類圖
核心類及接口的說明,簡單的理解如下:
- Cache接口是Guava對外暴露的緩存接口,提供以下動作:
public interface Cache<K, V> {
/**
* 當key存在緩存中的時候返回value,否則返回null
*/
@Nullable
V getIfPresent(@CompatibleWith("K") Object key);
/**
* 如果key存在緩存中返回value,否則通過loader獲取value
* 如果已緩存返回,否則創建,緩存,返回。
*/
V get(K key, Callable<? extends V> loader) throws ExecutionException;
/**
* 返回一個map,這個map包含所有存在緩存中的entries
*/
ImmutableMap<K, V> getAllPresent(Iterable<?> keys);
/**
* 添加一個緩存entry
* @since 11.0
*/
void put(K key, V value);
void putAll(Map<? extends K, ? extends V> m);
/**
* 移除給定的key的緩存entry
* */
void invalidate(@CompatibleWith("K") Object key);
/**
* 移除給定的keys的緩存entries
* @since 11.0
*/
void invalidateAll(Iterable<?> keys);
/**
* 移除所有的緩存
* */
void invalidateAll();
/**
* 返回緩存個數
* */
@CheckReturnValue
long size();
/**
* 統計
*/
@CheckReturnValue
CacheStats stats();
/**
* 返回所有緩存entries的一個視圖,一個線程安全的map
*/
@CheckReturnValue
ConcurrentMap<K, V> asMap();
/**
* 根據實現策略,主動清除無效緩存
*/
void cleanUp();
}
-
LoadingCache接口繼承自Cache接口,增加了獲取不到緩存自動加載的特性。
通過CacheBuilder構造傳入自動加載策略CacheLoader
LoadingCache<String, String> loadingCache = CacheBuilder.newBuilder() .build( new CacheLoader<String, String>() { @Override public String load(String key) throws Exception { //創建加載緩存 return key; } });
public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V> {
/**
* 獲取緩存,獲取不到就 創建 緩存 返回
* 顯示聲明異常ExecutionException 需要手動捕獲處理
*/
V get(K key) throws ExecutionException;
/**
* 未檢查的獲取方法,加載緩存過程中可能拋出異常
* 獲取緩存,獲取不到就 創建 緩存 返回
*/
V getUnchecked(K key);
ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException;
@Deprecated
@Override
V apply(K key);
void refresh(K key);
@Override
ConcurrentMap<K, V> asMap();
}
- LocalManualCache是Cache接口的標準實現,顧名思義手動的獲取緩存,當加載不到緩存需手動傳入Callable<? extends V> loader 手動加載。在實現細節中,Callable接口也是被封裝成匿名CacheLoader,負責加載key到緩存。
@Override
public V get(K key, final Callable<? extends V> valueLoader) throws ExecutionException {
checkNotNull(valueLoader);
return localCache.get(
key,
new CacheLoader<Object, V>() {
@Override
public V load(Object key) throws Exception {
return valueLoader.call();
}
});
}
- LocalLoadingCache實現LoadingCache接口並繼承LocalManualCache,實現自動加載緩存特性。
static class LocalLoadingCache<K, V> extends LocalManualCache<K, V>
implements LoadingCache<K, V> {
LocalLoadingCache(
CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
super(new LocalCache<K, V>(builder, checkNotNull(loader)));
}
// LoadingCache methods
@Override
public V get(K key) throws ExecutionException {
return localCache.getOrLoad(key);
}
@Override
public V getUnchecked(K key) {
try {
return get(key);
} catch (ExecutionException e) {
throw new UncheckedExecutionException(e.getCause());
}
}
@Override
public ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
return localCache.getAll(keys);
}
@Override
public void refresh(K key) {
localCache.refresh(key);
}
@Override
public final V apply(K key) {
return getUnchecked(key);
}
// Serialization Support
private static final long serialVersionUID = 1;
@Override
Object writeReplace() {
return new LoadingSerializationProxy<>(localCache);
}
}
-
LocalCache是核心存儲層,是真正意義上數據存放的地方,繼承了java.util.AbstractMap同時也實現了ConcurrentMap接口,實現方式參照了1.7版本的ConcurrentHashMap,使用多個segments方式的細粒度鎖,在保證線程安全的同時,支持高並發場景需求。LocalCache類似於Map,它是存儲鍵值對的集合,不同的是它還需要處理evict、expire、dynamic load等算法邏輯,需要一些額外信息來實現這些操作。如下圖cache的內存數據模型,可以看到,使用ReferenceEntry接口來封裝一個鍵值對,而用ValueReference來封裝Value值,之所以用Reference命令,是因為Cache要支持WeakReference Key和SoftReference、WeakReference value。同時每一個segment中維護了writeQueue,accessQueue,keyReferenceQueue,valueReferenceQueue,recencyQueue隊列來支持不同回收策略。
大致看一下LocalCache和Segment的成員變量和構造方法(LocalCache這個類實在是太龐大了,裏面嵌入了大量的內部類 – -)
-
class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> { // Constants 常量 /** * 最大容量 1073741824 */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * 最大允許的segment數 * 65536 */ static final int MAX_SEGMENTS = 1 << 16; // slightly conservative /** * 獲取鎖最大重試次數 static final int CONTAINS_VALUE_RETRIES = 3; /** * 每個segment在訪問操作結束會緩衝一定的次數後執行回收緩存操作 * 驅除閥值 * 它只在每次第 64 次調用postReadCleanup()時執行 * 63 */ static final int DRAIN_THRESHOLD = 0x3F; /** * 每次驅除reference queues隊列時 最大驅除個數 */ // TODO(fry): empirically optimize this static final int DRAIN_MAX = 16; // Fields 成員變量 static final Logger logger = Logger.getLogger(LocalCache.class.getName()); /** * Mask value for indexing into segments. The upper bits of a key's hash code are used to choose * the segment. */ final int segmentMask; /** * Shift value for indexing within segments. Helps prevent entries that end up in the same segment * from also ending up in the same bucket. */ final int segmentShift; /** The segments, each of which is a specialized hash table. */ final Segment<K, V>[] segments; /** The concurrency level. * 最大並發數 * */ final int concurrencyLevel; /** Strategy for comparing keys. * * 比較keys的工具類 * */ final Equivalence<Object> keyEquivalence; /** Strategy for comparing values. * 比較值的工具類 * */ final Equivalence<Object> valueEquivalence; /** Strategy for referencing keys. * * key引用強度 * */ final Strength keyStrength; /** Strategy for referencing values. * * value引用強度 * */ final Strength valueStrength; /** The maximum weight of this map. UNSET_INT if there is no maximum. * 最大容納大小 * */ final long maxWeight; /** Weigher to weigh cache entries. * 計算每個cache的權重 由用戶自己實現 * */ final Weigher<K, V> weigher; /** How long after the last access to an entry the map will retain that entry. * 訪問過期時間 * */ final long expireAfterAccessNanos; /** How long after the last write to an entry the map will retain that entry. * 寫過期時間 * */ final long expireAfterWriteNanos; /** How long after the last write an entry becomes a candidate for refresh. * 刷新時間 * */ final long refreshNanos; /** Entries waiting to be consumed by the removal listener. * * 移除通知隊列 等待被removal listener消費 * */ // TODO(fry): define a new type which creates event objects and automates the clear logic final Queue<RemovalNotification<K, V>> removalNotificationQueue; /** * A listener that is invoked when an entry is removed due to expiration or garbage collection of * soft/weak entries. * * 監聽一個entry因為到期回收或被垃圾回收器回收而觸發的動作 */ final RemovalListener<K, V> removalListener; /** Measures time in a testable way. * 時間工具 * */ final Ticker ticker; /** Factory used to create new entries. */ final EntryFactory entryFactory; /** * Accumulates global cache statistics. Note that there are also per-segments stats counters which * must be aggregated to obtain a global stats view. * * 統計 */ final StatsCounter globalStatsCounter; /** The default cache loader to use on loading operations. * 緩存自動加載策略 * */ final @Nullable CacheLoader<? super K, V> defaultLoader; /** * Creates a new, empty map with the specified strategy, initial capacity and concurrency level. */ LocalCache( CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) { concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS); keyStrength = builder.getKeyStrength(); valueStrength = builder.getValueStrength(); keyEquivalence = builder.getKeyEquivalence(); valueEquivalence = builder.getValueEquivalence(); maxWeight = builder.getMaximumWeight(); weigher = builder.getWeigher(); expireAfterAccessNanos = builder.getExpireAfterAccessNanos(); expireAfterWriteNanos = builder.getExpireAfterWriteNanos(); refreshNanos = builder.getRefreshNanos(); removalListener = builder.getRemovalListener(); removalNotificationQueue = (removalListener == NullListener.INSTANCE) ? LocalCache.<RemovalNotification<K, V>>discardingQueue() : new ConcurrentLinkedQueue<RemovalNotification<K, V>>(); ticker = builder.getTicker(recordsTime()); entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries()); globalStatsCounter = builder.getStatsCounterSupplier().get(); defaultLoader = loader; int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY); if (evictsBySize() && !customWeigher()) { initialCapacity = (int) Math.min(initialCapacity, maxWeight); } // Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless // maximumSize/Weight is specified in which case ensure that each segment gets at least 10 // entries. The special casing for size-based eviction is only necessary because that eviction // happens per segment instead of globally, so too many segments compared to the maximum size // will result in random eviction behavior. int segmentShift = 0; int segmentCount = 1; while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) { ++segmentShift; segmentCount <<= 1; } this.segmentShift = 32 - segmentShift; segmentMask = segmentCount - 1; this.segments = newSegmentArray(segmentCount); int segmentCapacity = initialCapacity / segmentCount; if (segmentCapacity * segmentCount < initialCapacity) { ++segmentCapacity; } int segmentSize = 1; while (segmentSize < segmentCapacity) { segmentSize <<= 1; } if (evictsBySize()) {//使用按大小回收策略 // Ensure sum of segment max weights = overall max weights //計算每個segment平分下來的大小 long maxSegmentWeight = maxWeight / segmentCount + 1; long remainder = maxWeight % segmentCount; for (int i = 0; i < this.segments.length; ++i) { if (i == remainder) { maxSegmentWeight--; } this.segments[i] = createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get()); } } else { for (int i = 0; i < this.segments.length; ++i) { this.segments[i] = createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get()); } } }
// Inner Classes @SuppressWarnings("serial") // This class is never serialized. static class Segment<K, V> extends ReentrantLock { @Weak final LocalCache<K, V> map; /** * 當前segment中生效的元素的個數 * */ volatile int count; /** * 當前segment中生效元素的大小的總和 * */ @GuardedBy("this") long totalWeight; /** * 修改次數 */ int modCount; /** * 擴容閥值 threshold = table.length * loadFactor(0.75) */ int threshold; /** * 當前segment中存放元素的表 * */ volatile @Nullable AtomicReferenceArray<ReferenceEntry<K, V>> table; /** 允許segment最大大小 */ final long maxSegmentWeight; /** * 存放key已經被垃圾回收的entries的隊列 * 根據引用強度(強引用 軟引用 虛引用)清除元素策略 */ final @Nullable ReferenceQueue<K> keyReferenceQueue; /** * 存放value已經被垃圾回收的entries的隊列 */ final @Nullable ReferenceQueue<V> valueReferenceQueue; /** * recencyQueue 啟用條件和accessQueue一樣。 * 每次訪問操作都會將該entry加入到隊列尾部,並更新accessTime。 * 如果遇到寫入操作,則將該隊列內容排干,如果accessQueue隊列中持有該這些 entry,然後將這些entry add到accessQueue隊列。 * 注意,因為accessQueue是非線程安全的,所以如果每次訪問entry時就將該entry加入到accessQueue隊列中,就會導致並發問題。 * 所以這裡每次訪問先將entry臨時加入到並發安全的ConcurrentLinkedQueue隊列中,也就是recencyQueue中。 * 在寫入的時候通過加鎖的方式,將recencyQueue中的數據添加到accessQueue隊列中。 * 如此看來,recencyQueue是為 accessQueue服務的 */ final Queue<ReferenceEntry<K, V>> recencyQueue; /** * A counter of the number of reads since the last write, used to drain queues on a small * fraction of read operations. */ final AtomicInteger readCount = new AtomicInteger(); /** * 寫隊列 */ @GuardedBy("this") final Queue<ReferenceEntry<K, V>> writeQueue; /** * 訪問隊列 */ @GuardedBy("this") final Queue<ReferenceEntry<K, V>> accessQueue; /** Accumulates cache statistics. */ final StatsCounter statsCounter; Segment( LocalCache<K, V> map, int initialCapacity, long maxSegmentWeight, StatsCounter statsCounter) { this.map = map; this.maxSegmentWeight = maxSegmentWeight; this.statsCounter = checkNotNull(statsCounter); initTable(newEntryArray(initialCapacity)); keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<K>() : null; valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<V>() : null; recencyQueue = map.usesAccessQueue() ? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>() : LocalCache.<ReferenceEntry<K, V>>discardingQueue(); writeQueue = map.usesWriteQueue() ? new WriteQueue<K, V>() : LocalCache.<ReferenceEntry<K, V>>discardingQueue(); accessQueue = map.usesAccessQueue() ? new AccessQueue<K, V>() : LocalCache.<ReferenceEntry<K, V>>discardingQueue(); } ... }
-
CacheLoader是一個抽象類,是用戶自己實現的緩存加載策略,即負責在獲取不到緩存的時候根據策略創建緩存返回。
-
CacheBuilder 由於cache配置項眾多,典型的builder模式場景,複雜對象的構造與其對應配置屬性表示的分離。
它提供三種方式加載到緩存中。分別是:
- 在構建緩存的時候,使用build方法內部調用CacheLoader方法加載數據;
- callable 、callback方式加載數據;
- 使用粗暴直接的方式,直接Cache.put 加載數據,但自動加載是首選的,因為它可以更容易的推斷所有緩存內容的一致性。
build生成器的兩種方式都實現了一種邏輯:從緩存中取key的值,如果該值已經緩存過了則返回緩存中的值,如果沒有緩存過可以通過某個方法來獲取這個值,不同的地方在於cacheloader的定義比較寬泛,是針對整個cache定義的,可以認為是統一的根據key值load value的方法,而callable的方式較為靈活,允許你在get的時候指定load方法。
Guava Cache數據結構圖
ReferenceEntry是對一個鍵值對節點的抽象,它包含了key和值的ValueReference抽象類,Cache由多個Segment組成,而每個Segment包含一個ReferenceEntry數組,每個ReferenceEntry數組項都是一條ReferenceEntry鏈,且一個ReferenceEntry包含key、hash、valueReference、next字段。除了在ReferenceEntry數組項中組成的鏈,在一個Segment中,所有ReferenceEntry還組成access鏈(accessQueue)和write鏈(writeQueue)(後面會介紹鏈的作用)。ReferenceEntry可以是強引用類型的key,也可以WeakReference類型的key,為了減少內存使用量,還可以根據是否配置了expireAfterWrite、expireAfterAccess、maximumSize來決定是否需要write鏈和access鏈確定要創建的具體Reference:StrongEntry、StrongWriteEntry、StrongAccessEntry、StrongWriteAccessEntry等。
對於ValueReference,因為Cache支持強引用的Value、SoftReference Value以及WeakReference Value,因而它對應三個實現類:StrongValueReference、SoftValueReference、WeakValueReference。為了支持動態加載機制,它還有一個LoadingValueReference,在需要動態加載一個key的值時,先把該值封裝在LoadingValueReference中,以表達該key對應的值已經在加載了,如果其他線程也要查詢該key對應的值,就能得到該引用,並且等待改值加載完成,從而保證該值只被加載一次,在該值加載完成後,將LoadingValueReference替換成其他ValueReference類型。ValueReference對象中會保留對ReferenceEntry的引用,這是因為在Value因為WeakReference、SoftReference被回收時,需要使用其key將對應的項從Segment的table中移除。
WriteQueue和AccessQueue :為了實現最近最少使用算法,Guava Cache在Segment中添加了兩條鏈:write鏈(writeQueue)和access鏈(accessQueue),這兩條鏈都是一個雙向鏈表,通過ReferenceEntry中的previousInWriteQueue、nextInWriteQueue和previousInAccessQueue、nextInAccessQueue鏈接而成,但是以Queue的形式表達。WriteQueue和AccessQueue都是自定義了offer、add(直接調用offer)、remove、poll等操作的邏輯,對offer(add)操作,如果是新加的節點,則直接加入到該鏈的結尾,如果是已存在的節點,則將該節點鏈接的鏈尾;對remove操作,直接從該鏈中移除該節點;對poll操作,將頭節點的下一個節點移除,並返回。
了解了cache的整體數據結構後,再來看下針對緩存的相關操作就簡單多了:
- Segment中的evict清除策略操作,是在每一次調用操作的開始和結束時觸發清理工作,這樣比一般的緩存另起線程監控清理相比,可以減少開銷,但如果長時間沒有調用方法的話,會導致不能及時的清理釋放內存空間的問題。evict主要處理四個Queue:1. keyReferenceQueue;2. valueReferenceQueue;3. writeQueue;4. accessQueue。前兩個queue是因為WeakReference、SoftReference被垃圾回收時加入的,清理時只需要遍歷整個queue,將對應的項從LocalCache中移除即可,這裡keyReferenceQueue存放ReferenceEntry,而valueReferenceQueue存放的是ValueReference,要從Cache中移除需要有key,因而ValueReference需要有對ReferenceEntry的引用,這個前面也提到過了。而對後面兩個Queue,只需要檢查是否配置了相應的expire時間,然後從頭開始查找已經expire的Entry,將它們移除即可。
- Segment中的put操作:put操作相對比較簡單,首先它需要獲得鎖,然後嘗試做一些清理工作,接下來的邏輯類似ConcurrentHashMap中的rehash,查找位置並注入數據。需要說明的是當找到一個已存在的Entry時,需要先判斷當前的ValueRefernece中的值事實上已經被回收了,因為它們可以是WeakReference、SoftReference類型,如果已經被回收了,則將新值寫入。並且在每次更新時註冊當前操作引起的移除事件,指定相應的原因:COLLECTED、REPLACED等,這些註冊的事件在退出的時候統一調用Cache註冊的RemovalListener,由於事件處理可能會有很長時間,因而這裡將事件處理的邏輯在退出鎖以後才做。最後,在更新已存在的Entry結束後都嘗試着將那些已經expire的Entry移除。另外put操作中還需要更新writeQueue和accessQueue的語義正確性。
- Segment帶CacheLoader的get操作:1. 先查找table中是否已存在沒有被回收、也沒有expire的entry,如果找到,並在CacheBuilder中配置了refreshAfterWrite,並且當前時間間隔已經超過這個時間,則重新加載值,否則,直接返回原有的值;2. 如果查找到的ValueReference是LoadingValueReference,則等待該LoadingValueReference加載結束,並返回加載的值;3. 如果沒有找到entry,或者找到的entry的值為null,則加鎖後,繼續在table中查找已存在key對應的entry,如果找到並且對應的entry.isLoading()為true,則表示有另一個線程正在加載,因而等待那個線程加載完成,如果找到一個非null值,返回該值,否則創建一個LoadingValueReference,並調用loadSync加載相應的值,在加載完成後,將新加載的值更新到table中,即大部分情況下替換原來的LoadingValueReference。
測試構建實現不同回收策略的LoadingCache的實例,分析put和get的實現的細節。
- put操作過程解析:
1.測試入口:構建一個實現支持緩存最大個數緩存策略且有自動加載特性的本地緩存LoadingCache。
public static void main(String[] args) throws Exception{
debugMaximumSize();
// debugExpireAfterWrite();
// debugExpireAfterAccess();
}
private static void debugMaximumSize() throws Exception{
LoadingCache<String, String> loadingCache = CacheBuilder.newBuilder()
.maximumSize(4)
.build(
new CacheLoader<String, String>() {
@Override
public String load(String key) throws Exception {
System.out.println("key:" + key + " is not exist. do nothing.");
return "111";
}
});
loadingCache.put("zhangtianci","good");
loadingCache.get("zhangtianci");
}
2.進入到實現標準接口Cache的LocalManualCache的put的方法,實際通過擁有一個LocalCache的成員變量調用其put方法。
static class LocalManualCache<K, V> implements Cache<K, V>, Serializable {
//擁有一個LocalCache的成員變量
final LocalCache<K, V> localCache;
@Override
public void put(K key, V value) {
localCache.put(key, value);
}
}
3.進入到LocalCache的put方法,對key進行hash定位到segment[]的下標,調用具體對應segment實例的put方法。
class LocalCache{
@Override
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
..
}
* Callable
*/
public void callablex() throws ExecutionException
{
Cache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(1000).build();
String result = cache.get("key", new Callable<String>()
{
public String call()
{
return "result";
}
});
System.out.println(result);
}
4.進入到segment的put方法
1.上鎖,寫前清理操作,回收被垃圾回收的entries和過期的entries
2.判斷segment是否需要擴容
3.確定寫入元素在table中的下標並拿到該下標的頭元素,遍歷該鏈表找到這個entry,覆蓋或不做處理或新增。
4.解鎖,寫後清理操作,將removalNotificationQueue隊列裏面註冊的移除事件,一一觸發相應的動作。
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
long now = map.ticker.read();
//寫操作前
// 1.驅逐參考隊列
// 2.驅逐過期entries
preWriteCleanup(now);
//判斷segment是否需要擴容
int newCount = this.count + 1;
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
//確定寫入元素在table中的下標並拿到該下標的頭元素
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
//遍歷該鏈表 找到這個entry
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
//找到該entry
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {//entryValue被垃圾回收了
++modCount;
if (valueReference.isActive()) {
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
setValue(e, key, value, now);
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
evictEntries(e);
return null;
} else if (onlyIfAbsent) {
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);
//僅更新access queue隊列的順序
recordLockedRead(e, now);
return entryValue;
} else {
// clobber existing entry, count remains unchanged
//覆蓋現有條目,計數保持不變
++modCount;
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries(e);
return entryValue;
}
}
}
// Create a new entry.
//沒有找到entry 創建一個新的entry
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
evictEntries(newEntry);
return null;
} finally {
unlock();
//並且在每次更新時註冊當前操作引起的移除事件,指定相應的原因:COLLECTED、REPLACED等,
// 這些註冊的事件在退出的時候統一調用Cache註冊的RemovalListener,
// 由於事件處理可能會有很長時間,因而這裡將事件處理的邏輯在退出鎖以後才做。
postWriteCleanup();
}
}
看看寫前清理操作的實現細節:
加鎖,驅逐參考隊列和驅逐過期entries
@GuardedBy("this")
void preWriteCleanup(long now) {
runLockedCleanup(now);
}
void runLockedCleanup(long now) {
if (tryLock()) {
try {
//驅逐參考隊列
drainReferenceQueues();
//驅逐過期entries
expireEntries(now); // calls drainRecencyQueue
readCount.set(0);
} finally {
unlock();
}
}
}
驅逐參考隊列實現細節
/**
* 驅逐參考隊列
*/
@GuardedBy("this")
void drainReferenceQueues() {
if (map.usesKeyReferences()) {
drainKeyReferenceQueue();
}
if (map.usesValueReferences()) {
drainValueReferenceQueue();
}
}
@GuardedBy("this")
void drainKeyReferenceQueue() {
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) { //此次操作最多清除16個
break;
}
}
}
@GuardedBy("this")
void drainValueReferenceQueue() {
Reference<? extends V> ref;
int i = 0;
while ((ref = valueReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;
map.reclaimValue(valueReference);
if (++i == DRAIN_MAX) {
break;
}
}
}
驅逐過期entries實現細節
/**
* 驅逐過期entries
* @param now
*/
@GuardedBy("this")
void expireEntries(long now) {
//清空最近使用隊列
//recencyQueue 是在訪問時維護的一個並發安全的最近使用隊列
drainRecencyQueue();
//清空writeQueue所有過期的entry
ReferenceEntry<K, V> e;
while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
//清空accessQueue所有過期的entry
while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
}
/**
* 排干最近使用隊列,將隊列放到accessQueue中
*/
@GuardedBy("this")
void drainRecencyQueue() {
ReferenceEntry<K, V> e;
while ((e = recencyQueue.poll()) != null) {
if (accessQueue.contains(e)) {
accessQueue.add(e);
}
}
}
移除entry實現細節:
@VisibleForTesting
@GuardedBy("this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
//獲取entry當前segement中所屬鏈表的頭元素
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
//遍歷鏈表
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
}
@GuardedBy("this")
@Nullable
ReferenceEntry<K, V> removeValueFromChain(
ReferenceEntry<K, V> first,
ReferenceEntry<K, V> entry,
@Nullable K key,
int hash,
V value,
ValueReference<K, V> valueReference,
RemovalCause cause) {
enqueueNotification(key, hash, value, valueReference.getWeight(), cause);
writeQueue.remove(entry); //維護writeQueue
accessQueue.remove(entry); //維護accessQueue
if (valueReference.isLoading()) {
valueReference.notifyNewValue(null);
return first;
} else {
return removeEntryFromChain(first, entry);//從實際存放數據的table中移除該entry
}
}
put操作替換value實現細節:
@GuardedBy("this")
void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
ValueReference<K, V> previous = entry.getValueReference();
int weight = map.weigher.weigh(key, value);
checkState(weight >= 0, "Weights must be non-negative");
ValueReference<K, V> valueReference =
map.valueStrength.referenceValue(this, entry, value, weight);//根據回收策略(value的強/弱/軟引用)new 一個ValueReference
entry.setValueReference(valueReference);
recordWrite(entry, weight, now);//記錄寫,是否需要記錄訪問時間和寫時間,並往加入到accessQueue和writeQueue
previous.notifyNewValue(value);
}
/**
記錄寫操作,是否需要記錄訪問時間和寫時間,並往加入到accessQueue和writeQueue
*/
void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
// we are already under lock, so drain the recency queue immediately
drainRecencyQueue();
totalWeight += weight;
if (map.recordsAccess()) {
entry.setAccessTime(now);
}
if (map.recordsWrite()) {
entry.setWriteTime(now);
}
accessQueue.add(entry);
writeQueue.add(entry);
}
寫後清理操作實現細節:
void postWriteCleanup() {
runUnlockedCleanup();
}
void runUnlockedCleanup() {
// locked cleanup may generate notifications we can send unlocked
if (!isHeldByCurrentThread()) {
map.processPendingNotifications();
}
}
}
void processPendingNotifications() {
RemovalNotification<K, V> notification;
while ((notification = removalNotificationQueue.poll()) != null) {
try {
removalListener.onRemoval(notification);
} catch (Throwable e) {
logger.log(Level.WARNING, "Exception thrown by removal listener", e);
}
}
}
- get操作過程解析:
進入到實現LoadingCache接口的有自動加載特性的LocalLoadingCache的get方法
static class LocalLoadingCache<K, V> extends LocalManualCache<K, V>
implements LoadingCache<K, V> {
LocalLoadingCache(
CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
super(new LocalCache<K, V>(builder, checkNotNull(loader)));
}
// LoadingCache methods
@Override
public V get(K key) throws ExecutionException {
return localCache.getOrLoad(key);
}
其實是調用成員變量localCache的getOrLoad(key)方法,對key進行hash,定位到segment[]下標,調用segment的get方法
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
return segmentFor(hash).get(key, hash, loader);
}
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
//獲取該entry
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
V value = getLiveValue(e, now);//判斷entry的value是否有效
if (value != null) {
recordRead(e, now);
statsCounter.recordHits(1);
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {//是否正在加載
return waitForLoadingValue(e, key, valueReference);
}
}
}
// at this point e is either null or expired;
//創建 緩存 返回
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();//讀後清理操作
}
}
未完待續…
總體來看,Guava Cache基於ConcurrentHashMap的優秀設計借鑒,在高並發場景支持和線程安全上都有相應的改進策略,使用Reference引用命令,提升高並發下的數據……訪問速度並保持了GC的可回收,有效節省空間;同時,write鏈和access鏈的設計,能更靈活、高效的實現多種類型的緩存清理策略,包括基於容量的清理、基於時間的清理、基於引用的清理等;編程式的build生成器管理,讓使用者有更多的自由度,能夠根據不同場景設置合適的模式。