# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""3d geometry"""
from __future__ import absolute_import
import numpy as np
from mindspore import log as logger
from .geometry_base import Geometry, GEOM_TYPES, SamplingConfig
from .geom_utils import sample, generate_mesh
from ..utils.check_func import check_param_type, check_param_type_value
_SPACE = " "
[docs]class FixedPoint(Geometry):
r"""
Definition of fixed point object.
Args:
name (str): name of the fixed point.
coord (Union[int, float, tuple, list, numpy.ndarray]): coordinate of the fixed point. if the parameter type is
tuple or list, the element support tuple[int, int], tuple[float, float], list[int, int], list[float, float].
dtype (numpy.dtype): Data type of sampled point data type. Default: ``numpy.float32``.
sampling_config (SamplingConfig): sampling configuration. Default: ``None``.
Supported Platforms:
``Ascend`` ``GPU``
Examples:
>>> from mindflow.geometry import generate_sampling_config, FixedPoint
>>> hypercube_random = dict({
... 'domain': dict({
... 'random_sampling': True,
... 'size': 1,
... 'sampler': 'uniform'
... })
... })
>>> sampling_config = generate_sampling_config(hypercube_random)
>>> point = FixedPoint("FixedPoint", [-1, 2, 1], sampling_config=sampling_config)
>>> domain = point.sampling(geom_type="domain")
>>> print(domain.shape)
(1, 3)
"""
def __init__(self, name, coord, dtype=np.float32, sampling_config=None):
if isinstance(coord, (int, float)):
coord = [coord]
super(FixedPoint, self).__init__(name, len(coord), coord, coord, dtype, sampling_config)
self.coord = coord
self.columns_dict = {}
self.length = 0.0
self.vol = 0.0
self.area = 0.0
def _inside(self, points, strict=False):
raise NotImplementedError("{}._inside not implemented".format(self.geom_type))
def _on_boundary(self, points):
raise NotImplementedError("{}._on_boundary not implemented".format(self.geom_type))
def _boundary_normal(self, points):
raise NotImplementedError("{}._boundary_normal not implemented".format(self.geom_type))
[docs] def sampling(self, geom_type="domain"):
"""
sampling points
Args:
geom_type (str): geometry type, which supports ``'domain'`` and ``'BC'``. Default: ``'domain'``.
Returns:
Numpy.ndarray, 2D numpy array with or without boundary normal vectors.
Raises:
ValueError: If `config` is ``None``.
KeyError: If `geom_type` is ``'domain'`` but `config.domain` is ``None``.
KeyError: If `geom_type` is ``'BC'`` but `config.bc` is ``None``.
ValueError: If `geom_type` is neither ``'BC'`` nor ``'domain'``.
"""
config = self.sampling_config
check_param_type_value(geom_type, _SPACE.join((self.geom_type, self.name, "'s geom_type")),
GEOM_TYPES, data_type=str)
if geom_type.lower() == "domain":
logger.info("Sampling domain points for {}:{}, config info: {}"
.format(self.geom_type, self.name, config.domain))
column_name = self.name + "_domain_points"
data = np.tile(self.coord, (self.sampling_config.domain.size, 1))
data = np.reshape(data, (-1, self.dim))
self.columns_dict["domain"] = [column_name]
data = data.astype(self.dtype)
return data
if geom_type.lower() == "bc":
logger.info("Sampling BC points for {}:{}, config info: {}"
.format(self.geom_type, self.name, config.bc))
if config.bc.with_normal:
raise ValueError("Normal is not supported on point: {}")
data = np.tile(self.coord, (self.sampling_config.bc.size, 1))
data = np.reshape(data, (-1, self.dim))
column_data = self.name + "_BC_points"
self.columns_dict["BC"] = [column_data]
data = data.astype(self.dtype)
return data
raise ValueError("Unknown geom_type: {}, only \"domain/BC\" are supported for {}:{}".format(
geom_type, self.geom_type, self.name))
[docs]class HyperCube(Geometry):
r"""
Definition of HyperCube object.
Args:
name (str): name of the hyper cube.
dim (int): number of dimensions.
coord_min (Union[int, float, tuple, list, numpy.ndarray]): minimal coordinate of the hyper cube. if the
parameter type is tuple or list, the element support tuple[int, int], tuple[float, float], list[int, int],
list[float, float].
coord_max (Union[int, float, tuple, list, numpy.ndarray]): maximal coordinate of the hyper cube. if the
parameter type is tuple or list, the element support tuple[int, int], tuple[float, float], list[int, int],
list[float, float].
dtype (numpy.dtype): Data type of sampled point data type. Default: ``numpy.float32``.
sampling_config (SamplingConfig): sampling configuration. Default: ``None``.
Raises:
TypeError: `sampling_config` is not instance of class SamplingConfig.
Supported Platforms:
``Ascend`` ``GPU``
Examples:
>>> from mindflow.geometry import generate_sampling_config, HyperCube
>>> hypercube_random = dict({
... 'domain': dict({
... 'random_sampling': True,
... 'size': 1000,
... 'sampler': 'uniform'
... }),
... 'BC': dict({
... 'random_sampling': True,
... 'size': 200,
... 'sampler': 'uniform',
... 'with_normal': False,
... }),
... })
>>> sampling_config = generate_sampling_config(hypercube_random)
>>> hypercube = HyperCube("HyperCube", 3, [-1, 2, 1], [0, 3, 2], sampling_config=sampling_config)
>>> domain = hypercube.sampling(geom_type="domain")
>>> bc = hypercube.sampling(geom_type="BC")
>>> print(domain.shape)
(1000, 3)
"""
def __init__(self, name, dim, coord_min, coord_max, dtype=np.float32, sampling_config=None):
super(HyperCube, self).__init__(name, dim, coord_min, coord_max, dtype, sampling_config)
if np.any(self.coord_max - self.coord_min <= 0.0):
raise ValueError("coord_min should be smaller than coord_max, but got coord_min: {}, coord_max: {}".format(
self.coord_min, self.coord_max))
self.columns_dict = {}
self.length = self.coord_max - self.coord_min
self.vol = np.prod(self.length)
area = 0
for i in range(dim):
area += self.vol / self.length[i]
self.area = area * 2.0
def _inside(self, points, strict=False):
"""whether inside domain"""
valid_min = (self.coord_min < points).all(axis=-1) if strict else (self.coord_min <= points).all(axis=-1)
valid_max = (self.coord_max > points).all(axis=-1) if strict else (self.coord_max >= points).all(axis=-1)
return np.logical_and(valid_min, valid_max)
def _on_boundary(self, points):
"""whether on geometry's boundary"""
near_boundary = np.logical_or(np.any(np.isclose(points, self.coord_min), axis=-1),
np.any(np.isclose(points, self.coord_max), axis=-1))
return near_boundary
def _boundary_normal(self, points):
"""get the normal vector of boundary points"""
points = self._filter_corner_points(points)
normal = np.isclose(points, self.coord_min) * -1.0 + np.isclose(points, self.coord_max) * 1.0
return normal
def _filter_corner_points(self, points):
corner = np.isclose(points, self.coord_min) + np.isclose(points, self.coord_max)
have_corner = np.count_nonzero(corner, axis=-1)
not_corner_points = np.where(have_corner <= 1)[0]
return points[not_corner_points]
def _random_domain_points(self):
"""randomly generate domain points"""
size = self.sampling_config.domain.size
sampler = self.sampling_config.domain.sampler
data = sample(size, self.dim, sampler) * (self.coord_max - self.coord_min) + self.coord_min
data = np.reshape(data, (-1, self.dim))
return data
def _grid_domain_points(self):
"""generate domain mesh points"""
mesh_size = self.sampling_config.domain.size
if len(mesh_size) != self.dim:
raise ValueError("For grid sampling, length of mesh_size list: {} should be equal to dimension: {}".format(
mesh_size, self.dim
))
mesh_x = generate_mesh(self.coord_min, self.coord_max, mesh_size)
data = np.reshape(mesh_x, (-1, self.dim))
return data
def _random_boundary_points(self, need_normal=False):
"""get boundary points randomly"""
size = self.sampling_config.bc.size
sampler = self.sampling_config.bc.sampler
boundary_points = []
for i in range(self.dim):
area_i = self.vol / self.length[i]
ratio = area_i * 2 / self.area
num_sample = int(size * ratio)
temp_boundary_points = sample(num_sample, self.dim, sampler)
temp_boundary_points[np.arange(num_sample), i] = np.round(temp_boundary_points[np.arange(num_sample), i])
boundary_points.extend([temp_boundary_points])
data = np.concatenate(boundary_points, axis=0)
data = np.random.permutation(data)
data = data * (self.coord_max - self.coord_min) + self.coord_min
if need_normal:
data = self._filter_corner_points(data)
data = np.reshape(data, (-1, self.dim))
data_normal = self._boundary_normal(data)
data_normal = np.reshape(data_normal, (-1, self.dim))
return data, data_normal
data = np.reshape(data, (-1, self.dim))
return data
def _grid_boundary_points(self, need_normal=False):
"""get gird boundary points"""
size = self.sampling_config.bc.size
if self.dim == 1:
ds = self.area / 5.0
else:
ds = (self.area / size) ** (1 / (self.dim - 1))
mesh_size = (self.length / ds).astype(np.int64)
domain_data = generate_mesh(self.coord_min, self.coord_max, mesh_size)
bound_cond = np.where(self._on_boundary(domain_data))[0]
data = domain_data[bound_cond]
data = np.reshape(data, (-1, self.dim))
data = np.random.permutation(data)
if need_normal:
data = self._filter_corner_points(data)
data_normal = self._boundary_normal(data)
data_normal = np.reshape(data_normal, (-1, self.dim))
return data, data_normal
return data
def _get_sdf(self, domain_points):
"""calculate sign distance function"""
center = (self.coord_min + self.coord_max) / 2.0
p_dist = np.abs(domain_points - center) - (self.coord_max - center)
sdf = np.linalg.norm(np.maximum(p_dist, 0.0)) + np.minimum(np.max(p_dist, axis=1), 0.0)
return -sdf
[docs] def sampling(self, geom_type="domain"):
"""
sampling points
Args:
geom_type (str): geometry type: can be ``'domain'`` or ``'BC'``. Default: ``'domain'``.
- ``'domain'``, feasible domain of the problem.
- ``'BC'``, boundary of the problem.
Returns:
Numpy.ndarray, if the with_normal property of boundary configuration is true, returns 2D numpy array with
boundary normal vectors. Otherwise, returns 2D numpy array without boundary normal vectors.
Raises:
ValueError: If `config` is ``None``.
KeyError: If `geom_type` is ``'domain'`` but `config.domain` is ``None``.
KeyError: If `geom_type` is ``'BC'`` but `config.bc` is ``None``.
ValueError: If `geom_type` is neither ``'BC'`` nor ``'domain'``.
"""
config = self.sampling_config
check_param_type(config, _SPACE.join((self.geom_type, self.name, "'s sampling_config")),
data_type=SamplingConfig)
check_param_type_value(geom_type, _SPACE.join((self.geom_type, self.name, "'s geom_type")),
GEOM_TYPES, data_type=str)
if geom_type.lower() == "domain":
check_param_type(config.domain, _SPACE.join((self.geom_type, self.name, "'s domain config")),
exclude_type=type(None))
if config.domain is None:
raise KeyError("Sampling config for domain of {}:{} should not be none"
.format(self.geom_type, self.name))
logger.info("Sampling domain points for {}:{}, config info: {}"
.format(self.geom_type, self.name, config.domain))
column_name = self.name + "_domain_points"
if config.domain.random_sampling:
data = self._random_domain_points()
else:
data = self._grid_domain_points()
self.columns_dict["domain"] = [column_name]
data = data.astype(self.dtype)
if config.domain.with_sdf:
sdf = self._get_sdf(data)
sdf = sdf.astype(self.dtype)
sdf_column_name = self.name + "_domain_sdf"
self.columns_dict.get("domain").append(sdf_column_name)
return data, sdf
return data
if geom_type.lower() == "bc":
check_param_type(config.bc, _SPACE.join((self.geom_type, self.name, "'s bc config")),
exclude_type=type(None))
logger.info("Sampling BC points for {}:{}, config info: {}"
.format(self.geom_type, self.name, config.bc))
if config.bc.with_normal:
if config.bc.random_sampling:
data, data_normal = self._random_boundary_points(need_normal=True)
else:
data, data_normal = self._grid_boundary_points(need_normal=True)
column_data = self.name + "_BC_points"
column_normal = self.name + "_BC_normal"
self.columns_dict["BC"] = [column_data, column_normal]
data = data.astype(self.dtype)
data_normal = data_normal.astype(self.dtype)
return data, data_normal
if config.bc.random_sampling:
data = self._random_boundary_points(need_normal=False)
else:
data = self._grid_boundary_points(need_normal=False)
column_data = self.name + "_BC_points"
self.columns_dict["BC"] = [column_data]
data = data.astype(self.dtype)
return data
raise ValueError("Unknown geom_type: {}, only \"domain/BC\" are supported for {}:{}".format(
geom_type, self.geom_type, self.name))