如何将数据处理速度提升1000+倍

  • 2020 年 2 月 17 日
  • 筆記

以下文章来源于气象杂货铺 ,作者bugsuse

利用Python进行数据处理时经常使用的是pandas和numpy,这两个工具的功能都很强大,尤其是pandas,更是Python中数据处理方面最强大的工具之一。

但是如果不能有效利用pandas和numpy中的各种函数和方法,反而会降低数据处理的效率。

以下就以PyGotham 2019的一个演讲介绍如何大幅提升数据处理的速度。notebook和数据见文末链接,代码较多,建议下载notebook和数据测试。

import pandas as pd  import numpy as np  import re  import time    from IPython.core.interactiveshell import InteractiveShell  InteractiveShell.ast_node_interactivity = "all"    pd.set_option('max_columns', 15)  pd.set_option('chained_assignment', None)

读取数据

此CSV示例数据是用于练习如何向量化.apply函数时使用。数据已经进行了清洗和预处理。

df = pd.read_csv('sample_data_pygotham2019.csv',                   parse_dates=['Date Created', 'Original Record: Date Created'])

注:parse_dates 参数可将给定的列进行解析为日期格式。

数据检查

df.shape  df.head(5)  df.dtypes
Internal ID                               int64  ID                                        int64  Category                                 object  Lead Source                              object  Lead Source Category                     object  Current Status                           object  Status at Time of Lead                   object  Original Record: Date Created    datetime64[ns]  Date Created                     datetime64[ns]  Inactive                                 object  Specialty                                object  Providers                               float64  duplicate_leads                            bool  bad_leads                                  bool  dtype: object

条件表达式的向量化

常规条件处理都是使用if...else...语句,将函数应用到.apply方法。

def set_lead_status(row):      if row['Current Status'] == '- None -':          return row['Status at Time of Lead']      else:          return row['Current Status']
%%timeit  test = df.apply(set_lead_status, axis=1)
8.15 s ± 722 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

但是这种方法的执行速度非常慢,如果涉及数据量更大,那么无疑非常消耗时间。

np.where

np.where给定一个条件表达式,当条件表达式为真或假时返回对应的值。

