# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
pointcloud.py supports loading CAD stp format dataset for 3-D physical simulation computing,
and provides operations related to point cloud data.
"""
import os
import re
from enum import IntEnum
from functools import reduce
from importlib import import_module
from math import sqrt, inf
import multiprocessing as mp
from operator import mul
from time import time
import numpy as np
from mindspore import log as logger
from mindelec._c_minddata import PointCloudImpl
from mindelec._c_minddata import PhysicalQuantity as PhyQuantity
from .pointcloud_util import json_2_dict
from .pointcloud_util import load_data
from .pointcloud_util import SectionInfo
from .pointcloud_util import sections_generation
from .pointcloud_util import bbox_for_one_shape
from .pointcloud_util import minimum_distance
from .pointcloud_util import inner_point_generation
[docs]class SamplingMode(IntEnum):
r"""
Point sampling method, at present support UPPERBOUND(0) and DIMENSIONS(1).
- 'UPPERBOUND': limit the sampling points number upperbound within whole sampling space, the other space
parameters such as sampling number on each axis can be automatically computed according to the space
size ratio.
- 'DIMENSIONS': users can specify the sampling number in each dimension, the axis order is x:y:z.
Supported Platforms:
``Ascend``
"""
UPPERBOUND = 0
DIMENSIONS = 1
[docs]class BBoxType(IntEnum):
r"""
Bounding box for sampling space, only supports cube-shape sampling space, at present supports STATIC(0) and
DYNAMIC(1).
- 'DYNAMIC', generate sampling bbox from the bbox of all 3-D topology models and space extension
constants, models bbox can be computed automatically after read all files, then add extension
constants on each direction the DYNAMIC sampling bbox can be obtained. Each model is different.
Space=(x_min - x_neg, y_min - y_neg, z_min - z_neg, x_max + x_pos, y_max + y_pos, z_max + z_pos)
- 'STATIC', users can specify the sampling space on each dimension,
in (x_min, y_min, z_min, x_max, y_max, z_max) order.
Supported Platforms:
``Ascend``
"""
STATIC = 0
DYNAMIC = 1
[docs]class StdPhysicalQuantity(IntEnum):
"""
Standard physical quantities fields that Maxwell equations concern about,
material solving stage will deal with these standard physical fields.
Supported Platforms:
``Ascend``
"""
MU = 0
EPSILON = 1
SIGMA = 2
TAND = 3
DE_C_INTER_PHYSICAL_QUANTITY = {
StdPhysicalQuantity.MU: PhyQuantity.DE_PHYSICAL_MU,
StdPhysicalQuantity.EPSILON: PhyQuantity.DE_PHYSICAL_EPSILON,
StdPhysicalQuantity.SIGMA: PhyQuantity.DE_PHYSICAL_SIGMA,
StdPhysicalQuantity.TAND: PhyQuantity.DE_PHYSICAL_TAND,
}
[docs]class PointCloudSamplingConfig:
r"""
Sampling space config for PointCloud-Tensor generation.
Args:
sampling_mode (int): Point sampling method. 0(UPPERBOUND) and 1(DIMENSIONS) are supported.
bbox_type (int): Bounding box type for sampling space, only supports cube-shape sampling space. 0(STATIC) and
1(DYNAMIC) are supported.
mode_args (Union[int, tuple]): sampling upperbound number for SamplingMode. Default: None
bbox_args (tuple): bounding_box arguments for sampling, has different definition in different bbox_type.
Default: None
Raises:
TypeError: if `sampling_mode` is not an int.
TypeError: if `bbox_type` is not an int.
TypeError: if `mode_args` is not one of int or tuple.
TypeError: if `bbox_args` is not a tuple.
TypeError: if `sampling_mode` is 0 but `mode_args` is not int.
TypeError: if `sampling_mode` is 1 but `mode_args` is not a tuple of three integers.
ValueError: if `sampling_mode` is 1 but the length of `mode_args` is not three.
ValueError: if `sampling_mode` not in [0(UPPERBOUND), 1(DIMENSIONS)].
ValueError: if `bbox_type` not in [0(STATIC), 1(DYNAMIC)].
Supported Platforms:
``Ascend``
"""
def __init__(self, sampling_mode, bbox_type, mode_args=None, bbox_args=None):
if not isinstance(sampling_mode, int) or isinstance(sampling_mode, bool):
raise TypeError("sampling_mode should be int, but got {} with type {}".format(sampling_mode,
type(sampling_mode)))
if not isinstance(bbox_type, int) or isinstance(sampling_mode, bool):
raise TypeError("bbox_type should be int, but got {} with type {}".format(bbox_type, type(bbox_type)))
if mode_args is not None and not isinstance(mode_args, (int, tuple)) or isinstance(mode_args, bool):
raise TypeError("mode_args should be int or tuple, but got {} with type {}".format(mode_args,
type(mode_args)))
if bbox_args is not None and not isinstance(bbox_args, tuple):
raise TypeError("bbox_args should be tuple, but got {} with type {}".format(bbox_args, type(bbox_args)))
if sampling_mode not in [0, 1]:
raise ValueError("Only UPPERBOUND(0) and DIMENSIONS(1) are supported values for sampling_mode, \
but got: {} ".format(sampling_mode))
if bbox_type not in [0, 1]:
raise ValueError("Only STATIC(0) and DYNAMIC(1) are supported values for bbox_type, \
but got: {} ".format(bbox_type))
if sampling_mode == 0 and (not isinstance(mode_args, int) or isinstance(mode_args, bool)):
raise TypeError("mode_args should always be int if sampling_mode is \"UPPERBOUND(0)\" , \
but got: {} with type {}".format(mode_args, type(mode_args)))
if sampling_mode == 1:
_check_mode_args(mode_args)
self.sampling_mode = sampling_mode
self.bbox_type = bbox_type
self.mode_args = mode_args
self.bbox_args = bbox_args
[docs]class MaterialConfig:
r"""
Material solution config for PointCloud-Tensor generation, which influence the material solving stage.
Args:
json_file (str): Material information for each sub-model json file path.
material_dir (str): Directory path for all material, physical quantities information of each material
record in a text file.
physical_field (dict): Standard physical quantities fields that Maxwell equations concern about,
material solving stage will deal with these standard physical fields. The key of physical_field dict
is physical quantity name, the value is default value for this physical quantity.
customize_physical_field (dict, option): User can specify physical quantities fields according to their
demand, similarly, material solving stage will take care of them. Default: None
remove_vacuum (bool, option): Remove sub-solid whose material property is vacuum. Default: True
Raises:
TypeError: if `json_file` is not a str.
TypeError: if `material_dir` is not a str.
TypeError: if `physical_field` is not a dict.
TypeError: if `customize_physical_field` is not a dict.
TypeError: if `remove_vacuum` is not a bool.
Supported Platforms:
``Ascend``
"""
def __init__(self, json_file, material_dir, physical_field, customize_physical_field=None,
remove_vacuum=True):
if not isinstance(json_file, str):
raise TypeError("json_file should be str, but got {} with type {}".format(json_file, type(json_file)))
if not isinstance(material_dir, str):
raise TypeError("material_dir should be str, but got {} with type {}".format(material_dir,
type(material_dir)))
if not isinstance(physical_field, dict):
raise TypeError("physical_field should be dict, but got {} with type {}"
.format(physical_field, type(physical_field)))
if customize_physical_field is not None and not isinstance(customize_physical_field, dict):
raise TypeError("customize_physical_field should be dict, but got {} with type {}"
.format(customize_physical_field, type(customize_physical_field)))
if not isinstance(remove_vacuum, bool):
raise TypeError("remove_vacuum should be bool, but got {} with type {}".format(remove_vacuum,
type(remove_vacuum)))
self.json_file = json_file
self.material_dir = material_dir
self.physical_field = dict()
for k, v in physical_field.items():
de_key = DE_C_INTER_PHYSICAL_QUANTITY[k]
self.physical_field[de_key] = v
self.customize_physical_field = customize_physical_field
self.remove_vacuum = remove_vacuum
def print_physical_field(self):
if self.physical_field is not None:
for k, v in self.physical_field.items():
logger.info("key: {0}, value: {1}".format(k, v))
if self.customize_physical_field is not None:
for k, v in self.customize_physical_field.items():
logger.info("key: {0}, value: {1}".format(k, v))
GLOBAL_ALL_SECTIONS = []
GLOBAL_ALL_TENSOR_OUTPUTS = []
def collect_section_result(group_sections):
global GLOBAL_ALL_SECTIONS
GLOBAL_ALL_SECTIONS.extend(group_sections)
def collect_space_result(space_solution):
global GLOBAL_ALL_TENSOR_OUTPUTS
GLOBAL_ALL_TENSOR_OUTPUTS.extend(space_solution)
def _check_mode_args(inputs):
"""check int types"""
if not isinstance(inputs, tuple):
raise TypeError("mode_args should always be tuple if sampling_mode is \"DIMENSIONS(1)\","
" but got: {} with type {}".format(inputs, type(inputs)))
if not len(inputs) == 3:
raise ValueError("mode_args should always be tuple with length of 3 if sampling_mode is"
" \"DIMENSIONS(1)\", but got: {} with length of {}".format(inputs, len(inputs)))
if not all(isinstance(x, int) and not isinstance(x, bool) for x in inputs):
raise TypeError("mode_args should always be tuple of 3 integers if sampling_mode is \"DIMENSIONS(1)\","
" but got: {} with type ({}, {}, {})".format(inputs, type(inputs[0]),
type(inputs[1]), type(inputs[2])))
[docs]class PointCloud:
"""
Read the stp files to generate PointCloud data, for downstream physical-equation AI simulation. Besides, you can
analyse the space topological information for any 3-D model in stp format. (The most popular format in CAD)
Args:
data_dir (str): stp files directory, raw data
sampling_config (PointCloudSamplingConfig): Sampling space config for PointCloud-Tensor generation.
material_config (MaterialConfig): Material solution config for PointCloud-Tensor generation, which
influence the material solving stage.
num_parallel_workers (int, option): Parallel workers number, this arguments can take effect on all computing
stages, including reading model, section building, space solving and material solving. Default: os.cpu_count()
Raises:
TypeError: if `data_dir` is not a str.
TypeError: if `sampling_config` is not an instance of class PointCloudSamplingConfig.
TypeError: if `material_config` is not an instance of class MaterialConfig.
TypeError: if `num_parallel_workers` is not an int.
Supported Platforms:
``Ascend``
Examples:
>>> import mindelec.data as md
>>> from mindelec.data.pointcloud import SamplingMode, BBoxType, StdPhysicalQuantity
>>> sampling_config = md.PointCloudSamplingConfig(SamplingMode.UPPERBOUND, BBoxType.DYNAMIC, 1000000000,
... (5., 5., 5., 5., 5., 5.))
>>> std_physical_info = {
... StdPhysicalQuantity.MU: 1.0,
... StdPhysicalQuantity.EPSILON: 1.0,
... StdPhysicalQuantity.SIGMA: 0.,
... StdPhysicalQuantity.TAND: 0.,
... }
>>> material_config = md.MaterialConfig(JSON_FILE, MATERIAL_DIR, std_physical_info, None, True)
>>> pointcloud = md.PointCloud(STP_DIR, sampling_config, material_config)
"""
def __init__(self, data_dir, sampling_config, material_config, num_parallel_workers=os.cpu_count()):
if not isinstance(data_dir, str):
raise TypeError("data_dir should be str, but got {} with type {}".format(data_dir, type(data_dir)))
if not isinstance(sampling_config, PointCloudSamplingConfig):
raise TypeError("sampling_config should be an instance of {}, but got {} with type {}"
.format(PointCloudSamplingConfig, sampling_config, type(sampling_config)))
if not isinstance(material_config, MaterialConfig):
raise TypeError("material_config should be an instance of {}, but got {} with type {}"
.format(MaterialConfig, material_config, type(material_config)))
if not isinstance(num_parallel_workers, int):
raise TypeError("num_parallel_workers should be int, but got {} with type {}"
.format(num_parallel_workers, type(num_parallel_workers)))
self._data_dir = data_dir
self._sampling_config = sampling_config
self._material_config = material_config
self._num_parallel_workers = num_parallel_workers
self._whole_model_bbox = None
self._sub_model_bbox_matrix = None
self._sampling_distance = list()
self._sampling_space = dict()
self._tensor_offset_table = list()
self._model_list = list()
self._real_model_index = list()
self._init_resource()
def _init_resource(self):
"""
Private member function, this function is for initialize all computing resources, includes reading raw stp data
and constructing sampling space
"""
self._read_step_files()
self._init_sampling_space()
def _read_step_files(self):
"""
Private member function, this function is for reading raw stp data, all these data will be stored in model_list
member variable.
"""
logger.info("Start to read stp files.")
t_read_begin = time()
if self._data_dir[-1] != '/':
self._data_dir += '/'
file_list = [name for name in os.listdir(self._data_dir) if os.path.isfile(self._data_dir + name)
and name.endswith(".stp")]
file_list = self._files_filter(file_list)
n_file = len(file_list)
shared_mem_ret = mp.Manager().dict()
block_size = int(n_file / self._num_parallel_workers)
block_index = []
file_groups = [[] for _ in range(self._num_parallel_workers)]
for i in range(self._num_parallel_workers - 1):
block_begin = i * block_size
block_end = (i + 1) * block_size
file_groups[i] = file_list[block_begin:block_end]
block_index.append((block_begin, block_end))
last_begin = (self._num_parallel_workers - 1) * block_size
file_groups[-1] = file_list[last_begin:]
block_index.append((last_begin, n_file))
process_pool0 = mp.Pool(processes=self._num_parallel_workers)
for group_id, file_group in enumerate(file_groups):
process_pool0.apply_async(func=load_data, args=(self._data_dir, file_group, group_id, shared_mem_ret))
process_pool0.close()
process_pool0.join()
self._model_list = [None] * n_file
for key in shared_mem_ret.keys():
begin = block_index[key][0]
end = block_index[key][1]
self._model_list[begin:end] = shared_mem_ret.get(key)
logger.info("Reading files successfully. Time costs: {}".format(time() - t_read_begin))
def _files_filter(self, file_list):
"""
Private member function. This function can find index for all stp file and sort them in numerical order.
Args:
file_list (list): Stp file name list in target directory
Returns:
list, processed file_list.(After filter and sorter)
"""
numerical_file_list = []
vacuum_solid_index = set()
if self._material_config is not None:
if self._material_config.remove_vacuum:
material_dict = json_2_dict(self._material_config.json_file)["solids"]
for solid in material_dict:
if solid["material"] == "Vacuum":
vacuum_solid_index.add(solid["index"])
for file in file_list:
real_index = re.findall(r"[-+]?\d*\d+|\d+", file)[0]
if int(real_index) < 0:
print("[ERROR]Negative index stp file name is prohibited.")
raise RuntimeError
if self._material_config is not None and int(real_index) in vacuum_solid_index:
continue
numerical_file_list.append((file, int(real_index)))
self._real_model_index.append(int(real_index))
numerical_file_list.sort(key=lambda x: x[1])
self._real_model_index.sort()
return numerical_file_list
def _init_sampling_space(self):
"""
Private member function. This function is for pointcloud sampling space initialization
"""
real_dimension_number = self._sampling_distance_filter()
self._space_construct(real_dimension_number)
logger.info("Sampling space filter has been initialized successfully.")
def _sampling_distance_filter(self):
"""
Private member function. Find the sampling number for each axis
Returns:
Numpy.array, list of sampling number for each axis in x,y,z order
"""
self._find_bbox()
space = []
if self._sampling_config.bbox_type == BBoxType.DYNAMIC:
for i in range(3):
space.append((self._whole_model_bbox[i+3] + self._sampling_config.bbox_args[i+3]) -
(self._whole_model_bbox[i] - self._sampling_config.bbox_args[i]))
elif self._sampling_config.bbox_type == BBoxType.STATIC:
for i in range(3):
space.append(self._sampling_config.bbox_args[i+3] - self._sampling_config.bbox_args[i])
real_sampling_number = []
if self._sampling_config.sampling_mode == SamplingMode.UPPERBOUND:
product = reduce(mul, space, 1)
ratio = pow(self._sampling_config.mode_args / product, 1/3)
ideal_cube_size = 1 / ratio
for i in range(3):
number = max(3, int(space[i] / ideal_cube_size))
real_sampling_number.append(number)
self._sampling_distance.append(space[i] / (number - 1))
elif self._sampling_config.sampling_mode == SamplingMode.DIMENSIONS:
for i in range(len(self._sampling_config.mode_args)):
real_sampling_number.append(self._sampling_config.mode_args[i])
self._sampling_distance.append(space[i] / (real_sampling_number[i] - 1))
return real_sampling_number
def _find_bbox(self):
"""
Private member function, to find the bounding box for original model and all the sub-model
"""
bbox = []
bbox_info = np.zeros(shape=(len(self._model_list), 6))
for model_idx, model in enumerate(self._model_list):
bbox_info[model_idx] = bbox_for_one_shape(model)
for column in range(bbox_info.shape[1]):
if column < 3:
bbox.append(np.min(bbox_info[:, column], axis=0))
else:
bbox.append(np.max(bbox_info[:, column], axis=0))
self._whole_model_bbox = bbox
self._sub_model_bbox_matrix = bbox_info
def _space_construct(self, real_sampling_number):
"""
Private member function, for construct sampling linear space for pointcloud sampling on each axis
Args:
real_sampling_number: Sampling number for each axis in order xyz
"""
logger.info("Sampling numbers: {}".format(real_sampling_number))
axis_fields = ['X', 'Y', 'Z']
if self._sampling_config.bbox_type == BBoxType.DYNAMIC:
for idx, field in enumerate(axis_fields):
lower_bound = self._whole_model_bbox[idx] - self._sampling_config.bbox_args[idx]
upper_bound = self._whole_model_bbox[idx + 3] + self._sampling_config.bbox_args[idx + 3]
self._sampling_space[field] = np.linspace(start=lower_bound, stop=upper_bound,
num=real_sampling_number[idx], endpoint=True)
else:
for idx, field in enumerate(axis_fields):
lower_bound = self._sampling_config.bbox_args[idx]
upper_bound = self._sampling_config.bbox_args[idx + 3]
self._sampling_space[field] = np.linspace(start=lower_bound, stop=upper_bound,
num=real_sampling_number[idx], endpoint=True)
[docs] def model_list(self):
"""
Get model list
Returns:
list, model list
"""
return self._model_list
[docs] def topology_solving(self):
"""
Solve the topology space by ray-casting algorithm, for each point in sampling space we obtain its sub-model
belonging, all the results will be stored in a Global list. num_of_workers processes in total will be applied in
parallel computing.
"""
logger.info("Start to solve space")
n_dim = 4
physical_info_dim = 4 # Init with (x, y, z, model_id), so length = 4, if more material needed, we add it
if self._material_config is not None:
physical_info_dim += len(self._material_config.physical_field)
tensor_shape = [len(self._sampling_space['X']), len(self._sampling_space['Y']),
len(self._sampling_space['Z']), physical_info_dim]
self._offset_base(n_dim, tensor_shape)
model_groups = self._model_distribution()
self._section_building(model_groups)
section_group_vector = self._section_distribution()
self._space_solving(section_group_vector)
[docs] def tensor_build(self):
"""
Building pointcloud tensor by using the information obtained in topology_solving module. If poincloud object
initialized with material config, all material physical info will be considered. All the results will be stored
in a Global list of dictionary, num_of_workers processes in total will be applied in parallel computing.
Returns:
numpy.ndarray, pointcloud result
"""
physical_info_dim = 4 # Init with (x, y, z, model_id), so length = 4, if more material needed, we add it
if self._material_config is not None:
physical_info_dim += len(self._material_config.physical_field)
tensor_shape = (len(self._sampling_space['X']), len(self._sampling_space['Y']),
len(self._sampling_space['Z']), physical_info_dim)
self._tensor_impl = PointCloudImpl(self._material_config.json_file, self._material_config.material_dir,
self._material_config.physical_field, self._num_parallel_workers)
tensor = np.zeros(shape=tensor_shape, dtype=np.float64)
self._tensor_init(tensor)
self._material_solving(tensor)
return tensor
def _offset_base(self, n_dim, shape):
"""
Compute the pointer offset for the final pointcloud tensor which is numpy.nd_array class
Args:
n_dim (int): Dimension number of final nd_array object, it equals to 4
shape (list): The size on each dimension, in x, y, z, physical_quantities order
"""
for i in range(n_dim):
base = 1
for j in range(i + 1, n_dim, 1):
base *= shape[j]
self._tensor_offset_table.append(base)
def _model_distribution(self):
"""
Distribution policy for section building parallel compute, after we test several scheduling policies, we find
out the modular divide policy is optimal in average for most case, actually the topological complexity of each
sub-model is unpredictable, hence at present we have no more choice to do that.
We guess maybe it can be refined by pre-collecting the file size for each stp model and estimate the
corresponding topological complexity, however, it needs more test to prove the effective for this method.
Returns:
scheduling result for all stp model, list[list]. This group divide result will be applied in section
building stage to make the system load as balance as possible.
"""
total_model_num = len(self._model_list)
model_group = [[] for _ in range(self._num_parallel_workers)]
for model_idx in range(total_model_num):
mod = model_idx % self._num_parallel_workers
model_group[mod].append((model_idx, self._model_list[model_idx]))
self._model_list.clear()
return model_group
def _section_building(self, distributed_model_groups):
"""
Private member function, this function is section building module. We use a processes pool API to make use of
all hardware resource. All sections obtained in this function will be stored in a Global list.
Important:
n_block is designed to accelerate the total computing by divide z_sampling space into n_block pieces.
The idea is for one certain stp model, the complexity is similar among different z_sampling, we will
introduce this parameter in next function.
Args:
distributed_model_groups (list[list]): Scheduled model group generated by self._model_distribution(), each
process only consider one unique group at a certain time.
"""
t_section_build_begin = time()
n_block = 4
logger.info("Total block number: {}".format(n_block))
process_pool = mp.Pool(processes=min(os.cpu_count(), n_block * self._num_parallel_workers))
for model_group_id, model_group in enumerate(distributed_model_groups):
for block_id in range(n_block):
process_pool.apply_async(func=sections_generation,
args=(self, model_group, model_group_id, block_id, n_block),
callback=collect_section_result)
process_pool.close()
process_pool.join()
logger.info("Section generation costs: {}.".format(time() - t_section_build_begin))
logger.info("{} sections have been generated in total.".format(len(GLOBAL_ALL_SECTIONS)))
def _section_building_impl(self, model_info_list, block_idx, block_number):
"""
Section building module algorithm implementation. Find the cross section for each stp model on each z-value, we
use num_of_workers processes to finish this task in parallel mode.
Args:
model_info_list (list): A list contains model information.
block_idx (int): z_sampling block index, we divide z_sampling space into block_number pieces
block_number (int): Total block number, it equals to n_block as always
Raises:
TypeError: if `model_info_list` is not a list.
TypeError: if `block_idx` is not an int.
TypeError: if `block_number` is not an int.
Returns:
dict[list], A dictionary record the section information obtained by this function. The key is model index,
the value is a list of section information. section information is a SectionInfo class.
"""
if not isinstance(model_info_list, list):
raise TypeError("model_info_list: {} should be list, but got {}"
.format(model_info_list, type(model_info_list)))
if not isinstance(block_idx, int):
raise TypeError("block_idx: {} should be int, but got {}"
.format(block_idx, type(block_idx)))
if not isinstance(block_number, int):
raise TypeError("block_number: {} should be int, but got {}"
.format(block_number, type(block_number)))
gp = import_module("OCC.Core.gp")
gp_dir = getattr(gp, "gp_Dir")
gp_pln = getattr(gp, "gp_Pln")
gp_pnt = getattr(gp, "gp_Pnt")
brep_builder_api = import_module("OCC.Core.BRepBuilderAPI")
brep_builder_api_make_face = getattr(brep_builder_api, "BRepBuilderAPI_MakeFace")
brep_algo_api = import_module("OCC.Core.BRepAlgoAPI")
brep_algo_api_section = getattr(brep_algo_api, "BRepAlgoAPI_Section")
section_info_dict = dict()
xy_tolerance = 1.
size_z = self._sampling_space["Z"].size
z_sampling = self._sampling_space["Z"][block_idx:size_z:block_number]
for model_info in model_info_list:
model_id = model_info[0]
model = model_info[1]
section_info_dict[model_id] = list()
z_min = self._sub_model_bbox_matrix[model_id][2]
z_max = self._sub_model_bbox_matrix[model_id][5]
x_range = self._sub_model_bbox_matrix[model_id][3] - self._sub_model_bbox_matrix[model_id][0]
for z_id, z_value in enumerate(z_sampling):
if z_value <= z_min:
continue
if z_value >= z_max:
break
t_section_begin = time()
section_plane = gp_pln(gp_pnt(0, 0, z_value), gp_dir(0, 0, 1))
section_face = brep_builder_api_make_face(section_plane,
self._whole_model_bbox[0] - xy_tolerance,
self._whole_model_bbox[3] + xy_tolerance,
self._whole_model_bbox[1] - xy_tolerance,
self._whole_model_bbox[4] + xy_tolerance).Face()
section = brep_algo_api_section(section_face, model).Shape()
t_section_end = time()
real_z_id = block_number * z_id + block_idx
cur_section_info = SectionInfo(section, model_id, z_value, real_z_id, t_section_end - t_section_begin,
x_range)
if cur_section_info.has_shape():
section_info_dict[model_id].append(cur_section_info)
return section_info_dict
def _section_distribution(self):
"""
Private member function. This function is for generating distribution policy for space solving module
Distribution policy for space solving parallel compute, after we test several scheduling policies, we find
out the modular divide policy is the optimal one in average for most case, we use SectionInfo object to express
the topological complexity for each section, by applying SectionInfo.Weight() method we can obtain that and then
we divide all sections into different group to make system load more balance hence accelerate computing speed.
Returns:
list[list], Scheduled section group, it will be applied in space solving module
"""
sections_group_vector = [[] for _ in range(self._num_parallel_workers)]
GLOBAL_ALL_SECTIONS.sort(reverse=True)
for section_idx, section_info in enumerate(GLOBAL_ALL_SECTIONS):
group_idx = section_idx % self._num_parallel_workers
sections_group_vector[group_idx].append(section_info)
if self._num_parallel_workers != 1:
for section_info in GLOBAL_ALL_SECTIONS:
section_info.remove_edges_for_rpc()
return sections_group_vector
def _space_solving(self, distributed_section_groups):
"""
Space solving implementation, we use a processes pool to finish this task in parallel mode.
Args:
distributed_section_groups (list[list]): Scheduled section group, each process can deal with one group at
the same time
"""
t_space_solving_begin = time()
process_pool = mp.Pool(processes=min(os.cpu_count(), self._num_parallel_workers))
for section_info_group in distributed_section_groups:
process_pool.apply_async(func=inner_point_generation, args=(self, section_info_group),
callback=collect_space_result)
process_pool.close()
process_pool.join()
logger.info("Space solving costs: {}.".format(time() - t_space_solving_begin))
logger.info("{} line dictionaries have been solved in total.".format(len(GLOBAL_ALL_TENSOR_OUTPUTS)))
def _inner_point_from_section(self, section_info_obj):
"""
Generate space solution for one section
Args:
section_info_obj(SectionInfo): Topological information for one section
Raises:
TypeError: if `section_info_obj` is not an instance of class SectionInfo.
Returns:
list[dict], A list of dictionary records the space solution obtained from this module, each element in this
list is the space solution from one orthogonal line.
"""
if not isinstance(section_info_obj, SectionInfo):
raise TypeError("model_info_list: {} should be an instance of {}, but got {}"
.format(section_info_obj, SectionInfo, type(section_info_obj)))
section_result = list()
tolerance_constant = 1e-6
z = section_info_obj.z_value()
idz = section_info_obj.z_idx()
section_edges = section_info_obj.edges()
model_idx = section_info_obj.model_idx()
section_bbox = bbox_for_one_shape(section_info_obj.section(), tolerance_constant)
left_most = section_bbox[0]
right_most = section_bbox[3]
section_edges_x_range = np.zeros(shape=(len(section_edges), 2))
for edge_id, section_edge in enumerate(section_edges):
section_edges_x_range[edge_id, 0] = bbox_for_one_shape(section_edge)[0]
section_edges_x_range[edge_id, 1] = bbox_for_one_shape(section_edge)[3]
for idx, x in enumerate(self._sampling_space['X']):
if x <= left_most + sqrt(3) * tolerance_constant:
continue
if x >= right_most - sqrt(3) * tolerance_constant:
break
one_line_result = self._inner_point_from_line(x, idx, z, idz, model_idx, section_bbox, section_edges,
section_edges_x_range)
if one_line_result:
section_result.append(one_line_result)
return section_result
def _get_intersection_coordinate_list(self, idx, x, z, section_edges, edges_x_range, section_bbox):
"""
get_intersection_coordinate_list
Args:
x: The x_value of the line
idx: The corresponding index of x_value in x_sampling
z: The z_value of the key section to be solved
section_bbox: Bounding box of key section
section_edges: List of edges for key section
edges_x_range: x_range for each key section edge, hence we got a matrix with shape = [|E|, 2]
Returns:
intersection_coordinate_list
"""
tolerance = 1e-3
intersection_coordinate_list = []
down_most = section_bbox[1]
up_most = section_bbox[4]
brep_builder_api = import_module("OCC.Core.BRepBuilderAPI")
brep_builder_api_make_edge = getattr(brep_builder_api, "BRepBuilderAPI_MakeEdge")
gp = import_module("OCC.Core.gp")
gp_pnt = getattr(gp, "gp_Pnt")
orthogonal_edge = brep_builder_api_make_edge(gp_pnt(x, down_most - 1, z),
gp_pnt(x, up_most + 1, z)).Edge()
for edge_id, section_edge in enumerate(section_edges):
if x <= edges_x_range[edge_id, 0] or x >= edges_x_range[edge_id, 1]:
continue
min_dist, mp1, _ = minimum_distance(orthogonal_edge, section_edge)
if min_dist <= tolerance:
for u in mp1:
yu = u.Y()
intersection_coordinate_list.append(yu)
def _judge_yu(inner_yu_list, inner_tolerance, inner_intersection_coordinate_list):
pre_yu = -inf
for inner_yu in inner_yu_list:
delta = inner_yu - pre_yu
if delta > inner_tolerance:
inner_intersection_coordinate_list.append(inner_yu)
pre_yu = inner_yu
return inner_intersection_coordinate_list
if len(intersection_coordinate_list) % 2 == 1 and idx != 0:
intersection_coordinate_list.clear()
x_half_left = 0.5 * self._sampling_space['X'][idx - 1] + 0.5 * x
orthogonal_edge_half_left = brep_builder_api_make_edge(gp_pnt(x_half_left, down_most - 1, z),
gp_pnt(x_half_left, up_most + 1, z)).Edge()
for edge_id, section_edge in enumerate(section_edges):
if x_half_left <= edges_x_range[edge_id, 0] or x_half_left >= edges_x_range[edge_id, 1]:
continue
min_dist, mp1, _ = minimum_distance(orthogonal_edge_half_left, section_edge)
if min_dist <= tolerance:
if len(mp1) == 1:
for u in mp1:
yu = u.Y()
intersection_coordinate_list.append(yu)
else:
yu_list = list()
for u in mp1:
yu_list.append(u.Y())
yu_list.sort()
intersection_coordinate_list = _judge_yu(yu_list, tolerance, intersection_coordinate_list)
return intersection_coordinate_list
def _inner_point_from_line(self, x, idx, z, idz, model_idx, section_bbox, section_edges, edges_x_range):
"""
Analyse one orthogonal line for a specific section to find out the space solution. The definition of space
solution is we can determine for all points which sub model which belongs to.
Args:
notation:
we define key section as the current section to be solved
x: The x_value of the line
idx: The corresponding index of x_value in x_sampling
z: The z_value of the key section to be solved
idz: The corresponding index of z_value in z_sampling
model_idx: Index of sub-model key section belongs to
section_bbox: Bounding box of key section
section_edges: List of edges for key section
edges_x_range: x_range for each key section edge, hence we got a matrix with shape = [|E|, 2]
Returns:
result_dict: A dictionary which can be applied to record the result in following form.
key: (idx, idy, idz) ---> offset_value, the value of one point determine the location in final tensor.
value: sub-model index determines which key point belongs to
"""
line_result = dict()
tolerance = 1e-3
intersection_coordinate_list = self._get_intersection_coordinate_list(idx, x, z, section_edges, edges_x_range,
section_bbox)
intersection_coordinate_list.sort()
if len(intersection_coordinate_list) % 2 == 1:
if len(intersection_coordinate_list) == 1:
return line_result
intersection_coordinate_list.pop(1)
if not intersection_coordinate_list:
return line_result
for even in range(0, len(intersection_coordinate_list), 2):
lower_bound_y = intersection_coordinate_list[even]
upper_bound_y = intersection_coordinate_list[even + 1]
for idy, y in enumerate(self._sampling_space['Y']):
if y <= lower_bound_y + sqrt(3) * tolerance:
continue
if y >= upper_bound_y - sqrt(3) * tolerance:
break
index_p = (idx, idy, idz)
offset_value = self._offset_compute(index_p)
line_result[offset_value] = model_idx
return line_result
def _offset_compute(self, point_index):
"""
Private member function, this function is for offset value computing
Args:
point_index: (idx, idy, idz) the index in sampling space for one certain point
Returns:
int, the offset value computed from point index: (idx, idy, idz)
"""
offset_value = 0
for i in range(len(point_index)):
offset_value += point_index[i] * self._tensor_offset_table[i]
return offset_value
def _tensor_init(self, tensor):
"""
Private member function, this function is for initializing the final tensor.
Args:
tensor (ndarray): Final tensor records the pointcloud information
"""
logger.info("Space initialize begin")
x_list = list(self._sampling_space['X'])
y_list = list(self._sampling_space['Y'])
z_list = list(self._sampling_space['Z'])
self._tensor_impl.tensor_init_impl(x_list, y_list, z_list, self._tensor_offset_table, tensor)
def _material_solving(self, tensor):
"""
Private member function, this function is for assigning material information into final tensor.
Args:
tensor (ndarray): Final tensor records the pointcloud information
Important:
If construct pointcloud object without material config, this function will raise error in cc layer.
"""
logger.info("Material analyse begin")
self._tensor_impl.material_analyse_impl(GLOBAL_ALL_TENSOR_OUTPUTS, self._real_model_index, tensor)