# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Debugger python API."""
import os.path
from typing import Iterable
from mindinsight.debugger.api.conditions import WatchpointHit, HitDetail, WatchpointHandle, WatchpointHitImpl
from mindinsight.debugger.api.debugger_engine import DebuggerEngine
from mindinsight.debugger.api.debugger_tensor import DebuggerTensor, DebuggerTensorImpl
from mindinsight.debugger.api.node import Node, NodeImpl, NodeUniqueId
from mindinsight.debugger.common.exceptions.exceptions import DebuggerParamValueError
from mindinsight.debugger.common.log import LOGGER as log
from mindinsight.debugger.common.utils import (
validate_type, validate_slots, parse_param_to_iterable_obj
)
from mindinsight.debugger.dump.parser import DebuggerParser
from mindinsight.debugger.stream_cache.data_loader import DataLoader
from mindinsight.domain.graph.base import NodeType
from mindinsight.domain.graph.query import construct_filter_func
[文档]class DumpAnalyzer:
"""
Analyzer to inspect the dump data.
.. warning::
All APIs in this class are experimental prototypes that are subject to
change or deletion.
Args:
dump_dir (str): The path of the dump folder.
mem_limit (int, optional): The memory limit for checking watchpoints in MB.
Optional values: from 2048 MB to 2147483647 MB. None means no limit is set, only limited by computor memory.
Default: None.
Supported Platforms:
``Ascend`` ``GPU``
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
"""
def __init__(self, dump_dir, mem_limit=None):
self._dump_dir = os.path.realpath(dump_dir)
self._mem_limit = 0 if mem_limit is None else mem_limit
self._data_loader = None
self._debuger_engine = None
self._parser = None
# the key is rank_id, the value is <tensor_feature, TensorImpl> map
self._nodes = {}
self._initialize()
def _initialize(self):
"""Initialize."""
self._validate_mem_limit(self._mem_limit)
self._data_loader = DataLoader(self._dump_dir)
self._debugger_engine = DebuggerEngine(self._data_loader, self._mem_limit)
self._parse()
@staticmethod
def _validate_mem_limit(mem_limit):
"""Validate memory limit."""
validate_type(mem_limit, 'mem_limit', int, 'int or None')
# The unit is MB, from 2G to max value of int32 MB
min_limit_value = 2 * 1024
max_limit_value = 2147483647
if mem_limit and mem_limit < min_limit_value or mem_limit > max_limit_value:
msg = f"If mem_limit is not None, it should be set in [{min_limit_value}, {max_limit_value}]."
raise DebuggerParamValueError(msg)
def _parse(self):
"""Parse graph into nodes and tensors."""
def _add_node_impl(base_nodes, node_type, nodes):
nonlocal id_to_name_map
for b_node in base_nodes:
new_node = NodeImpl(b_node, node_type)
new_node.debugger_engine = self._debugger_engine
nodes[new_node.rank][new_node.unique_id] = new_node
id_to_name_map[new_node.rank][(b_node.graph_name, b_node.name)] = new_node.name
def _update_node_list(node_list, dst_id, cur_node, cur_node_map):
nonlocal id_to_name_map
dst_node_name = id_to_name_map[cur_node.rank].get(dst_id)
if not dst_node_name:
log.info("Failed to find %s in id_to_name_map", dst_id)
return
unique_id = NodeUniqueId(name=dst_node_name,
rank=cur_node.rank, graph_name=cur_node.graph_name)
target_node = cur_node_map.get(unique_id)
if not target_node:
log.error("Failed to find %s in node_map", unique_id)
return
node_list.append(target_node)
self._parser = DebuggerParser(self._data_loader)
ranks = self.get_ranks()
# the key is rank_id, the value is <node_unique_id, NodeImpl> map
self._nodes = {rank_id: {} for rank_id in ranks}
id_to_name_map = {rank_id: {} for rank_id in ranks}
_add_node_impl(self._parser.constants, NodeType.CONSTANT, self._nodes)
_add_node_impl(self._parser.parameters, NodeType.PARAMETER, self._nodes)
_add_node_impl(self._parser.operators, NodeType.OPERATOR, self._nodes)
# update input and output nodes
for node_map in self._nodes.values():
for node in node_map.values():
base_node = node.base_node
if hasattr(base_node, 'inputs'):
# parameter or const node has no inputs
for node_input in base_node.inputs:
_update_node_list(node.input_nodes, (base_node.graph_name, node_input.name), node, node_map)
for node_map in self._nodes.values():
for node in node_map.values():
for node_input in node.input_nodes:
node_input.downstream.append(node)
[文档] def export_graphs(self, output_dir=None):
"""
Export the computational graph(s) in xlsx file(s) to the `output_dir` .
The file(s) will contain the stack info of graph nodes.
Args:
output_dir (str, optional): Output directory to save the file.
None means to use the current working directory. Default: None.
Returns:
str, The path of the generated file.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> res = my_run.export_graphs()
"""
return self._parser.export_xlsx(output_dir)
[文档] def select_nodes(
self,
query_string,
use_regex=False,
select_by="node_name",
ranks=None,
case_sensitive=True) -> Iterable[Node]:
"""
Select nodes.
Select the matched nodes in the computational graph according to the
query_string. The nodes can be matched by "node_name" or "code_stack",
see the parameters for detail.
Args:
query_string (str): Query string. For a node to be selected, the
match target field must contains or matches the query string.
use_regex (bool, optional): Indicates whether query is a regex. Default:
False.
select_by (str, optional): The field to search when selecting
nodes. Available values are "node_name", "code_stack".
"node_name" means to search the name of the nodes in the
graph. "code_stack" means the stack info of
the node. Default: "node_name".
ranks (Union[int, list[int], None], optional): The rank(s) to select. None means all ranks will be
considered. The selected nodes must exist on the specified ranks. Default: None.
case_sensitive (bool, optional): Whether case-sensitive when
selecting tensors. Default: True.
Returns:
Iterable[Node], the matched nodes.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> nodes = my_run.select_nodes("Conv2D-op13")
"""
validate_type(query_string, 'query_string', str, 'str')
validate_type(use_regex, 'use_regex', bool, 'bool')
validate_type(case_sensitive, 'case_sensitive', bool, 'bool')
node_filter_func = self._get_filter_func(select_by)
ranks = self._get_iterable_ranks(ranks)
query_filter_func = construct_filter_func(query_string, case_sensitive, use_regex)
nodes = []
for rank_id in ranks:
for node in self._nodes.get(rank_id, {}).values():
if node_filter_func(node, query_filter_func):
nodes.append(node.copy())
return nodes
@staticmethod
def _match_name(node, filter_func):
"""Check if name matched."""
return filter_func(node.name)
@staticmethod
def _match_stack(node, filter_func):
"""Check if stack matched."""
for res in map(filter_func, node.stack):
if res:
return True
return False
@staticmethod
def _get_filter_func(select_by):
"""Get filter function."""
if select_by == 'node_name':
return DumpAnalyzer._match_name
if select_by == 'code_stack':
return DumpAnalyzer._match_stack
raise DebuggerParamValueError(
"The param `select_by` only support `node_name` or `code_stack`.")
[文档] def select_tensors(
self,
query_string,
use_regex=False,
select_by="node_name",
iterations=None,
ranks=None,
slots=None,
case_sensitive=True) -> Iterable[DebuggerTensor]:
"""
Select tensors.
Select the matched tensors in the directory according to the
sepicified filter condition, see the parameters for detail.
Args:
query_string (str): Query string. For a tensor to be selected, the
match target field must contain or match the query string.
use_regex (bool, optional): Indicates whether query is a regex. Default:
False.
select_by (str, optional): The field to search when selecting
tensors. Available values are "node_name", "code_stack".
"node_name" means to search the node name of the tensors in the
graph. "code_stack" means the stack info of
the node that outputs this tensor. Default: "node_name".
iterations (Union[int, list[int], None], optional): The iteration(s) to select. None means all dumped
iterations will be selected. Default: None.
ranks (Union[int, list[int], None], optional): The rank(s) to select. None means all ranks will be selected.
Default: None.
slots (list[int], optional): The slot of the selected tensor. None means all slots will be selected.
Default: None.
case_sensitive (bool, optional): Whether case-sensitive when selecting tensors. Default: True.
Returns:
Iterable[DebuggerTensor], the matched tensors.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> tensors = my_run.select_tensors("Conv2D-op13")
"""
validate_type(query_string, 'query_string', str, 'str')
validate_type(use_regex, 'use_regex', bool, 'bool')
validate_type(case_sensitive, 'case_sensitive', bool, 'bool')
validate_slots(slots)
node_filter_func = self._get_filter_func(select_by)
ranks = self._get_iterable_ranks(ranks)
dumped_iterations = self.get_iterations(ranks)
iterations = parse_param_to_iterable_obj(iterations, 'iterations', dumped_iterations)
tensors = []
query_filter_func = construct_filter_func(query_string, case_sensitive, use_regex)
for rank_id in ranks:
for node in self._nodes.get(rank_id, {}).values():
if node_filter_func(node, query_filter_func):
tensors.extend(node.get_output_tensors(slots=slots, iterations=iterations))
return tensors
[文档] def get_iterations(self, ranks=None) -> Iterable[int]:
"""
Get available iterations which have data dumped in this run.
Args:
ranks (Union[int, list[int], None], optional): The rank(s) to select.
Get available iterations which are under the specified ranks.
The ranks refers to the number of devices to be used starting from 0
when running distributed training. This number is called rank.
For example, for an 8-card computer, only 4-7 cards are used for
specified training, so 4-7 cards correspond to the ranks 0-3 respectively..
If None, return iterations of all ranks. Default: None.
Returns:
Iterable[int], available iterations which have dumped data, sorted in increasing order.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> iterations = my_run.get_iterations()
>>> print(list(iterations))
[0]
"""
total_dumped_steps = self._data_loader.load_dumped_step()
ranks = self._get_iterable_ranks(ranks)
iterations = set()
for rank_id in ranks:
for iters_per_graph in total_dumped_steps.get(rank_id, {}).values():
iterations.update(iters_per_graph)
res = list(iterations)
res.sort()
return res
[文档] def get_ranks(self) -> Iterable[int]:
"""
Get the available ranks in this run.
Returns:
Iterable[int], the list of rank id in current dump directory.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> ranks = my_run.get_ranks()
>>> print(list(ranks))
[0]
"""
return [rank_dir.rank_id for rank_dir in self._data_loader.rank_dirs]
[文档] def check_watchpoints(
self,
watchpoints,
error_on_no_value=False) -> Iterable[WatchpointHit]:
"""
Check the given watchpoints in a batch.
Note:
1. For speed, all watchpoints for the iteration should be given at
the same time to avoid reading tensors len(watchpoints) times.
2. The `check_watchpoints` function start a new process when it is called, needs to be
called in `if __name__ == '__main__'` .
Args:
watchpoints (Iterable[Watchpoint]): The list of watchpoints.
error_on_no_value (bool, optional): Whether to report error code in watchpoint
hit when the specified tensor have no value stored in
dump_dir. Default: False.
Returns:
Iterable[WatchpointHit], the watchpoint hit list, sorted by tensor drop time.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> from mindinsight.debugger import (TensorTooLargeCondition,
... Watchpoint)
>>>
>>> def test_watchpoints():
... my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
... tensors = my_run.select_tensors(
... query_string="Conv2D-op13",
... use_regex=True,
... iterations=[0],
... ranks=[0],
... slots=[0]
... )
... watchpoint = Watchpoint(tensors=tensors,
... condition=TensorTooLargeCondition(abs_mean_gt=0.0))
... # the check_watchpoints function start a new process needs to be called through the main entry
... hit = list(my_run.check_watchpoints(watchpoints=[watchpoint]))[0]
... # print(str(hit))
... # the print result is as follows
... # Watchpoint TensorTooLarge triggered on tensor:
... # rank: 0
... # graph_name: kernel_graph_0
... # node_name: Default/network-WithLossCell/_backbone-AlexNet/conv2-Conv2d/Conv2D-op13
... # slot: 0
... # iteration: 0
... # Threshold: {'abs_mean_gt': 0.0}
... # Hit detail: the setting for watchpoint is abs_mean_gt = 0.0.
... # The actual value of the tensor is abs_mean_gt = 0.06592023578438996.
...
>>> if __name__ == "__main__":
... test_watchpoints()
...
"""
wp_hit_list = []
# key is watchpoint_id, value is a dict with iteration as the key and check_nodes as values
iterations = set()
wp_handles = {wp_id: WatchpointHandle(wp_id, wp) for wp_id, wp in enumerate(watchpoints)}
for wp_handle in wp_handles.values():
iterations.update(wp_handle.get_iterations())
debugger_backend = self._debugger_engine.dbg_service
# check all the watchpoint for the iterations
for iteration in iterations:
log.info("Check watchpoints for iteration %s", iteration)
if not self._data_loader.has_data(iteration):
log.info("No data dumped with iteration id: %s. Ignore checking watchpoint.", iteration)
continue
# adding the watchpoint for current iteration
for wp_handle in wp_handles.values():
wp_handle.add_watchpoint(iteration, self._debugger_engine)
# check the watchpoint for current iteration
# add the hit watchpoints to hit list
hit_list = debugger_backend.check_watchpoints(
iteration=iteration, error_on_no_value=error_on_no_value)
for hit in hit_list:
# the list of slots for the hit node to report
# (for the current watchpoint and iteration)
wp_handle = wp_handles.get(hit.watchpoint_id)
graph_name, node_name = hit.name.split('/', 1)
node = self._get_node(node_name, hit.rank_id, graph_name)
tensor = DebuggerTensorImpl(node=node, slot=hit.slot, iteration=iteration)
if wp_handle.need_check(tensor):
hit_params = hit.parameters
hit_detail = HitDetail(hit_params, wp_handle.condition)
wp_hit = WatchpointHitImpl(tensor=tensor,
condition=wp_handle.condition,
hit_detail=hit_detail,
error_code=hit.error_code)
wp_hit_list.append(wp_hit)
if error_on_no_value:
no_value_hit_list = []
for wp_handle in wp_handles.values():
no_value_hit_list += wp_handle.watchpoint_hit_on_no_value(iteration)
wp_hit_list += no_value_hit_list
# remove all the watchpoints for the previous iterations
for watchpoint_id in wp_handles:
debugger_backend.remove_watchpoint(watchpoint_id=watchpoint_id)
return wp_hit_list
def _get_node(self, node_name, rank_id, graph_name):
"""Get NodeImpl object."""
unique_id = NodeUniqueId(name=node_name, rank=rank_id, graph_name=graph_name)
node = self._nodes.get(rank_id, {}).get(unique_id)
return node
[文档] def list_affected_nodes(self, tensor):
"""
List the nodes that use given tensor as input.
Affected nodes is defined as the nodes use the given tensor as input. If
a node is affected by the given tensor, the node's output value is
likely to change when the given tensor changes.
Args:
tensor (DebuggerTensor): The tensor of which affected nodes will be
returned.
Returns:
Iterable[Node], the affected nodes of the specified tensor.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> tensor_list = list(my_run.select_tensors(query_string="Conv2D-op13"))
>>> affected_nodes = my_run.list_affected_nodes(tensor_list[0])
"""
self._validate_node(tensor.node)
affected_nodes = [affected_node.copy() for affected_node in tensor.node.downstream]
return affected_nodes
[文档] def get_output_nodes(self, node):
"""
Get the nodes that use the output tensors of the given node.
Args:
node (Node): The specified node.
Returns:
Iterable[Node], output nodes of the specified node.
Examples:
>>> from mindinsight.debugger import DumpAnalyzer
>>> my_run = DumpAnalyzer(dump_dir="/path/to/your/dump_dir_with_dump_data")
>>> node_list = list(my_run.select_nodes(query_string="Conv2D-op13"))
>>> out_nodes = my_run.get_output_nodes(node_list[0])
"""
self._validate_node(node)
output_nodes = node.downstream.copy()
return output_nodes
def _validate_node(self, node):
"""Check if node is in current directory."""
validate_type(node, 'node', NodeImpl, 'Node')
node = self._get_node(node.name, node.rank, node.graph_name)
if node is None:
raise DebuggerParamValueError(f"Failed to find node {node.name} with rank {node.rank} "
"in dump directory.")
def _get_iterable_ranks(self, ranks):
"""
Validate input ranks and return iterable rands.
Args:
ranks (Union[int, list[int], None], optional): The range of ranks.
Returns:
list[int], list of rank id.
"""
total_ranks = self.get_ranks()
return parse_param_to_iterable_obj(ranks, 'ranks', total_ranks)