# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The class define action space and observation class.
"""
import numpy as np
from mindspore.common import dtype as mstype
np_types = (
np.int8,
np.int16,
np.int32,
np.int64,
np.uint8,
np.uint16,
np.uint32,
np.uint64,
np.float16,
np.float32,
np.float64,
np.bool_,
)
[docs]class Space:
"""
The class for environment action/observation space.
Args:
feature_shape (Union[list(int), tuple(int), int]): The action/observation shape before batching.
dtype (np.dtype): The action/observation space dtype.
low (Union[int, float], optional): The action/observation space lower boundary. Default: ``None`` .
high (Union[int, float], optional): The action/observation space upper boundary. Default: ``None`` .
batch_shape (Union[list(int), tuple(int), int], optional): The batch shape for vectorization.
It usually be used in multi-environment and multi-agent cases. Default: ``None`` .
mask (Sequence[int], optional): The mask for discrete action space. Default: ``None`` .
Examples:
>>> action_space = Space(feature_shape=(6,), dtype=np.int32)
>>> print(action_space.ms_dtype)
Int32
"""
def __init__(
self, feature_shape, dtype, low=None, high=None, batch_shape=None, mask=None
):
if not issubclass(dtype, np_types):
raise ValueError(f"Dtype {dtype} not supported!")
self._feature_shape = tuple(feature_shape)
self._dtype = dtype
self._batch_shape = tuple(batch_shape) if batch_shape is not None else tuple()
self._low, self._high = self._range(low, high)
self._mask = mask
[docs] def sample(self):
"""
Sample a valid action from the space
Returns:
Tensor, a valid action.
"""
if self.is_discrete:
if self._mask is not None:
return np.array([self._get_action(mask) for mask in self._mask]).astype(
self._dtype
)
return np.random.randint(
low=self._low, high=self._high, size=self.shape
).astype(self._dtype)
return np.random.uniform(
low=self._low, high=self._high, size=self.shape
).astype(self._dtype)
@property
def shape(self):
"""
Space shape after batching.
Returns:
The shape of current space.
"""
return self._batch_shape + self._feature_shape
@property
def np_dtype(self):
"""
Numpy data type of current Space.
Returns:
The numpy dtype of current space.
"""
return self._dtype
@property
def ms_dtype(self):
"""
MindSpore data type of current Space.
Returns:
The mindspore data type of current space.
"""
return mstype.pytype_to_dtype(self._dtype)
@property
def is_discrete(self):
"""
Is discrete space.
Returns:
Whether the current space is discrete or continuous.
"""
return issubclass(self._dtype, np.integer) or self._dtype == np.bool_
@property
def num_values(self):
"""
available action number of current Space.
Returns:
The available action of current space.
"""
if not self.is_discrete:
return self.shape[-1]
enums_range = self._high - self._low
if enums_range.shape == ():
return enums_range.item(0)
num = 1
for i in enums_range:
num *= i.item(0)
return num
@property
def boundary(self):
"""
The space boundary of current Space.
Returns:
Uppoer and lower boundary of current space.
"""
return self._low, self._high
def _range(self, low, high):
"""Return the space range."""
if self.is_discrete:
if self._dtype == np.bool_:
dtype_low, dtype_high = 0, 2
else:
dtype_low, dtype_high = (
np.iinfo(self._dtype).min,
np.iinfo(self._dtype).max,
)
else:
dtype_low, dtype_high = np.finfo(self._dtype).min, np.finfo(self._dtype).max
low = dtype_low if low is None else low
high = dtype_high if high is None else high
return np.broadcast_to(low, self._feature_shape), np.broadcast_to(
high, self._feature_shape
)
def _get_action(self, mask):
return (
np.broadcast_to(np.random.choice(np.where(mask)[0]), self._feature_shape)
+ self._low
)
def __repr__(self):
return f"shape {self.shape}, dtype {self.np_dtype}, range ({self._low}, {self._high})"
def __str__(self):
return self.__repr__()