Source code for mindspore_rl.utils.discounted_return

# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Discounted return.
"""

import mindspore
import mindspore.nn as nn
from mindspore import Tensor
from mindspore.ops import operations as P
from mindspore import context
import mindspore.ops.operations._rl_inner_ops as rl_ops


class DiscountedReturn(nn.Cell):
    r"""
    Calculate the discounted return.

    Set the discounted return as :math:`G`, the discount factor as :math:`\gamma`, the reward as
    :math:`R`, the timestep as :math:`t`, and the max timestep as :math:`N`. Then
    :math:`G_{t} = \sum_{k=t}^{N} \gamma^{k-t} R_{k}`.

    For a reward sequence containing multiple episodes, :math:`done` is introduced to indicate
    episode boundaries, and :math:`last\_state\_value` represents the value after the final step
    of the last episode.

    Args:
        gamma (float): Discount factor in the range [0, 1].
        need_bprop (bool): Whether to compute the backpropagation of the discounted return.
            Default: False.

    Inputs:
        - **reward** (Tensor) - The reward sequence, which may contain multiple episodes.
          Tensor of shape :math:`(Timestep, Batch, ...)`.
        - **done** (Tensor) - The episode done flag. Tensor of shape :math:`(Timestep, Batch)`.
          The data type must be bool.
        - **last_state_value** (Tensor) - The value after the final step of the last episode.
          Tensor of shape :math:`(Batch, ...)`.

    Returns:
        Discounted return, a Tensor with the same shape as `reward`.

    Examples:
        >>> net = DiscountedReturn(gamma=0.99)
        >>> reward = Tensor([[1, 1, 1, 1]], dtype=mindspore.float32)
        >>> done = Tensor([[False, False, True, False]])
        >>> last_state_value = Tensor([2.], dtype=mindspore.float32)
        >>> ret = net(reward, done, last_state_value)
        >>> print(ret.shape)
        (1, 4)
    """

    def __init__(self, gamma, need_bprop=False):
        super(DiscountedReturn, self).__init__()
        if gamma > 1.0 or gamma < 0.0:
            raise ValueError('The discount factor should be a number in the range [0, 1], '
                             'but got {}.'.format(gamma))
        # The fused operator is only supported on the GPU backend so far.
        # Ascend and CPU backends will support it soon.
        self.enable_op_fusion = context.get_context('device_target') in ['GPU']
        self.need_bprop = need_bprop
        self.fused_op = rl_ops.DiscountedReturn(gamma)
        self.gamma = Tensor([gamma], mindspore.float32)
        self.zeros_like = P.ZerosLike()

    def construct(self, reward, done, last_state_value):
        """
        Returns the discounted return.
        """
        if self.enable_op_fusion and not self.need_bprop:
            return self.fused_op(reward, done, last_state_value)

        # Fallback path: accumulate the return backward in time with the recurrence
        # G_t = R_t + gamma * (1 - done_t) * G_{t+1}, where `last_state_value`
        # bootstraps the step after the final reward.
        discounted_return = self.zeros_like(reward)
        step = reward.shape[0] - 1
        while step >= 0:
            last_state_value = reward[step] + (1 - done[step]) * self.gamma * last_state_value
            discounted_return[step] = last_state_value
            step -= 1
        return discounted_return
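
# ----------------------------------------------------------------------------
# Usage sketch (not part of the original module): the block below illustrates
# the backward recurrence G_t = R_t + gamma * (1 - done_t) * G_{t+1} that the
# fallback path implements, and cross-checks DiscountedReturn against a plain
# NumPy loop. The helper name `_reference_return` is illustrative only.
if __name__ == '__main__':
    import numpy as np

    net = DiscountedReturn(gamma=0.99)
    # Shape (Timestep=4, Batch=1); the first episode ends at t=2, so the
    # discount chain is cut there and `last_state_value` only bootstraps
    # the unfinished second episode.
    reward = Tensor([[1.], [1.], [1.], [1.]], mindspore.float32)
    done = Tensor([[False], [False], [True], [False]])
    last_state_value = Tensor([2.], mindspore.float32)

    ret = net(reward, done, last_state_value)

    def _reference_return(r, d, v, gamma=0.99):
        """Plain NumPy version of the backward recurrence, for comparison."""
        g = np.zeros_like(r)
        for t in reversed(range(r.shape[0])):
            v = r[t] + gamma * (1.0 - d[t]) * v
            g[t] = v
        return g

    expected = _reference_return(reward.asnumpy(),
                                 done.asnumpy().astype(np.float32),
                                 last_state_value.asnumpy())
    # Both should give [[2.9701], [1.99], [1.], [2.98]] up to float tolerance:
    # t=3 bootstraps from last_state_value, t=2 is cut off by done=True.
    print(ret.asnumpy())
    print(expected)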