%%timeit  # Pandas Series Vectorized baby!!    # you can pass the output directly into a pandas Series    df['normalized_status'] = np.where(      df['Status at Time of Lead'] == '- None -',   # <-- condition      df['Current Status'],                         # <-- return if true      df['Status at Time of Lead']                  # <-- return if false  )
20.8 ms ± 784 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.where` is {round((8.15 * 1000) / 20.8, 1)}x faster than `.apply`")
`np.where` is 391.8x faster than `.apply`

直接使用numpy数组比pandas.Series的速度要快。在上述情况下,只需要处理numpy数组而无需处理pandas.Series的所有信息,因此要更快一些。

 %%timeit  # NumPy Vectorized baby!!    df['normalized_status'] = np.where(      df['Status at Time of Lead'].values == '- None -',      df['Current Status'].values,      df['Status at Time of Lead'].values  )
9.59 ms ± 310 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
print(f"`np.where` w/ numpy vectorization is {round((8.15 * 1000) / 9.59, 1)}x faster than `.apply`")
`np.where` w/ numpy vectorization is 849.8x faster than `.apply`

当使用 raw=True选项时,会显著改善.apply的速度。此选项可将numpy数组传递给自定义函数,从而代替pd.Series对象。

但按照上述方法执行之后,耗时依然较长:

%%timeit test = df.apply(works_but_slow, axis=1, raw=True) 
8.55 s ± 160 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

np.vectorize

np.vectorize可以将python函数转换为numpy ufunc,可以处理向量化方法。可以向量化函数,而不需要应用到数据。

# Here is our previous function that I tried to vectorize but couldn't due to the ValueError.  def works_fast_maybe(col1, col2):      if col1 == '- None -':          return col2      else:          return col1
# with the np.vectorize method --> returns a vectorized callable  vectfunc = np.vectorize(works_fast_maybe) #otypes=[np.float],cache=False)
%%timeit  test3 = list(vectfunc(df['Status at Time of Lead'], df['Current Status']))
96 ms ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

有人认为使用索引设置速度更快,但其实这不是向量化方式。

def can_I_go_any_faster(status_at_time, current_status):      # this works fine if you're setting static values      df['test'] = 'test'# status_at_time      df.loc[status_at_time == '- None -', 'test'] = current_status  # <-- ValueError, indexes don't match!      df.loc[status_at_time != '- None -', 'test'] = status_at_time
%%timeit test4 = can_I_go_any_faster(df['Status at Time of Lead'], df['Current Status'])
40.5 ms ± 443 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

以下方法和上面的方法相差不大。

def can_I_go_any_faster2(status_at_time, current_status):      # this works fine if you're setting static values      df['test'] = 'test'# status_at_time      df.loc[status_at_time == '- None -', 'test'] = 'statys1_isNone'      df.loc[status_at_time != '- None -', 'test'] = 'statys2_notNone'
%%timeit test5 = can_I_go_any_faster2(df['Status at Time of Lead'], df['Current Status'])
37.8 ms ± 568 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

多条件处理

主要类别

list1 = ['LEAD-3 Flame No Contact', 'LEAD-Campaign', 'LEAD-Claim', 'LEAD-Contact Program',           'LEAD-General Pool', 'LEAD-No Contact', 'LEAD-Nurture', 'LEAD-Unqualified', 'PROSPECT-LOST']    list2 = ['- None -', 'CLIENT-Closed-Sold', 'CLIENT-Handoff', 'CLIENT-Implementation', 'CLIENT-Implementation (EMR)',           'CLIENT-Live', 'CLIENT-Partner', 'CLIENT-Referring Consultant', 'CLIENT-Transferred', 'LEAD-Engaged',           'LEAD-Long-Term Opportunity', 'PROSPECT-CURRENT', 'PROSPECT-LONG TERM', 'PROSPECT-NO DECISION']    # apply version  def lead_category(row):      if row['Original Record: Date Created'] == row['Date Created']:          return 'New Lead'      elif row['normalized_status'].startswith('CLI'):          return 'Client Lead'      elif row['normalized_status'] in list1:          return 'MTouch Lead'      elif row['normalized_status'] in list2:          return 'EMTouch Lead'      return 'NA'
%%timeit df['lead_category0'] = df.apply(lead_category, axis=1)
6.91 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

当然也可以使用np.where实现上述功能,但是实现代码很难读。

%%timeit  df['lead_category'] =       np.where(df['Original Record: Date Created'].values == df['Date Created'].values, 'New Lead',              np.where(df['normalized_status'].str.startswith('CLI').values, 'Client Lead',                      np.where(df['normalized_status'].isin(list1).values, 'MTouch Lead',                              np.where(df['normalized_status'].isin(list2).values, 'EMTouch Lead',                                       'NA')                                    )                           )                  )
96.7 ms ± 2.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

np.select

对多个条件选择或嵌套条件而言,np.select的实现方法更简单甚至速度更快。

%%timeit  conditions = [      df['Original Record: Date Created'].values == df['Date Created'].values,      df['normalized_status'].str.startswith('CLI').values,      df['normalized_status'].isin(list1).values,      df['normalized_status'].isin(list2).values  ]    choices = [      'New Lead',      'Client Lead',      'MTouch Lead',      'EMTouch Lead'  ]      df['lead_category1'] = np.select(conditions, choices, default='NA')  # Order of operations matter!
82.4 ms ± 865 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

np.selectnp.where 的输出结果是相同的。

# Their output logic is the same  (df.lead_category == df.lead_category1).all()
True
print(f"`np.select` w/ numpy vectorization is {round((6.9 * 1000) / 82.5, 2)} faster than nested .apply()")
`np.select` w/ numpy vectorization is 83.64 faster than nested .apply()
print(f"`np.select` is {round(96.7 / 83.64, 2)} faster than nested `np.where`")
`np.select` is 1.16 faster than nested `np.where`

向量化多条件表达式

# This is something you might think you can't vectorize, but you sure can!  def sub_conditional(row):      if row['Inactive'] == 'No':          if row['Providers'] == 0:              return 'active_no_providers'          elif row['Providers'] < 5:              return 'active_small'          else:              return 'active_normal'      elif row['duplicate_leads']:          return 'is_dup'      else:          if row['bad_leads']:              return 'active_bad'          else:              return 'active_good'
%%timeit  # Let's time how long it takes to apply a nested multiple condition func  df['lead_type'] = df.apply(sub_conditional, axis=1)
5.72 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

以下是利用np.select实现的多条件处理方法,速度有明显的提升。

%%timeit    # With np.select, could do .values here for additional speed, but left out to avoid too much text  conditions = [      ((df['Inactive'] == 'No') & (df['Providers'] == 0)),      ((df['Inactive'] == 'No') & (df['Providers'] < 5)),      df['Inactive'] == 'No',      df['duplicate_leads'],  # <-- you can also just evaluate boolean arrays      df['bad_leads'],  ]    choices = [      'active_no_providers',      'active_small',      'active_normal',      'is_dup',      'active_bad',  ]    df['lead_type_vec'] = np.select(conditions, choices, default='NA')
61.1 ms ± 1.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` is {round((5.82 * 1000) / 60.4, 2)} faster than nested .apply()")
`np.select` is 96.36 faster than nested .apply()

