Tensorflow 窗口時間序列數據的處理
Tensorflow 時間序列數據的處理
數據集簡介
數據來源:Kaggle Ubiquant Market Prediction
數據集描述了多個投資項目在一個時間序列下的300個匿名特徵(”f_0″至”f_299″)以及一個目標特徵(”target”)。要求根據後續時間節點的匿名特徵預測目標特徵。
本文的主要目標是構建特定長度的時間序列RNN網路訓練和測試集。
訓練集和驗證集、測試集的劃分
由於給出的要求是預測後續時間點的目標特徵,模型的建立是基於過去的模式在將來依然存在。因此,對於這樣的模型,跨時間劃分訓練集、驗證集和測試集是合理的。數據集中給出了時間序號(”time_id”)從0開始至1219,共計3141410條。取其中百分之二作為測試集,從時間序號1201至1219。
窗口序列數據的獲取和應用
解決該問題的思路很簡單。將該數據集中各投資項目視為獨立的時間序列,可以先根據investment_id劃分數據集,再在劃分後的數據集上分別通過滑動窗口的方法獲取定長的時間序列數據。
但在實際應用中會遇到一些問題。首先,通過滑動窗口的方法獲取的時間序列數據有較大的重複性。假設目標的時間序列長度為20,若將窗口序列數據集直接寫入磁碟會佔用原數據集近二十倍的空間。
相對應的,在訓練過程中完全採用實時計算獲取窗口序列也不是一個可取的方法。計算窗口序列的過程會在每個epoch中重複執行,計算函數的效率直接影響到訓練的速度。
一個折中的方案是只將窗口序列中各時間點的數據在原數據集中對應的序號的記錄下來作為序號數據集寫入磁碟。在訓練過程中通過讀取原數據集和序號數據集生成batch。
由於RNN網路允許不定長的時間序列作為輸入,而非矩陣形式的批次回影響輸入的效率,故通過全零填充未達到要求長度的窗口序列並為此在原數據集中插入一行全零行(注意:全零行的插入需要在標準化、歸一化等預處理操作之後)。
MIN_LEN = 20 # 最小窗口序列長度,低於該長度的窗口序列會被全零行填充
FEATURE_NUM = 300
ZERO_INDEX = 3141410 # 全零行序號
def form_indexes(data,time_range): # data:原數據集 time_range:時間序列範圍
id_list = sorted(data['investment_id'].unique())
if 0 in id_list:
id_list.remove(0)
indexes_list = []
for id in tqdm(id_list):
sub_data = data[data['investment_id']==id].sort_values(by=['time_id'])
time_list = tuple(sorted(sub_data['time_id'].unique()))
for t in range(time_range[0],time_range[1]):
if t in time_list:
i_t = time_list.index(t)
temp = list(sub_data[max(i_t-MIN_LEN+1,0):i_t+1].index.values)
indexes = [ZERO_INDEX]*(MIN_LEN-len(temp)) + temp
return indexes_list
在訓練前構建窗口序列數據訓練集和測試集(驗證集)
通過tf.data.Dataset的from_generator方法構建數據集的益處在於只有在數據被使用時(讀取或預讀取)才會運行生成器函數,不會佔用過多記憶體,同時shuffle和分批次等操作都能較為簡便的完成。
train_indexset= pd.read_parquet('trainindex.parquet')
val_indexset= pd.read_parquet('valindex.parquet')
def gen_func(train_val_or_test): # 生成器函數
if train_val_or_test == 1:
for indexes in train_indexset.iterrows():
features = data.iloc[indexes[1].values].values[:,4:]
label = data.iloc[indexes[1].values[-1]]['target']
yield (features,label)
elif train_val_or_test == 2:
for indexes in val_indexset.iterrows():
features = data.iloc[indexes[1].values].values[:,4:]
label = data.iloc[indexes[1].values[-1]]['target']
yield (features,label)
else:
print("error input")
raise ValueError
# 指定輸出的形狀和數據類型
featureSpec = tf.TensorSpec(
shape=[MIN_LEN,FEATURE_NUM],
dtype=tf.dtypes.float32,
name=None
)
labelSpec = tf.TensorSpec(
shape=[],
dtype=tf.dtypes.float32,
name=None
)
train_data = tf.data.Dataset.from_generator(generator=gen_func,args=[1] ,output_signature=(featureSpec,labelSpec))
val_data = tf.data.Dataset.from_generator(generator=gen_func,args=[2] ,output_signature=(featureSpec,labelSpec))
以下模型和超參數只做展示用途所用,不具有指導意義。
MIN_LEN = 20
FEATURE_NUM = 300
BATCH_SIZE = 1000
EPOCH_NUM = 50
def build_RNNmodel():
model = tf.keras.models.Sequential(
[
tf.keras.layers.Masking(mask_value=0.,
input_shape=(MIN_LEN, FEATURE_NUM)),
tf.keras.layers.LSTM(1024,activation='tanh',
return_sequences=True,
dropout=0.5,
kernel_initializer=tf.initializers.TruncatedNormal(stddev=0.01),
),
tf.keras.layers.LSTM(256,activation='tanh',
dropout=0.5,
kernel_initializer=tf.initializers.TruncatedNormal(stddev=0.01),
),
tf.keras.layers.Dense(1,activation='relu')
]
)
return model
train_batchs = train_data.batch(batch_size=BATCH_SIZE).prefetch(BATCH_SIZE)
val_batchs = val_data.batch(batch_size=BATCH_SIZE).prefetch(BATCH_SIZE)
# 設置prefetch可以預讀取後續批次數據提高運行速度
model = build_RNNmodel()
model.compile(loss='mae', optimizer=tf.keras.optimizers.Adam(0.0001))
history = model.fit(train_batchs,epochs=EPOCH_NUM,validation_data=val_batchs)
這裡只取了一部分整體數據的一部分作為演示,每個batch有1000條窗口序列,每個epoch有451個batch,運行一個epoch的時間約為530秒。