mindflow.cfd.runtime 源代码

# Copyright 2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""simulation run time controller."""
from mindspore import numpy as mnp
from mindspore import Tensor, jit_class


[文档]@jit_class class RunTime: r""" Simulation run time controller. Args: config (dict): The dict of parameters. mesh_info (MeshInfo): The information of the compute mesh. material (Material): The fluid material model. Supported Platforms: ``GPU`` Examples: >>> from mindflow import cfd >>> config = {'mesh': {'dim': 1, 'nx': 100, 'gamma': 1.4, 'x_range': [0, 1], 'pad_size': 3}, ... 'material': {'type': 'IdealGas', 'heat_ratio': 1.4, 'specific_heat_ratio': 1.4, ... 'specific_gas_constant': 1.0}, 'runtime': {'CFL': 0.9, 'current_time': 0.0, 'end_time': 0.2}, ... 'integrator': {'type': 'RungeKutta3'}, 'space_solver': {'is_convective_flux': True, ... 'convective_flux': {'reconstructor': 'WENO5', 'riemann_computer': 'Rusanov'}, ... 'is_viscous_flux': False}, 'boundary_conditions': {'x_min': {'type': 'Neumann'}, ... 'x_max': {'type': 'Neumann'}}} >>> s = cfd.Simulator(config) >>> r = cfd.RunTime(c, s.mesh_info, s.material) """ def __init__(self, config, mesh_info, material): self.current_time = Tensor(config.get('current_time', 0.0)) self.end_time = config.get('end_time', 0.0) self.fixed_timestep = config.get('fixed_timestep', False) if self.fixed_timestep: self.timestep = Tensor(config['timestep']) else: self.timestep = None self.cfl = config.get('CFL', 0.0) self.mesh_info = mesh_info self.material = material self.eps = 1e-8
[文档] def compute_timestep(self, pri_var): """ Computes the physical time step size. Args: pri_var (Tensor): The primitive variables. """ if not self.fixed_timestep: tmp = [] for axis in self.mesh_info.active_axis: tmp.append(self.mesh_info.cell_sizes[axis]) min_cell_size = min(tmp) sound_speed = self.material.sound_speed(pri_var) abs_velocity = 0.0 for i in self.mesh_info.active_axis: abs_velocity += (mnp.abs(pri_var[i + 1, :, :, :]) + sound_speed) dt = min_cell_size / (mnp.max(abs_velocity) + self.eps) self.timestep = self.cfl * dt print("current time = {:.6f}, time step = {:.6f}".format(self.current_time.asnumpy(), self.timestep.asnumpy()))
[文档] def advance(self): """ Simulation advance according to the timestep. Raises: NotImplementedError: If `timestep` is invalid. """ if not self.timestep: raise NotImplementedError() self.current_time += self.timestep
[文档] def time_loop(self, pri_var): """ Weather to continue the simulation. When current time reaches end time or NAN value detected, return False. Args: pri_var (Tensor): The primitive variables. Returns: Bool. Weather to continue the simulation. Raises: ValueError: If pri_var has NAN values. """ if mnp.isnan(pri_var).sum() > 0: raise ValueError('Nan value detected!') if self.timestep and self.timestep < self.eps: return False return self.current_time < self.end_time