mindpandas.channel
- class mindpandas.channel.DataSender(address, namespace='default', num_shards=1, dataset_name='dataset', full_batch=False, max_queue_size=10)
channel的发送方(输入端),通过channel发送新对象。
- 参数:
address (str) - 当前sender运行的节点的ip地址。
namespace (str, 可选) - channel所属的命名空间。默认值:
'default'
,sender将在命名空间default
中运行。不同命名空间的DataSender和DataReceiver不能相互连接。num_shards (int, 可选) - 指定将数据划分为多少个分片。默认值:
1
。dataset_name (str, 可选) - 数据集的名称。默认值:
'dateset'
。full_batch (bool, 可选) - 如果为
True
,则每个分片将获得sender发送的完整数据。否则,每个分片只能获取部分数据。默认值:False
。max_queue_size (int, 可选) - 队列中能够缓存的最大元素数量。默认值:
10
。
- 异常:
ValueError - 当 num_shards 为无效值时。
说明
分布式执行引擎必须提前启动。
- send(obj)
通过channel发送对象。
- 参数:
obj (Union[numpy.ndarray, list, mindpandas.DataFrame]) - 要发送的对象。
- 异常:
TypeError - 如果 obj 的类型不合法。
ValueError - 如果 obj 的长度不是正整数或不能被分片数整除。
- property num_shards
返回当前channel的 num_shards 。
- property full_batch
返回 full_batch 的值。
- get_queue(shard_id=None)
返回与指定的 shard_id 对应的数据集中尚未消费的对象引用。
- 参数:
shard_id (int, 可选) - 请求分片的id。默认值:
None
,将返回所有分片。
- 返回:
List,存储分片中数据的引用的列表。
- class mindpandas.channel.DataReceiver(address, namespace='default', shard_id=0, dataset_name='dataset')
从channel接收数据的类。负责接收来自channel的新对象。
- 参数:
address (str) - 当前receiver运行的节点的ip地址。
namespace (str, 可选) - channel所属的命名空间。默认值:
'default'
,receiver将在命名空间default
中运行。不同命名空间的DataSender和DataReceiver不能相互连接。shard_id (int, 可选) - 指定当前receiver接收数据集的哪个分片。默认值:
0
,receiver将从id为0的分片获取数据。dataset_name (str, 可选) - 数据集的名称。默认值:
'dataset'
。
说明
必须提前启动分布式执行引擎,并且提前初始化DataSender。要与正确的DataSender配对,namespace 和 dataset_name 必须与DataSender相同。
- recv()
通过channel获取数据。
- 返回:
object,分片中最近没有被消费的对象。
- 异常:
ValueError - 当前receiver的 shard_id 无效时。
- property shard_id
返回当前receiver的 shard_id 。
- property num_shards
返回当前channel的 num_shards 。