
[pandas] Processing Dataframes in Chunks

1. What is a Chunk?

Even after optimizing the data frame's data types and selecting only the necessary columns, the data set may still be too large to fit in memory. In that case, it is more efficient to process the data frame in chunks than to load it all at once. Only a portion of the rows needs to be in memory at any given time: we process each piece of the data separately, collect the intermediate results, and finally combine them into a single result.

 

When we pass a value to the chunksize keyword of the pandas.read_csv() method, it returns a pandas.io.parsers.TextFileReader object instead of a DataFrame. Iterating over this object yields DataFrame chunks, each containing chunksize rows.
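
A quick way to confirm this behavior (a minimal sketch; 'file.csv' is a placeholder file, and the exact module path of TextFileReader varies across pandas versions):

import pandas as pd

# 'file.csv' is a placeholder file name; replace it with your own data set.
chunk_iter = pd.read_csv('file.csv', chunksize=250)
print(type(chunk_iter))          # a TextFileReader, not a DataFrame

first_chunk = next(chunk_iter)   # iterating yields ordinary DataFrames
print(type(first_chunk), len(first_chunk))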

 

For example, suppose a data frame has 34,558 rows and requires 45MB of memory, but only 1MB of memory is free for it. We can infer an appropriate chunksize from these numbers: 45MB / 34,558 rows is roughly 0.0013MB per row, so 250 rows take up about 0.3MB. Setting chunksize to 250 therefore lets each chunk be processed comfortably within the free memory.
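
A rough way to estimate a suitable chunksize in code (a sketch assuming the hypothetical 'file.csv' above; the actual numbers depend on your data):

import pandas as pd

# Read a small sample to measure memory per row, then derive the largest
# chunksize that fits within a 1MB budget. The post settles on 250 rows
# (~0.3MB) to leave some headroom.
sample = pd.read_csv('file.csv', nrows=1000)
mb_per_row = sample.memory_usage(deep=True).sum() / 2**20 / len(sample)

budget_mb = 1
max_chunksize = int(budget_mb / mb_per_row)
print(mb_per_row, max_chunksize)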

 

import pandas as pd
import matplotlib.pyplot as plt

chunk_iter = pd.read_csv('file.csv', chunksize=250)
memory_footprints = []

# Record the memory usage (in MB) of each 250-row chunk.
for chunk in chunk_iter:
    memory_footprints.append(chunk.memory_usage(deep=True).sum() / 2**20)

plt.hist(memory_footprints)
plt.show()

 

2. Batch processing in Pandas

num_rows = 0
chunk_iter = pd.read_csv('file.csv', chunksize=250)

# Accumulate the number of rows seen so far, chunk by chunk.
for chunk in chunk_iter:
    num_rows += len(chunk)
    print(num_rows)

 

The num_rows values printed by the preceding code are 250, 500, 750, ..., 34,558, confirming that each chunk holds 250 rows. In batch processing, the key idea is to split the data into chunks, process each chunk independently, and then merge the results, as in the sketch below.
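
For instance, the same split-process-merge pattern can compute an aggregate over the whole file without ever loading it at once (a sketch; 'col1' is a hypothetical numeric column):

import pandas as pd

chunk_iter = pd.read_csv('file.csv', chunksize=250)

partial_sums = []
for chunk in chunk_iter:
    partial_sums.append(chunk['col1'].sum())   # process each chunk independently

total = sum(partial_sums)                      # merge the partial results
print(total)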

 

Performing an operation chunk by chunk and joining the results with plain Python objects is wasteful, because Python must reallocate and copy memory for every join. Pandas objects, however, are optimized for this task: the pandas.concat() method can efficiently stack multiple Series row-wise.

 

import pandas as pd

dtype = {'col1': 'float32', 'col2': 'float64'}
chunk_iter = pd.read_csv('file.csv', chunksize=250, dtype=dtype)
distance = []

# Compute the element-wise difference within each chunk...
for chunk in chunk_iter:
    diff = chunk['col1'] - chunk['col2']
    distance.append(diff)

# ...then stack the per-chunk results back into a single Series.
df_dist = pd.concat(distance)

 

3. value_counts() in chunks

chunk_iter = pd.read_csv('file.csv', chunksize=250, usecols=['col1'])

overall_vc = []
for chunk in chunk_iter:
    chunk_vc = chunk['col1'].value_counts()   # counts within this chunk only
    overall_vc.append(chunk_vc)
combined_vc = pd.concat(overall_vc)
print(combined_vc)

 

Running Series.value_counts() on each chunk gives the counts of each unique value within that chunk, but not the counts across the entire data frame. To obtain the overall counts, the per-chunk counts must be summed for each unique value, that is, added up by index.

 

The DataFrame.groupby() method creates a pandas GroupBy object keyed by the unique values used for grouping. The grouped data can then be aggregated per unique value with the sum() method.

 

chunk_iter = pd.read_csv('file.csv', chunksize=250, usecols=['col1'])

overall_vc = []
for chunk in chunk_iter:
    chunk_vc = chunk['col1'].value_counts()   # per-chunk counts
    overall_vc.append(chunk_vc)
combined_vc = pd.concat(overall_vc)
final_vc = combined_vc.groupby(combined_vc.index).sum()   # sum counts by unique value
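
If the file happened to be small enough to load at once, the chunked result could be sanity-checked against a direct computation (a sketch; only feasible when memory allows, which is not the case for the scenario this post assumes):

full_vc = pd.read_csv('file.csv', usecols=['col1'])['col1'].value_counts()
print(final_vc.sort_index().equals(full_vc.sort_index()))   # expected: True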
