[從源碼學設計] Flume 之 memory channel
[從源碼學設計] Flume 之 memory channel
0x00 摘要
在使用Flume時,有時遇到如下錯誤資訊:Space for commit to queue couldn’t be acquired。
究其原因,是在memory channel的使用中出現了問題。
本文就以此為切入點,帶大家一起剖析下 Flume 中 MemoryChannel 的實現
0x01 業務範疇
1.1 用途和特點
Flume的用途:高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。
這裡我們介紹與本文相關的特點:
- Flume的管道是基於事務,保證了數據在傳送和接收時的一致性.
- Flume是可靠的,容錯性高的,可升級的,易管理的,並且可訂製的。
- 當收集數據的速度超過將寫入數據的時候,也就是當收集資訊遇到峰值時,這時候收集的資訊非常大,甚至超過了系統的寫入數據能力,這時候,Flume會在數據生產者和數據收容器間做出調整,保證其能夠在兩者之間提供平穩的數據.
1.2 Channel
這裡就要介紹channel的概念。channel是一種短暫的存儲容器,它將從source處接收到的event格式的數據快取起來,直到它們被sinks消費掉,它在source和sink間起著橋樑的作用,channel是一個完整的事務,這一點保證了數據在收發的時候的一致性。並且它可以和任意數量的source和sink鏈接。
支援的類型主要有: JDBC channel , File System channel , Memory channel等,大致區別如下:
- Memory Channel:events存儲在Java Heap,即記憶體隊列中(記憶體的大小是可以指定的)。對於流量較高和由於agent故障而準備丟失數據的流程來說,這是一個理想的選擇;
- File Channel:event保存在本地文件中,可靠性高,但吞吐量低於Memory Channel;
- JDBC Channel :event存儲在持久化存儲庫中(其背後是一個資料庫),JDBC channel目前支援嵌入式Derby。這是一個持續的channel,對於可恢復性非常重要的流程來說是理想的選擇;
- Kafka Channel:events存儲在Kafka集群中。Kafka提供高可用性和高可靠性,所以當agent或者kafka broker 崩潰時,events能馬上被其他sinks可用。
本文主要涉及Memory Channel,所以看看其特性。
- 好處:速度快,吞吐量大;
- 壞處:根據電腦工作的原理就可以得知,凡是在記憶體中計算的數據,只要電腦出現故障導致停機,那麼記憶體中數據是不會進行保存的;
- 所適用的場景:高吞吐量,允許數據丟失的業務中;
1.3 研究重點
由此,我們可以總結出來 Flume 的一些重點功能:
- 可靠的,容錯性高的;
- 實現事務;
- 速度快,吞吐量大;
- 可以調節收集的速度以解決生產者消費者不一致;
- 可升級的,易管理,可訂製的;
因為MemoryChannel屬於Flume的重要模組,所以,我們本文就看看是MemoryChannel是如何確保Flume以上特點的,這也是本文的學習思路。
1.4 實際能夠學到什麼
如何回滾,使用鎖,訊號量 ,動態擴容,如何解決生產者消費者不一致問題。
1.5 總述
MemoryChannel還是比較簡單的,主要是通過MemoryTransaction中的putList、takeList與MemoryChannel中的queue進行數據流轉和事務控制,這裡的queue相當於持久化層,只不過放到了記憶體中,如果是FileChannel的話,會把這個queue放到本地文件中。
MemoryChannel受記憶體空間的影響,如果數據產生的過快,同時獲取訊號量超時容易造成數據的丟失。而且Flume進程掛掉,數據也會丟失。
具體是:
- 維持一個隊列,隊列的兩端分別是source和sink。
- source使用doPut方法往putList插入Event
- sink使用doTake方法從queue中獲取event放入takeList,並且提供rollback方法,用於回滾。
- commit方法作用是把putList中的event一次性寫到queue;
下面表示了Event在一個使用了MemoryChannel的agent中數據流向:
source ---> putList ---> queue ---> takeList ---> sink
為了大家更好的理解,我們提前把最終圖例發到這裡。
具體如下圖:
+----------+ +-------+
| Source | +----------------------------------------------------------------+ | Sink |
+-----+----+ | [MemoryChannel] | +---+---+
| | +--------------------------------------------------------+ | ^
| | | [MemoryTransaction] | | |
| | | | | |
| | | | | |
| | | channelCounter | | |
| | | | | |
| | | putByteCounter takeByteCounter | | |
| | | | | |
| | | +-----------+ +------------+ | |doTake |
+----------------> | putList | | takeList +----------------+
doPut | | +----+--+---+ +----+---+---+ | |
| | | ^ | ^ | |
| | | | | | | |
| +--------------------------------------------------------+ |
| | | | | poll |
| | | | | |
| | | rollback rollback | | |
| | +--------------+ +-------------+ | |
| | | | | |
| | | v | |
| | doCommit +--+--+---+ doCommit | |
| +------------> | queue | +-----------+ |
| +---------+ |
+----------------------------------------------------------------+
手機上如圖:
0x02 定義
我們要看看MemoryChannel重要變數的定義,這裡我們沒有按照程式碼順序來,而是重新整理。
2.1 介面
MemorChannel中最重要的部分主要是Channel、Transaction 和Configurable三個介面。
Channel介面 主要聲明了Channel中的三個方法,就是隊列基本功能:
public void put(Event event) throws ChannelException; //從指定的Source中獲得Event放入指定的Channel中
public Event take() throws ChannelException; //從Channel中取出event放入Sink中
public Transaction getTransaction(); //獲得當前Channel的事務實例
Transaction介面 主要聲明了flume中事務機制的四個方法,就是事務功能:
enum TransactionState { Started, Committed, RolledBack, Closed } //枚舉類型,指定了事務的四種狀態,事務開始、提交、失敗回滾、關閉
void begin();
void commit();
void rollback();
void close();
Configurable介面 主要是和flume配置組件相關的,需要從flume配置系統獲取配置資訊的任何組件,都必須實現該介面。該介面中只聲明了一個context方法,用於獲取配置資訊。
大體邏輯如下:
+-----------+ +--------------+ +---------------+
| | | | | |
| Channel | | Transaction | | Configurable |
| | | | | |
+-----------+ +--------------+ +---------------+
^ ^ ^
| | |
| | |
| | |
| +-------------+--------------+ |
| | | |
| | MemorChannel +---------+
+-------+ | |
| |
| |
| |
| |
| |
| |
+----------------------------+
下面我們具體講講成員變數。
2.2 配置參數
首先是一系列業務配置參數。
//定義隊列中一次允許的事件總數
private static final Integer defaultCapacity = 100;
//定義一個事務中允許的事件總數
private static final Integer defaultTransCapacity = 100;
//將物理記憶體轉換成槽(slot)數,默認是100
private static final double byteCapacitySlotSize = 100;
//定義隊列中事件所使用空間的最大位元組數(默認是JVM最大可用記憶體的0.8)
private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);
//定義byteCapacity和預估Event大小之間的緩衝區百分比:
private static final Integer defaultByteCapacityBufferPercentage = 20;
//添加或者刪除一個event的超時時間,單位秒:
private static final Integer defaultKeepAlive = 3;
// maximum items in a transaction queue
private volatile Integer transCapacity;
private volatile int keepAlive;
private volatile int byteCapacity;
private volatile int lastByteCapacity;
private volatile int byteCapacityBufferPercentage;
private ChannelCounter channelCounter;
這些參數基本都在configure(Context context)中設置,基本邏輯如下:
-
設置 capacity:MemroyChannel的容量,默認是100。
-
設置 transCapacity:每個事務最大的容量,也就是每個事務能夠獲取的最大Event數量。默認也是100。事務容量必須小於等於Channel Queue容量。
-
設置 byteCapacityBufferPercentage:用來確定byteCapacity的一個百分比參數,即我們定義的位元組容量和實際事件容量的百分比,因為我們定義的位元組容量主要考慮Event body,而忽略Event header,因此需要減去Event header部分的記憶體佔用,可以認為該參數定義了Event header佔了實際位元組容量的百分比,默認20%;
-
設置 byteCapacity:byteCapacity等於設置的byteCapacity值或堆的80%乘以1減去byteCapacityBufferPercentage的百分比,然後除以100。具體是首先讀取配置文件定義的byteCapacity,如果沒有定義,則使用默認defaultByteCapacity,而defaultByteCapacity默認是JVM物理記憶體的80%(Runtime.getRuntime().maxMemory() * .80);那麼實際byteCapacity=定義的byteCapacity * (1- Event header百分比)/ byteCapacitySlotSize;byteCapacitySlotSize默認100,即計算百分比的一個係數。
-
設置 keep-alive:增加和刪除一個Event的超時時間(單位:秒)。
-
設置初始化 LinkedBlockingDeque對象,大小為capacity。以及各種訊號量對象。
-
最後初始化計數器。
配置程式碼摘要如下:
public void configure(Context context) {
capacity = context.getInteger("capacity", defaultCapacity);
transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage",
defaultByteCapacityBufferPercentage);
byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() *(1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize);
if (byteCapacity < 1) {
byteCapacity = Integer.MAX_VALUE;
}
keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
resizeQueue(capacity);
if (channelCounter == null) {
channelCounter = new ChannelCounter(getName());
}
}
2.2.1 channel屬性
ChannelCounter 需要單獨說一下。其就是把channel的一些屬性封裝了一下,初始化了一個ChannelCounter,是一個計數器,記錄如當前隊列放入Event數、取出Event數、成功數等。
private ChannelCounter channelCounter;
定義如下:
public class ChannelCounter extends MonitoredCounterGroup implements
ChannelCounterMBean {
private static final String COUNTER_CHANNEL_SIZE = "channel.current.size";
private static final String COUNTER_EVENT_PUT_ATTEMPT ="channel.event.put.attempt";
private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt";
private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success";
private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success";
private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity";
}
2.4 Semaphore和Queue
其次是Semaphore和Queue。主要就是用來協助制事務。
MemoryChannel有三個訊號量用來控制事務,防止容量越界:queueStored,queueRemaining,bytesRemaining。
- queueLock:創建一個Object當做隊列鎖,操作隊列的時候保證數據的一致性;
- queue:使用LinkedBlockingDeque queue維持一個隊列,隊列的兩端分別是source和sink;
- queueStored:來保存queue中當前的保存的event的數目,即已經存儲的容量大小,後面tryAcquire方法可以判斷是否可以take到一個event;
- queueRemaining:來保存queue中當前可用的容量,即空閑的容量大小,可以用來判斷當前是否有可以提交一定數量的event到queue中;
- bytesRemaining : 表示可以使用的記憶體大小。該大小就是計算後的byteCapacity值。
private Object queueLock = new Object();
@GuardedBy(value = "queueLock")
private LinkedBlockingDeque<Event> queue;
private Semaphore queueRemaining;
private Semaphore queueStored;
private Semaphore bytesRemaining;// 表示可以使用的記憶體大小。該大小就是計算後的byteCapacity值。
2.5 MemoryTransaction
內部類MemoryTransaction
是整個事務保證最重要的類。
MemoryTransaction用來接收數據和事務控制。該類繼承BasicTransactionSemantics類。
MemoryTransaction維護了兩個隊列,一個用於Source的put,一個用於Sink的take,容量大小為事務的容量(transCapacity)。
- takeList:take事務用到的隊列;阻塞雙端隊列,從channel中取event先放入takeList,輸送到sink,commit成功,從channel queue中刪除;
- putList:put事務用到的隊列;從source 會先放至putList,然後commit傳送到channel queue隊列;
- channelCounter:channel屬性;ChannelCounter類定義了監控指標數據的一些屬性方法;
- putByteCounter:put位元組數計數器;
- takeByteCounter:take位元組計數器;
private class MemoryTransaction extends BasicTransactionSemantics {
private LinkedBlockingDeque<Event> takeList;
private LinkedBlockingDeque<Event> putList;
private final ChannelCounter channelCounter;
private int putByteCounter = 0;
private int takeByteCounter = 0;
}
無論是Sink,還是Source都會調用getTransaction()方法,獲取當前Channel的事務實例。
介面與成員變數大致邏輯可以理解如下,其中 Channel 的 API 表示這裡是 MemorChannel 的對外 API:
+-----------+ +--------------+ +---------------+
| | | | | |
| Channel | | Transaction | | Configurable |
| | | | | |
+---+-------+ +--------------+ +---------------+
^
| ^ ^
| | |
| | |
| +--------------------------------------------------------+ |
| | | | |
| | MemoryChannel | | |
| | + | |
| | | |
| | MemoryTransaction | |
| | | |
| | Semaphore / Queue | |
| | | |
+--------+ | |
API | | |
| | |
| Config Parameters +------------+
| |
| |
+--------------------------------------------------------+
0x03 使用
看了上面講的,估計大家還是會暈,因為成員變數和概念實在是太多了,所以我們從使用入手分析。
前面提到,memory channel內部有三個隊列,分別是putList,queue,takeList。其中putList,takeList在MemoryTransaction之中。
3.1 channel如何使用
channel之上有一把鎖,當source主動向channel放數據或者sink主動從channel取數據時,會搶鎖,誰取到鎖,誰就可以操作channel。
每次使用時會首先調用tx.begin()開始事務,也就是獲取鎖。然後調用tx.commit()提交數據或者調用tx.rollback()取消操作。
這裡需要注意的是:Source, Sink 都是死循環,搶同一個鎖。所以就會有消費者,生產者速度不一致的情況,所以就需要有 一個內部的 buffer,就是我們的Queue。
3.2 source往channel放數據
這是一個死循環,source一直試圖獲取channel鎖,然後從kafka獲取數據,放入channel中,那每次放入多少個數據呢?在KafkaSource.java中,程式碼是這樣的:
while (eventList.size() < batchUpperLimit &&
System.currentTimeMillis() < maxBatchEndTime) {
}
含義就是:每次最多放batchUpperLimit或最多等待maxBatchEndTime的時間,就結束向channel放數據。
當獲取了足夠的數據,首先放入putList中,然後就會調用tx.commit()將putList的全部數據放入queue中。
3.3 sink從channel取數據
也是一個死循環,sink一直試圖獲取channel鎖,然後從channel取一批數據,放入sink和takeList(僅僅用於回滾,在調用rollback時takeList的數據會回滾到queue中)。每次取多少個event呢?以HDFSEventSink為例,程式碼如下:
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();
if (event == null)
break;
}
batchSize的大小默認是100,由hdfs.batchSize控制。
具體如下:
+--------------->
^ |
| | while(1)
| v
+-----------+ | +----+----+
| Source | | take | Sink |
| | | | |
+-----+-----+ | +---------+
| |
| +-------------+--+
| | Channel |
| | |
While(1) | | |
| | buffer |
| +----------------+
|
| ^
| |
| | put
v ----------------^
0x04 實現事務
此處回答了前面提到的兩個重點:
- 可靠的,容錯性高的;
- 實現事務;
其實就是用事務保證整個流程的高可靠,其核心就在從source抽取數據到channel,從channel抽取到sink,當sink被消費後channel數據刪除的這三個環節。而這些環節在flume中被統一的用事務管理起來。可以說,這是flume高可靠的關鍵一點。
具體涉及到的幾個點如下:
- MemoryTransaction是實現事務的核心。每次使用時會首先調用tx.begin()開始事務,也就是獲取鎖。然後調用tx.commit()提交數據或者調用tx.rollback()取消操作。
- MemoryChannel時設計時考慮了兩個容量:Channel Queue容量和事務容量,而這兩個容量涉及到了數量容量和位元組數容量。
- MemoryChannel 會根據事務容量 transCapacity 創建兩個阻塞雙端隊列putList和takeList,這兩個隊列(相當於兩個臨時緩衝隊列)主要就是用於事務處理的。即,每個事務都有一個Take List和Put List分別用於存儲事務相關的取數據和放數據,等事務提交時才完全同步到Channel Queue,或者失敗把取數據回滾到Channel Queue。
- 首先由一個Channel Queue用於存儲整個Channel的Event數據;
- 當從Source往 Channel中放事件event 時,會先將event放入 putList 隊列,然後將putList隊列中的event 放入 MemoryChannel的queue中。
- 當從 Channel 中將數據傳送給 Sink 時,則會將event先放入 takeList 隊列中,然後從takeList隊列中將event送入Sink,不論是 put 還是 take 發生異常,都會調用 rollback 方法回滾事務。
- 回滾時,會先給 Channel 加鎖防止回滾時有其他執行緒訪問,若takeList 不為空, 就將寫入 takeList中的event再次放入 Channel 中,然後移除 putList 中的所有event(即就是丟棄寫入putList臨時隊列的 event)。
- 因為多個事務要操作Channel Queue,還要考慮Channel Queue的動態擴容問題,因此MemoryChannel使用了鎖來實現;而容量問題則使用了訊號量來實現。
我們下面具體走一下這個流程。
4.1 put事務
此事務發生在在Source到Channel之間,是從指定的Source中獲得Event放入指定的Channel中,具體包括:
- doPut:將批數據先寫入臨時緩衝區 putList;
- doCommit:檢查 channel 記憶體隊列是否足夠合併;
- doRollback:channel 記憶體隊列空間不足,回滾數據;
如下調用。
try {
tx.begin();
//底層就是調用的doPut方法
// Source寫事件調用put方法
reqChannel.put(event);
tx.commit();
} catch (Throwable t) {
// 發生異常則回滾事務
tx.rollback();
if (t instanceof Error) {
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put event on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
}
下面分析doPut方法。
doPut邏輯如下:
- 計算event大概佔用的slot數;
- offer方法往putList中添加event,等事務提交時轉移到Channel Queue,如果滿了則直接拋異常回滾事務;
- 累加這一條event所佔用的slot空間,以便之後做位元組容量限制。
具體程式碼如下:
protected void doPut(Event event) throws InterruptedException {
//增加放入事件計數器
channelCounter.incrementEventPutAttemptCount();
//estimateEventSize計算當前Event body大小
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
/*
* offer若立即可行且不違反容量限制,則將指定的元素插入putList阻塞雙端隊列中(隊尾),
* 並在成功時返回,如果當前沒有空間可用,則拋異常回滾事務
* */
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
//記錄Event的byte值
putByteCounter += eventByteSize;
}
具體如下圖,我們暫時忽略commit與rollback:
+----------+
| Source | +---------------------------+
+-----+----+ | [MemoryChannel] |
| | +---------------------+ |
| | | [MemoryTransaction] | |
| | | | |
| | | | |
| | | channelCounter | |
| | | | |
| | | putByteCounter | |
| | | | |
| | | +-----------+ | |
+----------------> | putList | | |
doPut | | +-----------+ | |
| +---------------------+ |
+---------------------------+
4.2 take事務
此事務發生在Channel到Sink之間,主要是從Channel中取出event放入Sink中,具體包括。
- doTake:將數據取到臨時緩衝區 takeList,並將數據發送到 HDFS;
- doCommit:如果數據全部發送成功,則清除臨時緩衝區 takeList;
- doRollback:數據發送過程中如果出現異常,rollback 將臨時緩衝區 takeList 中的數據歸還給 channel 記憶體隊列;
如下調用:
transaction = channel.getTransaction();
transaction.begin();
......
event = channel.take();
......
transaction.commit();
邏輯如下:
- 判斷takeList中是否還有空間,如果沒有空間則拋出異常;
- 判斷當前MemoryChannel中的queue中是否還有空間,這裡通過訊號量來判斷;
- 從queue頭部彈出一條消息,放入takeList中;
- 估算這條Event所佔空間(slot數),累加takeList中的位元組數;
- 將取出來的這條Event返回;
doTake具體程式碼如下:
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();//將正在從channel中取出的event計數器原子的加一,即增加取出事件計數器
//如果takeList隊列沒有剩餘容量,即當前事務已經消費了最大容量的Event,拋異常
if (takeList.remainingCapacity() == 0) {//takeList隊列剩餘容量為0
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}
//嘗試獲取一個訊號量獲取許可,如果可以獲取到許可的話,證明queue隊列有空間,超時直接返回null
if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
return null;
}
Event event;
synchronized (queueLock) {
event = queue.poll(); //獲取並移除MemoryChannel雙端隊列表示的隊列的頭部(也就是隊列的第一個元素),隊列為空返回null,同一時間只能有一個執行緒訪問,加鎖同步
}
//因為訊號量的保證,Channel Queue不應該返回null,出現了就不正常了
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
"signalling existence of entry");
takeList.put(event); //將取出的event暫存到事務的takeList隊列
//計算當前Event body大小並增加取出隊列位元組數計數器
/* 計算event的byte大小 */
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
//更新takeByteCounter大小
takeByteCounter += eventByteSize;
return event;
}
於是我們把take事務加入,我們暫時忽略commit與rollback。具體如下圖,目前兩個事務是沒有聯繫的:
+----------+ +-------+
| Source | +---------------------------------------------------------+ | Sink |
+-----+----+ | [MemoryChannel] | +---+---+
| | +--------------------------------------------------+ | ^
| | | [MemoryTransaction] | | |
| | | | | |
| | | | | |
| | | channelCounter | | |
| | | | | |
| | | putByteCounter takeByteCounter | | |
| | | | | |
| | | +-----------+ +------------+ | | doTake |
+----------------> | putList | | takeList +----------------+
doPut | | +-----------+ +------+-----+ | |
| | ^ | |
| | | | |
| +--------------------------------------------------+ |
| | |
| | |
| | |
| +---------+ poll | |
| | queue | +---------+ |
| +---------+ |
+---------------------------------------------------------+
4.3 提交事務
commit階段主要做的事情是提交事務,此程式碼繁雜在於其包括了兩個方面的操作:
- 從putList拿數據到Queue;
- 處理 takelist後續操作,就是根據此時具體情況調整各種數值;
commit其邏輯如下:
- 計算takeList中Event數與putList中的Event差值;
int remainingChange = takeList.size() - putList.size();
- 差值小於0,說明takeList小,也就是向該MemoryChannel放的數據比取的數據要多,所以需要判斷該MemoryChannel是否有空間來放;
- 首先通過訊號量來判斷是否還有剩餘空間;這一步tryAcquire方法會將bytesRemaining的值減去putByteCounter的值,如果bytesRemaining原來的值大於putByteCounter則返回true;
- 然後判斷,在給定的keepAlive時間內,能否獲取到充足的queue空間;
- 如果上面的兩個判斷都過了,那麼把putList中的Event放到該MemoryChannel中的queue中;
- 將putList中的Event循環放入queue中;
- 面的工作完成後,清空putList和takeList,一次事務完成;
- 然後將兩個計數器置零;
- 將queueStored的值加上puts的值,更新訊號量;
- 如果takeList比putList大,說明該MemoryChannel中queue的數量應該是減少了,所以把(takeList-putList)的差值加到訊號量queueRemaining;
- 更新channelCounter中的三個變數;
具體如下:
protected void doCommit() throws InterruptedException {
//計算改變的Event數量,即取出數量-放入數量;如果放入的多,那麼改變的Event數量將是負數
//如果takeList更小,說明該MemoryChannel放的數據比取的數據要多,所以需要判斷該MemoryChannel是否有空間來放
int remainingChange = takeList.size() - putList.size(); //takeList.size()可以看成source,putList.size()看成sink
//如果remainingChange小於0,則需要獲取Channel Queue剩餘容量的訊號量
if (remainingChange < 0) { //sink的消費速度慢於source的產生速度
//利用bytesRemaining訊號量判斷是否有足夠空間接收putList中的events所佔的空間
//putByteCounter是需要推到channel中的數據大小,bytesRemainingchannel是容量剩餘
//獲取putByteCounter個位元組容量訊號量,如果失敗說明超過位元組容量限制了,回滾事務
if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
//channel 數據大小容量不足,事物不能提交
throw new ChannelException("Cannot commit transaction. Byte capacity " +
"allocated to store event body " + byteCapacity * byteCapacitySlotSize +
"reached. Please increase heap space/byte capacity allocated to " +
"the channel as the sinks may not be keeping up with the sources");
}
//獲取Channel Queue的-remainingChange個訊號量用於放入-remainingChange個Event,如果獲取不到,則釋放putByteCounter個位元組容量訊號量,並拋出異常回滾事務
//因為source速度快於sink速度,需判斷queue是否還有空間接收event
if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
//remainingChange如果是負數的話,說明source的生產速度,大於sink的消費速度,且這個速度大於channel所能承載的值
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size(); //事務期間生產的event
int takes = takeList.size(); //事務期間等待消費的event
//如果上述兩個訊號量都有空間的話,那麼把putList中的Event放到該MemoryChannel中的queue中。
//鎖住隊列開始,進行數據的流轉
synchronized (queueLock) {//操作Channel Queue時一定要鎖定queueLock
if (puts > 0) {
while (!putList.isEmpty()) { //如果有Event,則循環放入Channel Queue
if (!queue.offer(putList.removeFirst())) {
//如果放入Channel Queue失敗了,說明訊號量控制出問題了,這種情況不應該發生
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
//以上步驟執行成功,清空事務的putList和takeList
putList.clear();
takeList.clear();
}
//更新queue大小控制的訊號量bytesRemaining
//釋放takeByteCounter個位元組容量訊號量
bytesRemaining.release(takeByteCounter);
//重置位元組計數器
takeByteCounter = 0;
putByteCounter = 0;
//釋放puts個queueStored訊號量,這樣doTake方法就可以獲取數據了
queueStored.release(puts); //從queueStored釋放puts個訊號量
//釋放remainingChange個queueRemaining訊號量
if (remainingChange > 0) {
queueRemaining.release(remainingChange);
}
//ChannelCounter一些數據計數
if (puts > 0) { //更新成功放入Channel中的events監控指標數據
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) { //更新成功從Channel中取出的events的數量
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}
此處涉及到兩個訊號量:
queueStored表示Channel Queue已存儲事件容量(已存儲的事件數量),隊列取出事件時-1,放入事件成功時+N,取出失敗時-N,即Channel Queue存儲了多少事件。
- queueStored訊號量默認為0。
- 當doTake取出Event時減少一個queueStored訊號量。
- 當doCommit提交事務時需要增加putList 隊列大小的queueStored訊號量。
- 當doRollback回滾事務時需要減少takeList隊列大小的queueStored訊號量。
queueRemaining表示Channel Queue可存儲事件容量(可存儲的事件數量),取出事件成功時+N,放入事件成功時-N。
- queueRemaining訊號量默認為Channel Queue容量。其在提交事務時首先通過remainingChange = takeList.size() – putList.size()計算獲得需要增加多少變更事件;
- 如果小於0表示放入的事件比取出的多,表示有 remainingChange個事件放入,此時應該減少queueRemaining訊號量;
- 而如果大於0,則表示取出的事件比放入的多,表示有queueRemaining個事件取出,此時應該增加queueRemaining訊號量;即消費事件時減少訊號量,生產事件時增加訊號量。
而bytesRemaining是位元組容量訊號量,超出容量則回滾事務。
具體如下圖,現在整體業務已經走通:
+----------+ +-------+
| Source | +---------------------------------------------------------------+ | Sink |
+-----+----+ | [MemoryChannel] | +---+---+
| | +--------------------------------------------------------+ | ^
| | | [MemoryTransaction] | | |
| | | | | |
| | | | | |
| | | channelCounter | | |
| | | | | |
| | | putByteCounter takeByteCounter | | |
| | | | | |
| | | +-----------+ +------------+ | | doTake |
+----------------> | putList | | takeList +----------------+
doPut | | +----+------+ +------+-----+ | |
| | | ^ | |
| | | | | |
| +--------------------------------------------------------+ |
| | | poll |
| | | |
| | | |
| | doCommit +---------+ doCommit | |
| +------------> | queue | +---------+ |
| +---------+ |
+---------------------------------------------------------------+
手機如下圖:
4.4 回滾事務
當一個事務失敗時,會進行回滾,即調用本方法。在回滾時,需要把takeList中暫存的事件回滾到Channel Queue,並回滾queueStored訊號量。具體邏輯如下:
- 得到takeList中的Event數量 int takes = takeList.size();
- 首先把takeList中的Event放回到MemoryChannel中的queue中;
- 先判斷queue中能否有足夠的空間將takeList的Events放回去;
- 從takeList的尾部依次取出Event,放入queue的頭部;
- 然後清空putList;
- 因為清空了putList,所以需要把putList所佔用的空間大小添加到bytesRemaining中;
具體程式碼如下:
protected void doRollback() {
//獲取takeList的大小,然後bytesRemaining中釋放
int takes = takeList.size();
//將takeList中的Event重新放回到queue隊列中。
synchronized (queueLock) { //操作Channel Queue時一定鎖住queueLock
//前置條件判斷,檢查是否有足夠容量回滾事務
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
"Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
//回滾事務的takeList隊列到Channel Queue
while (!takeList.isEmpty()) { //takeList不為空,將其events全部放回queue
//removeLast()獲取並移除此雙端隊列的最後一個元素
queue.addFirst(takeList.removeLast());
}
//最後清空putList
putList.clear();
}
//清空了putList,所以需要把putList佔用的空間添加到bytesRemaining中
//即,釋放putByteCounter個bytesRemaining訊號量
bytesRemaining.release(putByteCounter);
//計數器重置
putByteCounter = 0;
takeByteCounter = 0;
//釋放takeList隊列大小個已存儲事件容量
queueStored.release(takes);
channelCounter.setChannelSize(queue.size());
}
具體如下圖:
+----------+ +-------+
| Source | +----------------------------------------------------------------+ | Sink |
+-----+----+ | [MemoryChannel] | +---+---+
| | +--------------------------------------------------------+ | ^
| | | [MemoryTransaction] | | |
| | | | | |
| | | | | |
| | | channelCounter | | |
| | | | | |
| | | putByteCounter takeByteCounter | | |
| | | | | |
| | | +-----------+ +------------+ | |doTake |
+----------------> | putList | | takeList +----------------+
doPut | | +----+--+---+ +----+---+---+ | |
| | | ^ | ^ | |
| | | | | | | |
| +--------------------------------------------------------+ |
| | | | | poll |
| | | | | |
| | | rollback rollback | | |
| | +--------------+ +-------------+ | |
| | | | | |
| | | v | |
| | doCommit +--+--+---+ doCommit | |
| +------------> | queue | +-----------+ |
| +---------+ |
+----------------------------------------------------------------+
手機上如圖:
0x05 動態擴容
此小節回答了如下問題:
- 可升級的,易管理,可訂製的;
MemoryChannel 中使用鎖配合訊號實現動態增減容量。
MemoryChannel會通過configure方法獲取配置文件系統,初始化MemoryChannel,其中對於配置資訊的讀取有兩種方法,只在啟動時讀取一次或者動態的載入配置文件,動態讀取配置文件時若修改了Channel 的容量大小,則會調用 resizeQueue 方法進行調整,如下:
if (queue != null) { //queue不為null,則為動態修改配置文件時,重新指定了capacity
try {
resizeQueue(capacity);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else { //初始化queue,根據指定的capacity申請雙向阻塞隊列,並初始化訊號量
synchronized (queueLock) {
queue = new LinkedBlockingDeque<Event>(capacity);
queueRemaining = new Semaphore(capacity);
queueStored = new Semaphore(0);
}
}
動態調整 Channel 容量主要分為三種情況:
-
新老容量相同,則直接返回;
-
老容量大於新容量,縮容,需先給未被佔用的空間加鎖,防止在縮容時有執行緒再往其寫數據,然後創建新容量的隊列,將原本隊列加入中所有的 event 添加至新隊列中;
-
老容量小於新容量,擴容,然後創建新容量的隊列,將原本隊列加入中所有的 event 添加至新隊列中。
具體程式碼如下:
private void resizeQueue(int capacity) throws InterruptedException {
int oldCapacity;
//首先計算擴容前的Channel Queue的容量
//計算原本的Channel Queue的容量
synchronized (queueLock) {
//老的容量=隊列現有餘額+在事務被處理了但是是未被提交的容量
oldCapacity = queue.size() + queue.remainingCapacity();
}
//新容量和老容量相等,不需要調整返回
if (oldCapacity == capacity) {//如果老容量大於新容量,縮容
return;
} else if (oldCapacity > capacity) {
//縮容
//首先要預占老容量-新容量的大小,以便縮容容量
//首先要預佔用未被佔用的容量,防止其他執行緒進行操作
//嘗試佔用即將縮減的空間,以防被他人佔用
if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
//如果獲取失敗,默認是記錄日誌然後忽略
LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
} else {
//直接縮容量
//鎖定queueLock進行縮容,先創建新capacity的雙端阻塞隊列,然後複製老Queue數據。執行緒安全
//否則,直接縮容,然後複製老Queue的數據,縮容時需要鎖定queueLock,因為這一系列操作要執行緒安全
synchronized (queueLock) {
LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
newQueue.addAll(queue);
queue = newQueue;
}
}
} else { //擴容,加鎖,創建新newQueue,複製老queue數據
//擴容
synchronized (queueLock) {
LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
newQueue.addAll(queue);
queue = newQueue;
}
//增加/減少Channel Queue的新的容量
//釋放capacity - oldCapacity個許可,即就是增加這麼多可用許可
queueRemaining.release(capacity - oldCapacity);
}
}
0x06 丟失數據的可能
回到本文最初的錯誤資訊:Space for commit to queue couldn’t be acquired。
這說明Flume是會出現數據相關問題的。我們首先分析此問題。
6.1 錯誤
6.1.1 異常原因
因為「source往putList放數據,然後提交到queue中」與「sink從channel中取數據到sink和takeList,然後再從putList取數據到queue中」這兩部分是分開來,任他們自由搶鎖,所以,當前者多次搶到鎖,後者沒有搶到鎖,同時queue的大小又太小,撐不住多次往裡放數據,就會導致觸發這個異常。
6.1.2 失敗處理
正常情況下,如果遇到此問題,flume會暫停source向channel放數據,等待幾秒鐘,這期間sink應該會消費channel中的數據,當source再次開始想channel放數據時channel就有足夠的空間了。
但是如果一直出現異常,就需要啟用解決方案。
6.1.3 解決方案
解決這個問題最直接的辦法就是增大queue的大小,增大capacity和transacCapacity之間的差距,queue能撐住多次往裡面放數據即可。
6.2 丟失數據的可能
下面我們看看Flume使用中,丟失數據的可能。
6.2.1 事務保證
根據Flume的架構原理,採用FileChannel的Flume是不可能丟失數據的,因為其內部有完善的事務機制(ACID)。
- Source到Channel是事務性的,
- Channel到Sink也是事務性的,
這兩個環節都不可能丟失數據。
6.2.2 管道容量
一旦管道中所有Flume Agent的容量之和被使用完,Flume 將不再接受來自客戶端的數據。此時,客戶端需要緩衝數據,否則數據可能會丟失。因此,配置管道能夠處理最大預期的停機時間是非常重要的。
6.2.3 MemoryChannel
Channel採用MemoryChannel時候,會出現丟失。
- MemoryChannel受記憶體空間的影響,如果數據產生的過快,同時獲取訊號量超時容易造成數據的丟失。此時Source不再寫入數據,造成未寫入的數據丟失;就是本文的情況;
- Flume進程掛掉,數據也會丟失,因為之前數據在記憶體中;
所以如果想要不丟失數據,需要採用File channel。
Memory Channel 是一個記憶體緩衝區,因此如果Java23 虛擬機(JVM)或機器重新啟動,任何緩衝區中的數據將丟失。另一方面,File Channel是在磁碟上的。即使JVM 或機器重新啟動,File Channel 也不丟失數據,只要磁碟上存儲的數據仍然是起作用的和可訪問的。機器和Agent 一旦開始運行,任何存儲在FileChannel 中的數據將最終被訪問。
6.2.4 數據重複
在Channel發送到Sink這階段,容易出現數據重複問題。
比如:如果flush到HDFS的時候,數據flush了一半之後出問題了,這意味著已經有一半的數據已經發送到HDFS上面了,現在出了問題,同樣需要調用doRollback方法來進行回滾。
回滾並沒有「一半」之說,它只會把整個takeList中的數據返回給channel,然後繼續進行數據的讀寫。這樣開啟下一個事務的時候就容易造成數據重複的問題。
所以,在某種程度上,flume對數據進行採集傳輸的時候,它有可能會造成數據的重複,但是其數據不丟失。
Flume 保證事件至少一次被送到它們的目的地,只有一次傾力寫數據,且不存在任何類型的故障事件只被寫一次。但是像網路超時或部分寫入存儲系統的錯誤,可能導致事件不止被寫一次,因為Flume 將重試寫操作直到它們完全成功。網路超時可能表示寫操作的失敗,或者只是機器運行緩慢。如果是機器運行緩慢,當Flume 重試這將導致重複。因此,確保每個事件都有某種形式的唯一標識符通常是一個好主意,如果需要,最終可以用來刪除事件數據。
0xFF 參考
事件序列化器 Flume 的無數據丟失保證,Channel 和事務
Flume 1.7 源碼分析(一)源碼編譯
Flume 1.7 源碼分析(二)整體架構
Flume 1.7 源碼分析(三)程式入口
Flume 1.7 源碼分析(四)從Source寫數據到Channel
Flume 1.7 源碼分析(五)從Channel獲取數據寫入Sink
flume到底會丟數據嗎?其可靠性如何?——輕鬆搞懂Flume事務機制
Flume架構與源碼分析-MemoryChannel事務實現
flume「Space for commit to queue couldn’t be acquired」異常產生分析
並發性標註 @GuardedBy @NotThreadSafe @ThreadSafe