Data cleaning is an essential step in preparing your data for analysis. While cleaning the data, every now and then there’s a need to create a new column in a Pandas dataframe, usually conditioned on a function that manipulates an existing column. A common way to achieve that is with the apply function. I want to address a couple of bottlenecks here:
- Pandas: The Pandas library runs on a single thread and doesn’t parallelize the task. Thus, if you are doing a lot of computation or data manipulation on your Pandas dataframe, it can be pretty slow and can quickly become a bottleneck.
- Apply(): The Pandas apply() function is slow! It does not take advantage of vectorization and acts as just another loop. It returns a new Series or dataframe object, which carries significant overhead.
So now, you may ask, what should you do and what should you use? I am going to share four techniques that are alternatives to the apply function and will improve the performance of operations on a Pandas dataframe.
Let’s assume my code using the apply function looks like this:
df['country'] = df.user_location.apply(lambda row: random_function(row) if (pd.notnull(row)) else row)
where df is the dataframe, user_location is the column of df to which I am applying the function, and random_function is the function that I am applying to every row of the user_location column.
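So that the snippets below are runnable, here is a hypothetical stand-in for random_function. This definition is purely an assumption for illustration; in the real use case it could be any expensive row-level transformation, such as a lookup or an API call.
def random_function(location):
    # Hypothetical example: map a free-text user location to a country label.
    return 'Singapore' if 'singapore' in str(location).lower() else 'Other'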
np.vectorize
np.vectorize(random_function)(df['user_location'])
It converts your input function into a Universal function (“ufunc”) via np.frompyfunc. This gives some level of optimisation (caching), which can lead to a performance gain.
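A minimal sketch of how this could replace the apply call above, assuming the hypothetical random_function from earlier (otypes=[object] keeps the output dtype generic, since the column mixes strings and NaN):
import numpy as np
import pandas as pd

# Keep the null check from the original apply and write the result to a new column.
vectorized_fn = np.vectorize(
    lambda row: random_function(row) if pd.notnull(row) else row,
    otypes=[object])
df['country'] = vectorized_fn(df['user_location'])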
Dask Library
As per its own website, Dask uses existing Python APIs and data structures to make it easy to switch from NumPy, Pandas, and Scikit-learn to their Dask-powered equivalents. It’s an open-source, flexible library for parallel computing in Python.
First things first,
!pip3 install dask
and then import the following:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import multiprocessing
df['country'] = dd.from_pandas(df.user_location, npartitions=4*multiprocessing.cpu_count())\
    .map_partitions(lambda dframe: dframe.apply(lambda row: random_function(row) if (pd.notnull(row)) else row)).compute(scheduler='processes')
Understanding the components of the code snippet above:
- The dd.from_pandas(df.user_location, npartitions=4*multiprocessing.cpu_count()) call divides the pandas dataframe into chunks. The npartitions argument denotes the number of pandas dataframes that constitute a single Dask dataframe. In general, go for just a few more partitions than the number of cores in your machine – not too few (you won’t use all the cores), not too many (a lot of overhead in deciding where to compute each task).
- map_partitions is used for applying the lambda function to each partition.
- The compute function’s details can be read in the official documentation. Just note that inside the compute function, we have assigned ‘processes’ to the scheduler. The primary purpose of using ‘processes’ is that the scheduler then uses all CPU cores. There are two more options that you can use instead of ‘processes’: ‘threads’ and ‘single-threaded’. A consolidated, runnable sketch is shown below.
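Putting the pieces together, here is a minimal self-contained sketch. The tiny dummy dataframe and the random_function it relies on are assumptions for illustration, not the original data:
import multiprocessing

import dask.dataframe as dd
import pandas as pd

# Tiny dummy dataframe, assumed for illustration only.
df = pd.DataFrame({'user_location': ['Singapore', 'London', None, 'New York']})

df['country'] = (
    dd.from_pandas(df.user_location, npartitions=4 * multiprocessing.cpu_count())
    .map_partitions(lambda dframe: dframe.apply(
        lambda row: random_function(row) if pd.notnull(row) else row))
    # In a plain .py script, the 'processes' scheduler may need an
    # `if __name__ == '__main__':` guard; in a notebook this runs as-is.
    .compute(scheduler='processes')
)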
Swifter Library
It’s a package that efficiently applies any function to a pandas dataframe or series in the fastest available manner.
First, you will need to install swifter (replace pip with pip3 if you are on Python 3.x).
pip install swifter
If you are installing directly in a Jupyter notebook,
!pip3 install swifter
Subsequently,
import swifter
df['country'] = df.user_location.swifter.apply(lambda row: random_function(row) if (pd.notnull(row)) else row)
Swifter, essentially, vectorizes when possible. When vectorization is not possible, it switches to dask parallel processing or a simple pandas apply. Thus, swifter uses pandas apply when it leads to faster computation time for smaller data sets, and it shifts to dask parallel processing when that is a faster option for large data sets.
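To make the distinction concrete: the string-handling call above goes down the apply/dask path, while a purely numeric transformation can take the vectorized route. A small sketch follows; the numeric column some_score is hypothetical, added only for illustration:
import numpy as np
import swifter  # registers the .swifter accessor on pandas Series/DataFrames

# Hypothetical numeric column, assumed for illustration only.
df['some_score'] = np.arange(len(df))

# swifter can detect that this lambda works on the whole Series at once and
# run it as a fast vectorized operation instead of a row-by-row apply.
df['some_score_scaled'] = df['some_score'].swifter.apply(lambda x: x * 2)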
RAPIDS cuDF
RAPIDS is an open-source framework from NVIDIA for GPU-accelerated, end-to-end data science and analytics. cuDF is a Python GPU DataFrame library for working with data, including loading, joining, aggregating, and filtering. One of the major advantages here is that cuDF’s API mirrors the Pandas library. It means that you just need to convert your pandas dataframe into a cuDF dataframe and that’s all! You will be blessed by the GPU lords!
To show you an example of how to implement this functionality, I am going to borrow the steps heavily from one particular blog, this one. There are a few more blogs out there, but they assume that your local machine is configured with a GPU. Since mine isn’t, I used Google Colab. I ran into several problems in Colab, and that’s where the blog I have linked helped me.
Assuming that you have a working knowledge of Colab and know how to turn on its GPU (Runtime -> Change Runtime Type), I will cut straight to the chase.
Check the GPU allotted to you by Colab (the remaining steps will work when you are allotted an NVIDIA Tesla GPU):
!nvidia-smi
The next step is to install the cuDF library.
!pip3 install cudf-cuda100
There are a few steps required in Colab; failing to do them results in a lot of inexplicable errors. Tried and tested!
# Locate the CUDA libraries that Numba needs and expose them via environment variables.
dev_lib_path = !find / -iname 'libdevice'
nvvm_lib_path = !find / -iname 'libnvvm.so'
rmm_lib_path = !find / -iname 'librmm.so'

import os

if len(dev_lib_path) > 0:
    os.environ['NUMBAPRO_LIBDEVICE'] = dev_lib_path[0]
else:
    print('The device lib is missing.')
if len(nvvm_lib_path) > 0:
    os.environ['NUMBAPRO_NVVM'] = nvvm_lib_path[0]
else:
    print('NVVM is missing.')
if len(rmm_lib_path) > 0:
    os.environ['NUMBAPRO_RMM'] = rmm_lib_path[0]
else:
    print('RMM is missing.')
Now, you just need to import cuDF and any other Python packages you need, and you will be good to go!
import cudf
import numpy as np
df = cudf.DataFrame({'a': np.random.randint(0, 1000, size=1000),
                     'b': np.random.randint(0, 1000, size=1000)})
Or if you have a pandas dataframe and want to convert it into a cuDF dataframe, you can do:
import pandas as pd

pandas_df = pd.DataFrame({'a': np.random.randint(0, 1000, size=1000),
                          'b': np.random.randint(0, 1000, size=1000)})
cudf_df = cudf.DataFrame.from_pandas(pandas_df)
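Because cuDF mirrors the pandas API, the familiar operations then work directly on the converted dataframe and run on the GPU. A small sketch using the cudf_df created above:
cudf_df['c'] = cudf_df['a'] + cudf_df['b']   # elementwise arithmetic on the GPU
agg = cudf_df.groupby('a').mean()            # pandas-style groupby aggregation
result = cudf_df.to_pandas()                 # copy back to host memory as a pandas dataframe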
You can implement your own GPU accelerated pandas dataframe operations and run all the steps end-to-end on this colab notebook.
This wraps up my article, in which I wanted to share with you a few techniques to speed up your Pandas performance. I did this research because of a similar performance issue that I thought I was facing. Unfortunately, none of the techniques above gave me a satisfactory performance gain. I eventually posted a question on Stack Overflow and came to know that I had an IO issue (because of API calls) and not much of a CPU issue. So I ended up using an LRU cache. However, I am glad I did all the research and learnt so much, so that I could share this blog with you! Do let me know if you know of any other technique, or if you have any feedback or suggestions.
Thanks for reading! I have written other posts related to software engineering and data science as well; you might want to check them out here. You can also subscribe to my blog to receive relevant blogs straight in your inbox and reach out to me here. I am also mentoring in the areas of Data Science, Career and Life in Singapore. You can book an appointment to talk to me.
Hi Shubham
In the Dask Library example we have “map_partitions” to apply a function to each partition. Do we have a similar reduce function to combine data from all partitions?
In each map_partitions call I am appending data to my parent list, and I want to have a consolidated list at the end. But I only have the list from the main thread.
My code is something like this:
x_train = []
y_train = []

def my_func(o1, r1, a1, a2, a3, a4, a5):
    print("1: ", o1, r1, a1, a2, a3, a4, a5)
    x_train.append([[a1, o1, r1], [a2, o1, r1], [a3, o1, r1], [a4, o1, r1]])
    y_train.append(a5)
    print("x_train len: ", len(x_train), "y_train len: ", len(y_train))
    return o1

dataset_rnn['dummy'] = dd.from_pandas(dataset_rnn, npartitions=4*multiprocessing.cpu_count())\
    .map_partitions(lambda dframe: dframe.apply(lambda row: my_func(row.org_id, row.role_id, row.actionsperformed, row.snd_operation_id, row.trd_operation_id, row.frth_operation_id, row.fith_operation_id), axis=1)).compute(scheduler='processes')
print("x_train", x_train)
print("y_train", y_train)
dataset_rnn