使用np.select时,直接获取原始数据可以进一步加速:

%%timeit    # With np.select  conditions = [      ((df['Inactive'].values == 'No') & (df['Providers'].values == 0)),      ((df['Inactive'].values == 'No') & (df['Providers'].values < 5)),      df['Inactive'].values == 'No',      df['duplicate_leads'].values,  # <-- you can also just evaluate boolean arrays      df['bad_leads'].values,  ]    choices = [      'active_no_providers',      'active_small',      'active_normal',      'is_dup',      'active_bad',  ]    df['lead_type_vec'] = np.select(conditions, choices, default='NA')
35.8 ms ± 257 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
print(f"`np.select` w/ vectorization is {round((5.72 * 1000) / 35.8, 2)} faster than nested .apply()")
`np.select` w/ vectorization is 159.78 faster than nested .apply()

复杂示例

实际应用中可能会遇到各种各样的问题,下面展示一些其他情况示例:

字符串

 # Doing a regex search to find string patterns    def find_paid_nonpaid(s):      if re.search(r'non.*?paid', s, re.I):          return 'non-paid'      elif re.search(r'Buyerzone|^paids+', s, re.I):          return 'paid'      else:          return 'unknown'
%%timeit  # our old friend .apply()  df['lead_source_paid_unpaid'] = df['Lead Source'].apply(find_paid_nonpaid)
480 ms ± 12.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

那么如何使用np.vectorize进行向量化呢?

%%timeit    vect_str = np.vectorize(find_paid_nonpaid)    df['lead_source_paid_unpaid1'] = vect_str(df['Lead Source'])
535 ms ± 9.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit  # How does a list comprehension do?  df['lead_source_paid_unpaid2'] = ['non-paid' if re.search(r'non.*?paid', s, re.I)                                    else 'paid' if re.search(r'Buyerzone|^paids+', s, re.I)                                    else 'unknown' for s in df['Lead Source']]
524 ms ± 16.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

pandas提供了.str方法应用于字符串操作。

 %%timeit  # How does this compare?  conditions = [      df['Lead Source'].str.contains(r'non.*?paid', case=False, na=False),      df['Lead Source'].str.contains(r'Buyerzone|^paids+', case=False, na=False),  ]    choices = [      'non-paid',      'paid'  ]    df['lead_source_paid_unpaid1'] = np.select(conditions, choices, default='unknown')
493 ms ± 13.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

如果不搜索字符串,而直接对字符串进行操作,效率如何呢?

%%timeit  df['lowerls'] = df['Lead Source'].apply(lambda x: x.lower())
58 ms ± 2.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit  df['lowerls1'] = df['Lead Source'].str.lower()
69.9 ms ± 1.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

字典查表

channel_dict = {      'Billing Service': 'BS', 'Consultant': 'PD', 'Educational': 'PD',      'Enterprise': 'PD', 'Hospital': 'PD', 'IPA': 'PD', 'MBS': 'RCM',      'MSO': 'PD', 'Medical practice': 'PD', 'Other': 'PD', 'Partner': 'PD',      'PhyBillers': 'BS', 'Practice': 'PD', 'Purchasing Group': 'PD',      'Reseller': 'BS', 'Vendor': 'PD', '_Other': 'PD', 'RCM': 'RCM'  }    def a_dict_lookup(row):      if row['Providers'] > 7:          return 'Upmarket'      else:          channel = channel_dict.get(row['Category'])          return channel
%%timeit  df['dict_lookup'] = df.apply(a_dict_lookup, axis=1)
5.15 s ± 58.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit  df['dict_lookup1'] = np.where(      df['Providers'].values > 7,      'Upmarket',      df['Category'].map(channel_dict)  )
17.5 ms ± 144 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

速度提升了 (5.15 * 1000) / 17.5 = 294.2857142857143 倍!

%%timeit  channel_values = df['Category'].map(channel_dict)  df['dict_lookup1'] = np.where(      df['Providers'] > 7,      'Upmarket',      channel_values  )
19.6 ms ± 452 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit  # Using np.vectorize to vectorize a dictionary .get() method works, but is slower than .map()  channel_values = np.vectorize(channel_dict.get)(df['Category'].values)  df['dict_lookup2'] = np.where(      df['Providers'] > 7,      'Upmarket',      channel_values  )
37.7 ms ± 730 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

