How to Speed Up Data Processing by 1000x or More

  • February 17, 2020
  • Notes

The following article comes from 氣象雜貨鋪, written by bugsuse.

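pandas and numpy are the two tools most often used for data processing in Python. Both are powerful, and pandas in particular is one of the strongest data-processing tools in the Python ecosystem.

However, if you do not use the functions and methods in pandas and numpy effectively, they can actually make your data processing slower.

This post follows a talk from PyGotham 2019 to show how to speed up data processing substantially. The notebook and data are linked at the end of the post. There is a lot of code, so it is worth downloading both and running the examples yourself.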

import pandas as pd
import numpy as np
import re
import time

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Full option names are used here; the short forms can be ambiguous in newer pandas versions
pd.set_option('display.max_columns', 15)
pd.set_option('mode.chained_assignment', None)

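Reading the data

This sample CSV is used to practice vectorizing .apply calls. The data has already been cleaned and preprocessed.

df = pd.read_csv('sample_data_pygotham2019.csv',
                 parse_dates=['Date Created', 'Original Record: Date Created'])

Note: the parse_dates argument parses the listed columns as datetimes.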
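
As a minimal sketch of what parse_dates changes, the snippet below reads a tiny in-memory CSV twice. The two-row CSV text is made up for illustration; only the column name mirrors the sample data.

```python
import io
import pandas as pd

# Hypothetical two-row CSV; only the column name mirrors the sample data.
csv_text = "ID,Date Created\n1,2019-01-05\n2,2019-02-10\n"

plain = pd.read_csv(io.StringIO(csv_text))
parsed = pd.read_csv(io.StringIO(csv_text), parse_dates=["Date Created"])

print(plain["Date Created"].dtype)   # dates are left as text
print(parsed["Date Created"].dtype)  # a datetime64 dtype
```

Having real datetime columns is what later allows date arithmetic such as subtracting one column from another.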

Inspecting the data

df.shape
df.head(5)
df.dtypes
Internal ID                               int64
ID                                        int64
Category                                 object
Lead Source                              object
Lead Source Category                     object
Current Status                           object
Status at Time of Lead                   object
Original Record: Date Created    datetime64[ns]
Date Created                     datetime64[ns]
Inactive                                 object
Specialty                                object
Providers                               float64
duplicate_leads                            bool
bad_leads                                  bool
dtype: object

Vectorizing conditional expressions

Conditional logic is usually written with if...else statements inside a function, which is then passed to the .apply method.

def set_lead_status(row):
    if row['Current Status'] == '- None -':
        return row['Status at Time of Lead']
    else:
        return row['Current Status']
%%timeit
test = df.apply(set_lead_status, axis=1)
8.15 s ± 722 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

This approach runs very slowly, and with larger datasets it becomes extremely time-consuming.

np.where

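np.where takes a condition and returns one value where the condition is true and another where it is false.

%%timeit
# Pandas Series Vectorized baby!!

# you can pass the output directly into a pandas Series
# Note: this call tests 'Status at Time of Lead', whereas set_lead_status above tests 'Current Status'

df['normalized_status'] = np.where(
    df['Status at Time of Lead'] == '- None -',   # <-- condition
    df['Current Status'],                         # <-- return if true
    df['Status at Time of Lead']                  # <-- return if false
)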
20.8 ms ± 784 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.where` is {round((8.15 * 1000) / 20.8, 1)}x faster than `.apply`")
`np.where` is 391.8x faster than `.apply`
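
To make the equivalence concrete, here is a small sketch on made-up data. The three-row frame and the helper name pick_status are hypothetical; the column names and the rule follow the np.where call above.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data using the article's column names.
toy = pd.DataFrame({
    "Status at Time of Lead": ["- None -", "LEAD-Claim", "- None -"],
    "Current Status": ["CLIENT-Live", "- None -", "LEAD-Nurture"],
})

def pick_status(row):
    # Row-wise version of the rule used in the np.where call
    if row["Status at Time of Lead"] == "- None -":
        return row["Current Status"]
    return row["Status at Time of Lead"]

by_apply = toy.apply(pick_status, axis=1)

# One call over whole columns instead of one Python call per row
by_where = np.where(
    toy["Status at Time of Lead"] == "- None -",
    toy["Current Status"],
    toy["Status at Time of Lead"],
)
print(list(by_where))
```

Both versions select the same values; the difference is only in how many Python-level function calls are made.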

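Working directly on numpy arrays is faster than working on pandas.Series objects. In the case above, only the underlying numpy arrays need to be processed, without all the extra information a pandas.Series carries, so it runs faster.

%%timeit
# NumPy Vectorized baby!!

df['normalized_status'] = np.where(
    df['Status at Time of Lead'].values == '- None -',
    df['Current Status'].values,
    df['Status at Time of Lead'].values
)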
9.59 ms ± 310 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
print(f"`np.where` w/ numpy vectorization is {round((8.15 * 1000) / 9.59, 1)}x faster than `.apply`")
`np.where` w/ numpy vectorization is 849.8x faster than `.apply`

Passing raw=True can speed up .apply noticeably: the custom function then receives numpy arrays instead of pd.Series objects.

In this case, however, the run time is still long:

%%timeit
test = df.apply(works_but_slow, axis=1, raw=True)
8.55 s ± 160 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
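
The effect of raw=True on what the function receives can be checked on a small frame. This is only an illustrative sketch: the frame and the lambdas are made up and are not the works_but_slow function timed above.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})

# Record whether .apply hands the function a numpy array in each mode.
is_ndarray_default = toy.apply(lambda row: isinstance(row, np.ndarray), axis=1)
is_ndarray_raw = toy.apply(lambda row: isinstance(row, np.ndarray), axis=1, raw=True)

# With raw=True, values are addressed by position rather than by column label.
sums = toy.apply(lambda row: row[0] + row[1], axis=1, raw=True)
print(list(sums))
```

Even with raw arrays, the function is still called once per row, which is consistent with the modest gain measured above.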

np.vectorize

np.vectorize turns a Python function into a callable that behaves like a numpy ufunc and can therefore operate on whole arrays. It vectorizes the function itself; nothing is applied to the data until the returned callable is invoked.

# Here is our previous function that I tried to vectorize but couldn't due to the ValueError.
def works_fast_maybe(col1, col2):
    if col1 == '- None -':
        return col2
    else:
        return col1
# with the np.vectorize method --> returns a vectorized callable
vectfunc = np.vectorize(works_fast_maybe)  # optional arguments: otypes=[float], cache=False
%%timeit
test3 = list(vectfunc(df['Status at Time of Lead'], df['Current Status']))
96 ms ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
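
A minimal sketch of np.vectorize on made-up arrays follows; the names fallback and vect_fallback are hypothetical. Note that the NumPy documentation describes np.vectorize as a convenience that is essentially a for loop, which fits its being slower here than np.where.

```python
import numpy as np

def fallback(col1, col2):
    # Scalar logic: called once for each pair of elements
    return col2 if col1 == "- None -" else col1

# Wrapping does not touch any data yet; it only returns a new callable.
vect_fallback = np.vectorize(fallback)

status_at_time = np.array(["- None -", "LEAD-Claim"], dtype=object)
current_status = np.array(["CLIENT-Live", "- None -"], dtype=object)

result = vect_fallback(status_at_time, current_status)
print(list(result))
```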

Some people assume that assigning through an index is faster, but this is not actually a vectorized approach.

def can_I_go_any_faster(status_at_time, current_status):
    # this works fine if you're setting static values
    df['test'] = 'test'  # status_at_time
    df.loc[status_at_time == '- None -', 'test'] = current_status  # <-- ValueError, indexes don't match!
    df.loc[status_at_time != '- None -', 'test'] = status_at_time
%%timeit
test4 = can_I_go_any_faster(df['Status at Time of Lead'], df['Current Status'])
40.5 ms ± 443 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

The following variant performs about the same as the one above.

def can_I_go_any_faster2(status_at_time, current_status):
    # this works fine if you're setting static values
    df['test'] = 'test'  # status_at_time
    df.loc[status_at_time == '- None -', 'test'] = 'statys1_isNone'
    df.loc[status_at_time != '- None -', 'test'] = 'statys2_notNone'
%%timeit
test5 = can_I_go_any_faster2(df['Status at Time of Lead'], df['Current Status'])
37.8 ms ± 568 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Handling multiple conditions

Main categories

list1 = ['LEAD-3 Flame No Contact', 'LEAD-Campaign', 'LEAD-Claim', 'LEAD-Contact Program',
         'LEAD-General Pool', 'LEAD-No Contact', 'LEAD-Nurture', 'LEAD-Unqualified', 'PROSPECT-LOST']

list2 = ['- None -', 'CLIENT-Closed-Sold', 'CLIENT-Handoff', 'CLIENT-Implementation', 'CLIENT-Implementation (EMR)',
         'CLIENT-Live', 'CLIENT-Partner', 'CLIENT-Referring Consultant', 'CLIENT-Transferred', 'LEAD-Engaged',
         'LEAD-Long-Term Opportunity', 'PROSPECT-CURRENT', 'PROSPECT-LONG TERM', 'PROSPECT-NO DECISION']

# apply version
def lead_category(row):
    if row['Original Record: Date Created'] == row['Date Created']:
        return 'New Lead'
    elif row['normalized_status'].startswith('CLI'):
        return 'Client Lead'
    elif row['normalized_status'] in list1:
        return 'MTouch Lead'
    elif row['normalized_status'] in list2:
        return 'EMTouch Lead'
    return 'NA'
%%timeit
df['lead_category0'] = df.apply(lead_category, axis=1)
6.91 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

The same logic can of course be written with nested np.where calls, but the resulting code is hard to read.

%%timeit
df['lead_category'] = \
    np.where(df['Original Record: Date Created'].values == df['Date Created'].values, 'New Lead',
        np.where(df['normalized_status'].str.startswith('CLI').values, 'Client Lead',
            np.where(df['normalized_status'].isin(list1).values, 'MTouch Lead',
                np.where(df['normalized_status'].isin(list2).values, 'EMTouch Lead',
                         'NA'))))
96.7 ms ± 2.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

np.select

For multiple or nested conditions, np.select is simpler to write and can even be faster.

%%timeit
conditions = [
    df['Original Record: Date Created'].values == df['Date Created'].values,
    df['normalized_status'].str.startswith('CLI').values,
    df['normalized_status'].isin(list1).values,
    df['normalized_status'].isin(list2).values
]

choices = [
    'New Lead',
    'Client Lead',
    'MTouch Lead',
    'EMTouch Lead'
]

df['lead_category1'] = np.select(conditions, choices, default='NA')  # Order of operations matter!
82.4 ms ± 865 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

np.select and nested np.where produce identical output.

# Their output logic is the same
(df.lead_category == df.lead_category1).all()
True
print(f"`np.select` w/ numpy vectorization is {round((6.91 * 1000) / 82.4, 2)}x faster than nested .apply()")
`np.select` w/ numpy vectorization is 83.86x faster than nested .apply()
print(f"`np.select` is {round(96.7 / 82.4, 2)}x faster than nested `np.where`")
`np.select` is 1.17x faster than nested `np.where`
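
The "order of operations" remark can be illustrated with a small sketch on made-up numbers: np.select uses the first condition that matches each element, so narrower tests have to come before broader ones.

```python
import numpy as np

values = np.array([1, 4, 7, 12])

conditions = [values < 5, values < 10]
choices = ["small", "medium"]

# The first matching condition wins, so 1 and 4 are "small" even though they are also < 10.
labels = np.select(conditions, choices, default="large")

# Reversing the order lets the broader test shadow the narrower one.
shadowed = np.select(conditions[::-1], choices[::-1], default="large")

print(list(labels))
print(list(shadowed))
```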

Vectorizing nested conditional expressions

# This is something you might think you can't vectorize, but you sure can!
def sub_conditional(row):
    if row['Inactive'] == 'No':
        if row['Providers'] == 0:
            return 'active_no_providers'
        elif row['Providers'] < 5:
            return 'active_small'
        else:
            return 'active_normal'
    elif row['duplicate_leads']:
        return 'is_dup'
    else:
        if row['bad_leads']:
            return 'active_bad'
        else:
            return 'active_good'
%%timeit
# Let's time how long it takes to apply a nested multiple condition func
df['lead_type'] = df.apply(sub_conditional, axis=1)
5.72 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Below is the same multi-condition logic implemented with np.select; the speedup is substantial.

%%timeit

# With np.select, could do .values here for additional speed, but left out to avoid too much text
conditions = [
    ((df['Inactive'] == 'No') & (df['Providers'] == 0)),
    ((df['Inactive'] == 'No') & (df['Providers'] < 5)),
    df['Inactive'] == 'No',
    df['duplicate_leads'],  # <-- you can also just evaluate boolean arrays
    df['bad_leads'],
]

choices = [
    'active_no_providers',
    'active_small',
    'active_normal',
    'is_dup',
    'active_bad',
]

# the default plays the role of the final else branch in sub_conditional
df['lead_type_vec'] = np.select(conditions, choices, default='active_good')
61.1 ms ± 1.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` is {round((5.72 * 1000) / 61.1, 2)}x faster than nested .apply()")
`np.select` is 93.62x faster than nested .apply()

With np.select, working on the underlying raw arrays speeds things up further:

%%timeit

# With np.select
conditions = [
    ((df['Inactive'].values == 'No') & (df['Providers'].values == 0)),
    ((df['Inactive'].values == 'No') & (df['Providers'].values < 5)),
    df['Inactive'].values == 'No',
    df['duplicate_leads'].values,  # <-- you can also just evaluate boolean arrays
    df['bad_leads'].values,
]

choices = [
    'active_no_providers',
    'active_small',
    'active_normal',
    'is_dup',
    'active_bad',
]

# the default plays the role of the final else branch in sub_conditional
df['lead_type_vec'] = np.select(conditions, choices, default='active_good')
35.8 ms ± 257 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` w/ vectorization is {round((5.72 * 1000) / 35.8, 2)} faster than nested .apply()")
`np.select` w/ vectorization is 159.78 faster than nested .apply()
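
As a check that flattening nested branches into an ordered condition list preserves the logic, the sketch below runs both versions on a made-up six-row frame (one row per branch). It assumes the np.select default should stand in for the innermost else, that is 'active_good'.

```python
import numpy as np
import pandas as pd

# Hypothetical data: one row for each branch of the nested function.
toy = pd.DataFrame({
    "Inactive": ["No", "No", "No", "Yes", "Yes", "Yes"],
    "Providers": [0.0, 3.0, 9.0, 1.0, 1.0, 1.0],
    "duplicate_leads": [False, False, True, True, False, False],
    "bad_leads": [False, False, False, False, True, False],
})

def sub_conditional(row):
    if row["Inactive"] == "No":
        if row["Providers"] == 0:
            return "active_no_providers"
        elif row["Providers"] < 5:
            return "active_small"
        return "active_normal"
    elif row["duplicate_leads"]:
        return "is_dup"
    return "active_bad" if row["bad_leads"] else "active_good"

active = (toy["Inactive"] == "No").to_numpy()
providers = toy["Providers"].to_numpy()

# Conditions are listed from most specific to least specific.
conditions = [
    active & (providers == 0),
    active & (providers < 5),
    active,
    toy["duplicate_leads"].to_numpy(),
    toy["bad_leads"].to_numpy(),
]
choices = ["active_no_providers", "active_small", "active_normal", "is_dup", "active_bad"]

vectorized = np.select(conditions, choices, default="active_good")
row_wise = toy.apply(sub_conditional, axis=1)
print(list(vectorized))
```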

More complex examples

In practice you will run into all sorts of other situations. Some further examples follow.

Strings

# Doing a regex search to find string patterns

def find_paid_nonpaid(s):
    if re.search(r'non.*?paid', s, re.I):
        return 'non-paid'
    elif re.search(r'Buyerzone|^paid\s+', s, re.I):
        return 'paid'
    else:
        return 'unknown'
%%timeit
# our old friend .apply()
df['lead_source_paid_unpaid'] = df['Lead Source'].apply(find_paid_nonpaid)
480 ms ± 12.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So how does np.vectorize do here?

%%timeit

vect_str = np.vectorize(find_paid_nonpaid)

df['lead_source_paid_unpaid1'] = vect_str(df['Lead Source'])
535 ms ± 9.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
# How does a list comprehension do?
df['lead_source_paid_unpaid2'] = ['non-paid' if re.search(r'non.*?paid', s, re.I)
                                  else 'paid' if re.search(r'Buyerzone|^paid\s+', s, re.I)
                                  else 'unknown' for s in df['Lead Source']]
524 ms ± 16.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

pandas provides the .str accessor for string operations.

%%timeit
# How does this compare?
conditions = [
    df['Lead Source'].str.contains(r'non.*?paid', case=False, na=False),
    df['Lead Source'].str.contains(r'Buyerzone|^paid\s+', case=False, na=False),
]

choices = [
    'non-paid',
    'paid'
]

df['lead_source_paid_unpaid1'] = np.select(conditions, choices, default='unknown')
493 ms ± 13.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
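
The .str.contains approach can be tried on a few made-up strings, as sketched below. The sample values are hypothetical, and the second pattern is assumed to be `^paid\s+` (the word "paid" at the start followed by whitespace). The na=False argument makes missing values count as non-matches.

```python
import numpy as np
import pandas as pd

# Hypothetical lead-source strings, including a missing value.
sources = pd.Series(["Non-Paid Search", "Buyerzone", "paid ads", "Referral", None])

conditions = [
    sources.str.contains(r"non.*?paid", case=False, na=False).to_numpy(dtype=bool),
    sources.str.contains(r"Buyerzone|^paid\s+", case=False, na=False).to_numpy(dtype=bool),
]

# Unmatched and missing values fall through to the default label.
labels = np.select(conditions, ["non-paid", "paid"], default="unknown")
print(list(labels))
```

String matching still has to inspect every value with the regex engine, which is in line with the timings above being close to the .apply version.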

What about operating on the strings directly instead of searching them?

%%timeit
df['lowerls'] = df['Lead Source'].apply(lambda x: x.lower())
58 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
df['lowerls1'] = df['Lead Source'].str.lower()
69.9 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Dictionary lookups

channel_dict = {
    'Billing Service': 'BS', 'Consultant': 'PD', 'Educational': 'PD',
    'Enterprise': 'PD', 'Hospital': 'PD', 'IPA': 'PD', 'MBS': 'RCM',
    'MSO': 'PD', 'Medical practice': 'PD', 'Other': 'PD', 'Partner': 'PD',
    'PhyBillers': 'BS', 'Practice': 'PD', 'Purchasing Group': 'PD',
    'Reseller': 'BS', 'Vendor': 'PD', '_Other': 'PD', 'RCM': 'RCM'
}

def a_dict_lookup(row):
    if row['Providers'] > 7:
        return 'Upmarket'
    else:
        channel = channel_dict.get(row['Category'])
        return channel
%%timeit
df['dict_lookup'] = df.apply(a_dict_lookup, axis=1)
5.15 s ± 58.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
df['dict_lookup1'] = np.where(
    df['Providers'].values > 7,
    'Upmarket',
    df['Category'].map(channel_dict)
)
17.5 ms ± 144 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

That is a speedup of (5.15 * 1000) / 17.5 ≈ 294x!

%%timeit
channel_values = df['Category'].map(channel_dict)
df['dict_lookup1'] = np.where(
    df['Providers'] > 7,
    'Upmarket',
    channel_values
)
19.6 ms ± 452 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
# Using np.vectorize to vectorize a dictionary .get() method works, but is slower than .map()
channel_values = np.vectorize(channel_dict.get)(df['Category'].values)
df['dict_lookup2'] = np.where(
    df['Providers'] > 7,
    'Upmarket',
    channel_values
)
37.7 ms ± 730 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

The vectorized versions give exactly the same result as the .apply version.

print((df['dict_lookup'] == df['dict_lookup1']).all())
print((df['dict_lookup'] == df['dict_lookup2']).all())
True
True
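
The lookup pattern can be sketched on a small made-up frame. Only a subset of the mapping is used, and the four rows are hypothetical; the point is that Series.map performs the dictionary lookup for the whole column at once.

```python
import numpy as np
import pandas as pd

# A subset of the article's mapping, enough for the toy rows below.
channel_dict = {"Billing Service": "BS", "Hospital": "PD", "MBS": "RCM"}

toy = pd.DataFrame({
    "Category": ["Billing Service", "Hospital", "MBS", "Hospital"],
    "Providers": [2.0, 10.0, 7.0, np.nan],
})

def a_dict_lookup(row):
    if row["Providers"] > 7:
        return "Upmarket"
    return channel_dict.get(row["Category"])

row_wise = toy.apply(a_dict_lookup, axis=1)

# Series.map looks every category up in the dict in a single call.
vectorized = np.where(
    toy["Providers"].to_numpy() > 7,
    "Upmarket",
    toy["Category"].map(channel_dict).to_numpy(dtype=object),
)
print(list(vectorized))
```

A missing Providers value compares as not greater than 7 in both versions, so that row falls back to the dictionary lookup.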

Date operations

# make a new column called 'Start Date' for dummy testing
# Only do a fraction so we have some NaN values
df['Start Date'] = df['Date Created'].sample(frac=0.8)
def weeks_to_complete(row) -> float:
    """Calculate the number of weeks between two dates"""
    if pd.isnull(row['Start Date']):
        return (row['Original Record: Date Created'] - row['Date Created']).days / 7
    else:
        return (row['Date Created'] - row['Start Date']).days / 7
%%timeit
wtc1 = df.apply(weeks_to_complete, axis=1)
12.7 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

A convenient way to vectorize this is the pandas .dt accessor, which offers many handy methods and attributes.

%%timeit
wtc2 = np.where(
    df['Start Date'].isnull().values,
    (df['Original Record: Date Created'].values - df['Date Created']).dt.days / 7,
    (df['Date Created'].values - df['Start Date']).dt.days / 7
)
18.6 ms ± 250 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Another option is to work with ndarrays, converting the Series to numpy timedelta arrays. This is faster, but the code is more verbose and relies on some lower-level details.

%%timeit
wtc3 = np.where(
    df['Start Date'].isnull().values,
    ((df['Original Record: Date Created'].values - df['Date Created'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7,
    ((df['Date Created'].values - df['Start Date'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7
)
7.91 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

That is a speedup of (12.7 * 1000) / 7.91 ≈ 1606x!
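
Both date approaches can be compared on a made-up two-row frame, as in the sketch below. The dates are hypothetical; the second row has a missing Start Date, so it takes the fallback branch.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "Date Created": pd.to_datetime(["2019-01-15", "2019-02-01"]),
    "Start Date": pd.to_datetime(["2019-01-01", None]),
    "Original Record: Date Created": pd.to_datetime(["2019-01-01", "2019-02-22"]),
})

# Series arithmetic yields a timedelta Series; .dt.days extracts whole days.
weeks_dt = np.where(
    toy["Start Date"].isnull().to_numpy(),
    (toy["Original Record: Date Created"] - toy["Date Created"]).dt.days / 7,
    (toy["Date Created"] - toy["Start Date"]).dt.days / 7,
)

# Same calculation on raw numpy arrays: cast to whole days, then divide by one day.
created = toy["Date Created"].to_numpy()
start = toy["Start Date"].to_numpy()
original = toy["Original Record: Date Created"].to_numpy()
weeks_np = np.where(
    np.isnat(start),
    (original - created).astype("timedelta64[D]") / np.timedelta64(1, "D") / 7,
    (created - start).astype("timedelta64[D]") / np.timedelta64(1, "D") / 7,
)
print(weeks_dt, weeks_np)
```

Dividing a timedelta array by np.timedelta64(1, 'D') is what turns it into plain floats that can then be divided by 7.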

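When the logic depends on values in other rows

The goal here is to reproduce the following Excel formula:

=IF(A2=A1, IF(L2-L1 < 5, 0, 1), 1)

Column A holds the id and column L holds the date.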

def time_looper(df):
    """ Using a plain Python for loop"""
    output = []
    for i, row in enumerate(range(0, len(df))):
        if i > 0:

            # compare the current id to the row above
            if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:

                # compare the current date to the row above
                if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:
                    output.append(0)
                else:
                    output.append(1)
            else:
                output.append(1)
        else:
            output.append(np.nan)
    return output
def time_looper2(df):
    """Using pandas dataframe `.iterrows()` method for iterating over rows"""
    output = []
    for i, row in df.iterrows():
        if i > 0:
            if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:
                if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:
                    output.append(0)
                else:
                    output.append(1)
            else:
                output.append(1)
        else:
            output.append(np.nan)
    return output
df.sort_values(['Internal ID', 'Date Created'], inplace=True)
%%time
df['time_col_raw_for'] = time_looper(df)
CPU times: user 2min 1s, sys: 7.17 ms, total: 2min 1s
Wall time: 2min 1s
%%time
df['time_col_iterrows'] = time_looper2(df)
CPU times: user 2min 19s, sys: 67.5 ms, total: 2min 19s
Wall time: 2min 19s

We vectorize it as follows:

  • Use the pandas .shift method to move the previous values down one row, so that values can be compared along the same axis
  • Use np.select to vectorize the conditional logic
%%timeit
previous_id = df['Internal ID'].shift(1).fillna(0).astype(int)
previous_date = df['Original Record: Date Created'].shift(1).fillna(pd.Timestamp('1900'))

conditions = [
    # each comparison is parenthesized so that & combines two boolean arrays;
    # .dt.days gives the difference in whole days
    ((df['Internal ID'].values == previous_id) &
     ((df['Date Created'] - previous_date).dt.days < 5)),
    df['Internal ID'].values == previous_id
]
choices = [0, 1]
df['time_col1'] = np.select(conditions, choices, default=1)
11.1 ms ± 49.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

That is a speedup of (((2 * 60) + 19) * 1000) / 11.1 ≈ 12,523x!

(df['time_col1'] == df['time_col_iterrows']).all()
False
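
The shift-based idea can be sketched on a made-up four-row frame. The column names id and date are hypothetical stand-ins for columns A and L of the Excel formula, and for simplicity the sketch compares a single date column with its own previous row.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "id": [1, 1, 1, 2],
    "date": pd.to_datetime(["2019-01-01", "2019-01-03", "2019-01-20", "2019-01-21"]),
})

prev_id = toy["id"].shift(1)      # NaN in the first row
prev_date = toy["date"].shift(1)  # NaT in the first row

same_id = (toy["id"] == prev_id).to_numpy()
within_5_days = ((toy["date"] - prev_date).dt.days < 5).to_numpy()

# 0 only when the id repeats and the gap is under 5 days; every other row gets 1.
flag = np.where(same_id & within_5_days, 0, 1)
print(flag.tolist())
```

In this sketch the first row has no predecessor, so its comparisons are False and it receives 1, whereas the loop versions above append NaN for the first row.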

Other approaches

Parallelizing a function

from multiprocessing import Pool

def p_apply(df, func, cores=4):
    """Pass in your dataframe and the func to apply to it"""
    df_split = np.array_split(df, cores)
    pool = Pool(cores)  # use the cores argument
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df
df = p_apply(df, func=some_big_function)

Reference link

https://gitlab.com/cheevahagadog/talks-demos-n-such/blob/master/PyGotham2019/PyGotham-updated.ipynb