MindPandas Execution Mode Introduction and Configuration Instructions
This article mainly introduces the principle and usage of MindPandas distributed parallel mode.
MindPandas Implementation Principle
MindPandas accelerates Pandas data processing through parallelized computing. The principle is to first slice the original data into a bunch of partitions, then convert the API into a general computing paradigm (map, reduce, injective_map, etc.), and then parallelize the calculation by the backend. The current MindPandas backend has two execution modes, which are multi-threaded mode and multi-process mode.
Data Slicing Principle
Slicing raw data is the basis of parallel computing. The following figure shows the process of converting pandas.DataFrame
to mindpandas.DataFrame
. According to the preset partition_shape
, the original data is divided into a specified number of partition
and partition
will be used as the basic unit of subsequent parallel computing .
The Principle of Multi-thread Mode
Multi-thread mode is implemented based on Python multi-thread. Each data partition and its corresponding computation function are executed in one thread.
Although Python’s multi-thread has a global interpreter lock (GIL) limitation, multi-thread cannot effectively utilize multi-core. However, when the amount of data is small or when dealing with IO-intensive tasks, the multi-threaded backend can still bring significant performance gains.
Principle of Multi-process Mode
The multi-process mode is not limited by Python’s global interpreter lock (GIL) and can achieve real parallel computing. The principle of multi-process mode is similar to that of multi-thread mode. The difference is that after slicing the original data, the partitions are stored in the shared memory of the distributed compute engine, and the mindpandas.DataFrame
stores the corresponding object reference
of the partitions.
When computing is required, the computing function is also stored in the shared memory of the distributed compute engine, and then the object reference
corresponding to the computing function and the object reference
corresponding to the partition is submitted to the distributed compute engine as a task. All tasks will be uniformly scheduled by the distributed compute engine and executed asynchronous parallelism and in the form of multi-process.
Single Machine Multi-process Principle
The multi-process mode can make full use of multi-core, thereby achieving performance improvements ranging from several times to dozens of times. Therefore, the multi-process mode can efficiently deal with scenarios with a large amount of data. However, due to overhead such as process creation and scheduling, performance may be affected when the amount of data processed is small.
Multi-machine Multi-process Principle
In the multi-machine multi-process mode, computing is performed on a cluster composed of multiple servers, which can make full use of the resources of multiple machines to complete computing tasks and break through the resource limitations of single machine.
MindPandas Execution Mode Configuration
Data Partition Configuration
MindPandas supports users to configure the shape of the partition according to the actual usage. Users can use set_partition_shape
to customize the number of rows and columns of the partition.
import mindpandas as pd
pd.set_partition_shape((16, 2))
df = pd.read_csv('data.csv')
df_mean = df.mean()
Multi-threaded Mode Configuration
MindPandas uses the multi-threaded mode as follows:
import mindpandas as pd
pd.set_concurrency_mode('multithread') # MindPandas will use multithread as backend
df = pd.read_csv('data.csv')
df_mean = df.mean()
Multi-process Mode Configuration
When MindPandas is installed, the built-in distributed compute engine has also been installed synchronously, which can be accessed using the command yrctl
in the console.
Note: In multi-process mode, please make sure that the cluster you start is only for your personal use. Using a cluster together with others may lead to potential security risks.
$ yrctl
Usage: yrctl [OPTIONS] COMMAND [ARGS]...
The distributed executor of MindPandas.
Options:
--help Show this message and exit.
Commands:
start used to start the fleeting cluster
stop used to stop the fleeting cluster
Single-machine Multi-process Mode Configuration
To use the distributed compute engine, we need to start the service through the command line to deploy a single-machine cluster. An example command to deploy a cluster is as follows:
yrctl start --master --address <address> --cpu <cpu> --datamem <datamem> --mem <mem>
Common parameters of the yrctl start
command are:
--master
: Flag bit, set the current node as the master node. There is only one master node in the cluster. This flag must be set when deploying a single-machine cluster.--address
: The IP address of the master node. you can fill in 127.0.0.1 when running locally. Required.--cpu
: The number of CPU cores used on this node. The weight of each CPU is 1000 (for example: if you want to use two cores, this parameter value should be set to 2000). Optional, use all available cores by default.--datamem
: The size of the shared memory, in MB. Optional, uses 25% of current free memory by default.--mem
: The total memory (including shared memory) used by MindPandas, in MB. Optional, use 75% of current free memory by default.
To view the parameter usage instructions of yrctl start
, you can view it through yrctl start --help
.
Before starting the cluster, check the following:
This machine is not configured with an http proxy for the IP address of the master node. If this, machine is configured, please cancel the proxy or add the IP address of the master node to the
$no_proxy
environment variable.No other redis service on this machine occupies port 6379, otherwise it will cause port conflict. If there is a conflict between redis or other ports, please refer to FAQ to solve it.
If the cluster deployment is successful, the end of the console echo should show:
Succeeded to start!
After the cluster is deployed, you need to set a multi-process backend to run in the Python script. The method is to call the set_concurrency_mode
interface, set the mode
to "multiprocess"
, and set the IP address of the master node to the address
parameter.
Note: We recommend calling
set_concurrency_mode
immediately afterimport mindpandas
to set the concurrency mode. Switching the parallel mode while the script is running may cause the program failure.
import mindpandas as pd
pd.set_concurrency_mode(mode="multiprocess", address="127.0.0.1")
To stop the distributed compute engine, use the yrctl stop
command:
$ yrctl stop --help
Usage: yrctl stop [OPTIONS]
used to stop the fleeting cluster
Options:
--help Show this message and exit.
After successfully stopping the distributed compute engine, the end of the echo should show:
Succeeded to stop!
Multi-machine Multi-process Mode Use
MindPandas’ multi-process backend supports building clusters on multiple machines and performs distributed computing. The cluster consists of a master node and multiple worker nodes, and services need to be started separately on each machine in the cluster. The startup method is the same as the single-machine multi-process mode, but the master node must be started first, and then other worker nodes must be started.
Start the master node:
yrctl start --master --address <address>
where address
is the IP address of the master node.
Start the worker node:
yrctl start --address=<address>
The address
is the IP address of the master node. If the deployment fails during startup, please refer to FAQ.
After the cluster is deployed, in the Python script, use the "multiprocess"
backend as shown in the following code, where address
is the IP address of the master node in the cluster.
import mindpandas as pd
pd.set_concurrency_mode("multiprocess", address="<master_ip>")
The command to stop the cluster is as follows, which needs to be executed on the master node and each worker node separately:
yrctl stop
Adaptive Concurrency Function
Because the performance of single-process computing is good enough when the amount of data is small. The parallel benefits of multi-process computing are often smaller than the extra overhead of using multi-processes, so MindPandas has added an adaptive concurrency function. When this function is enabled, MindPandas will adaptively switch the concurrency mode according to the data size to improve performance.
Enabling Adaptive Concurrency
The adaptive concurrency feature is set to off by default, and it can be turned on through the set_adaptive_concurrency
interface in a Python script:
import mindpandas as pd
pd.set_adaptive_concurrency(True)
Triggering Conditions
After the adaptive concurrency function is enabled, the conditions for automatically switching the parallel mode are as follows:
Multi-threaded mode is used when reading csv files smaller than 18MB, and multi-process mode is used in other cases.
mindpandas.DataFrame
initialized bypandas.DataFrame
. The one whose memory usage is less than 1GB will use multi-thread mode, and other cases use multi-process mode.mindpandas.DataFrame
initialized bynumpy.ndarray
. The one whose memory usage is less than 1GB will use multi-thread mode, and other cases use multi-process mode.
Precautions
After the adaptive concurrency function is activated, the parallel mode and partition shape are adjusted by MindPandas, and users cannot use
set_concurrency_mode
to modify the concurrency mode.set_adaptive_concurrency(True)
should be called at the beginning of the Python script.After setting
set_adaptive_concurrency(True)
, users are not advised to switch adaptive concurrency back toFalse
unless the Python script has finished running.
Usage Restrictions
The adaptive concurrency feature currently does not support DataFrames created from operations such as
merge
,concat
orjoin
.The concurrency mode of the initialized or read DataFrame/Series before the adaptive concurrency function is enabled cannot be changed.
The adaptive concurrency feature currently uses a specific partition shape, i.e. (2, 2) partition shape for multi-thread mode and (16, 16) partition shape for multiprocessing mode.
Other I/O operations other than
read_csv
, such asread_feather
, currently do not support adaptive concurrency.