两种方法得到的结果是完全相同的。

print((df['dict_lookup'] == df['dict_lookup1']).all())  print((df['dict_lookup'] == df['dict_lookup2']).all())
True  True

日期操作

# make a new column called 'Start Date' for dummy testing  # ONly do a fraction so we have some NaN values  df['Start Date'] = df['Date Created'].sample(frac=0.8)
def weeks_to_complete(row) -> float:      """Calculate the number of weeks between two dates"""      if pd.isnull(row['Start Date']):          return (row['Original Record: Date Created'] -  row['Date Created']).days / 7      else:          return (row['Date Created'] - row['Start Date']).days / 7
%%timeit  wtc1 = df.apply(weeks_to_complete, axis=1)
12.7 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

一个比较方便的向量化方法是使用pandas.dt获取方法,其有很多便捷的方法/属性。

%%timeit  wtc2 = np.where(      df['Start Date'].isnull().values,      (df['Original Record: Date Created'].values - df['Date Created']).dt.days / 7,      (df['Date Created'].values - df['Start Date']).dt.days / 7  )
18.6 ms ± 250 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

另一个方法是使用ndarray类型,即转换seriesnumpy timedelta数组。此方法更快,但是代码更冗余,涉及到一些基础的内容。

%%timeit  wtc3 = np.where(      df['Start Date'].isnull().values,      ((df['Original Record: Date Created'].values - df['Date Created'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7,      ((df['Date Created'].values - df['Start Date'].values).astype('timedelta64[D]') / np.timedelta64(1, 'D')) / 7  )
7.91 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

速度提升了 (12.7 * 1000) / 7.91 = 1605.5625790139063 倍!

逻辑表达式所需要的值在其他行

此任务主要是想实现Excel中的函数:

=IF(A2=A1, IF(L2-L1 < 5, 0, 1), 1))

A列表示idL列表示日期。

 def time_looper(df):      """ Using a plain Python for loop"""      output = []      for i, row in enumerate(range(0, len(df))):          if i > 0:                # compare the current id to the row above              if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:                    # compare the current date to the row above                  if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:                      output.append(0)                  else:                      output.append(1)              else:                  output.append(1)          else:              output.append(np.nan)      return output
def time_looper2(df):      """Using pandas dataframe `.iterrows()` method for iterating over rows"""      output = []      for i, row in df.iterrows():          if i > 0:              if df.iloc[i]['Internal ID'] == df.iloc[i-1]['Internal ID']:                  if (df.iloc[i]['Date Created'] - df.iloc[i-1]['Original Record: Date Created']).days < 5:                      output.append(0)                  else:                      output.append(1)              else:                  output.append(1)          else:              output.append(np.nan)      return output
df.sort_values(['Internal ID', 'Date Created'], inplace=True)
%%time df['time_col_raw_for'] = time_looper(df)
CPU times: user 2min 1s, sys: 7.17 ms, total: 2min 1s  Wall time: 2min 1s
%%time  df['time_col_iterrows'] = time_looper2(df)
CPU times: user 2min 19s, sys: 67.5 ms, total: 2min 19s  Wall time: 2min 19s

我们按照如下方法进行向量化:

  • 使用pandas.shift函数,将之前的值向下移动,这样就可以对比相同轴上的值
  • 使用np.select向量化条件逻辑检查
%%timeit  previous_id = df['Internal ID'].shift(1).fillna(0).astype(int)  previous_date = df['Original Record: Date Created'].shift(1).fillna(pd.Timestamp('1900'))    conditions = [      ((df['Internal ID'].values ==  previous_id) &       (df['Date Created'] - previous_date).astype('timedelta64[D]') < 5),      df['Internal ID'].values ==  previous_id  ]  choices = [0, 1]  df['time_col1'] = np.select(conditions, choices, default=1)
11.1 ms ± 49.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

速度提升了(((2 * 60) + 19) * 1000) / 11.1 = 12522.522522522522倍!

(df['time_col1'] == df['time_col_iterrows']).all()
False

其他方法

并行函数

from multiprocessing import Pool    def p_apply(df, func, cores=4):      """Pass in your dataframe and the func to apply to it"""      df_split = np.array_split(df, cores)      pool = Pool(n_cores)      df = pd.concat(pool.map(func, df_split))      pool.close()      pool.join()      return df
df = p_apply(df, func=some_big_function)

参考链接

https://gitlab.com/cheevahagadog/talks-demos-n-such/blob/master/PyGotham2019/PyGotham-updated.ipynb