Source code for mindspore.train.callback._landscape

# Copyright 2021-2023 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Process data and Calc loss landscape."""
from __future__ import absolute_import

import os
import time
import json
import stat
import shutil
import numbers

from collections import defaultdict, namedtuple
from concurrent.futures import wait, ALL_COMPLETED, ProcessPoolExecutor

import numpy as np
from scipy import linalg, sparse

from mindspore import log as logger
from mindspore.common.tensor import Tensor
from mindspore.common.parameter import Parameter
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.train.summary_pb2 import LossLandscape
from mindspore.train.summary import SummaryRecord
from mindspore.train.summary.enums import PluginEnum
from mindspore.train.anf_ir_pb2 import DataType
from mindspore.train._utils import check_value_type, _make_directory
from mindspore.train.dataset_helper import DatasetHelper
from mindspore.train.metrics import get_metrics
from mindspore import context

# If there is no path, it needs to be set to an empty list.
Points = namedtuple("Points", ["x", "y", "z"])


def nptype_to_prototype(np_value):
    """
    Transform the np type to proto type.

    Args:
        np_value (numpy.ndarray): NumPy data whose dtype is mapped to a proto type.

    Returns:
        str, the name of the proto data type, e.g. 'DT_FLOAT32'.
    """
    np2pt_tbl = {
        np.bool_: 'DT_BOOL',
        np.int8: 'DT_INT8',
        np.int16: 'DT_INT16',
        np.int32: 'DT_INT32',
        np.int64: 'DT_INT64',
        np.uint8: 'DT_UINT8',
        np.uint16: 'DT_UINT16',
        np.uint32: 'DT_UINT32',
        np.uint64: 'DT_UINT64',
        np.float16: 'DT_FLOAT16',
        np.float32: 'DT_FLOAT32',
        np.float64: 'DT_FLOAT64',
        None: 'DT_UNDEFINED'
    }
    if np_value is None:
        return None

    np_type = np_value.dtype.type
    proto = np2pt_tbl.get(np_type, None)
    if proto is None:
        raise TypeError("No match for proto data type.")
    return proto
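
# Illustrative usage sketch (not part of the original module):
#   nptype_to_prototype(np.array([1.0], dtype=np.float32))  returns 'DT_FLOAT32'
#   nptype_to_prototype(np.array([1], dtype=np.int64))       returns 'DT_INT64'
#   nptype_to_prototype(None)                                 returns None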


def fill_array_to_tensor(np_value, summary_tensor):
    """
    Package the tensor summary.

    Args:
        np_value (numpy.ndarray): The summary data as a NumPy array.
        summary_tensor (Tensor): The tensor message of the summary to be filled.

    Returns:
        Summary, return tensor summary content.
    """
    # get tensor dtype
    tensor_dtype = nptype_to_prototype(np_value)
    summary_tensor.data_type = DataType.Value(tensor_dtype)

    # get the value list
    tensor_value_list = np_value.reshape(-1).tolist()
    summary_tensor.float_data.extend(tensor_value_list)

    # get the tensor dim
    for vector in np_value.shape:
        summary_tensor.dims.append(vector)

    return summary_tensor
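
# Usage sketch (illustration only), mirroring how this helper is used by
# Landscape.transform_to_loss_landscape_msg below:
#   msg = LossLandscape()
#   fill_array_to_tensor(np.zeros((4, 4), dtype=np.float32), msg.landscape.z)
#   # msg.landscape.z now carries data_type DT_FLOAT32, 16 float values and dims [4, 4].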


def transfer_tensor_to_tuple(inputs):
    """
    If the input is a tensor, convert it to a tuple. If not, the output is unchanged.
    """
    if isinstance(inputs, Tensor):
        return (inputs,)

    return inputs
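
# Behaviour sketch (illustration only):
#   transfer_tensor_to_tuple(Tensor(np.ones((2, 2))))  returns a 1-tuple holding the tensor
#   transfer_tensor_to_tuple((tensor_a, tensor_b))      returns (tensor_a, tensor_b) unchanged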


class Landscape:
    """Return loss landscape."""
    def __init__(self,
                 intervals,
                 decomposition,
                 landscape_points: Points,
                 convergence_point=None,
                 path_points=None):
        self.landscape_points = landscape_points
        self.decomposition = decomposition
        self.intervals = intervals
        self.num_samples = 2048
        self.convergence_point = convergence_point
        self.path_points = path_points
        self.unit = 'step'
        self.step_per_epoch = 1

    def set_convergence_point(self, convergence_point: Points):
        """Set the convergence point."""
        self.convergence_point = convergence_point

    def transform_to_loss_landscape_msg(self, landscape_data):
        """Transform to loss landscape_msg."""
        landscape_msg = LossLandscape()
        # only save one dim in x and y
        fill_array_to_tensor(landscape_data.landscape_points.x[0], landscape_msg.landscape.x)
        fill_array_to_tensor(landscape_data.landscape_points.y[:, 0], landscape_msg.landscape.y)
        fill_array_to_tensor(landscape_data.landscape_points.z, landscape_msg.landscape.z)

        if landscape_data.path_points:
            landscape_msg.loss_path.intervals.extend(landscape_data.intervals)
            fill_array_to_tensor(landscape_data.path_points.x, landscape_msg.loss_path.points.x)
            fill_array_to_tensor(landscape_data.path_points.y, landscape_msg.loss_path.points.y)
            fill_array_to_tensor(landscape_data.path_points.z, landscape_msg.loss_path.points.z)

        if landscape_data.convergence_point:
            fill_array_to_tensor(landscape_data.convergence_point.x, landscape_msg.convergence_point.x)
            fill_array_to_tensor(landscape_data.convergence_point.y, landscape_msg.convergence_point.y)
            fill_array_to_tensor(landscape_data.convergence_point.z, landscape_msg.convergence_point.z)

        landscape_msg.metadata.decomposition = landscape_data.decomposition
        landscape_msg.metadata.unit = self.unit
        landscape_msg.metadata.step_per_epoch = self.step_per_epoch

        return landscape_msg
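
    # Construction sketch (illustrative shapes only): a Landscape built from grid data can be
    # turned into the summary message consumed by SummaryRecord, e.g.
    #   grid = Points(x=np.zeros((3, 3)), y=np.zeros((3, 3)), z=np.ones((3, 3)))
    #   landscape = Landscape(intervals=[1, 2, 3], decomposition='PCA', landscape_points=grid)
    #   msg = landscape.transform_to_loss_landscape_msg(landscape)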


class SummaryLandscape:
    """
    SummaryLandscape can help you collect loss landscape information.
    It can create a landscape in the PCA direction or a random direction by calculating the loss.

    Note:
        SummaryLandscape only supports Linux systems.

    Args:
        summary_dir (str): The path of the summary directory, which is used to save the model weights,
            metadata and other data required to create the landscape.

    Examples:
        >>> import mindspore as ms
        >>> import mindspore.nn as nn
        >>> from mindspore.train import Model, Accuracy, Loss
        >>> from mindspore import SummaryCollector, SummaryLandscape
        >>>
        >>> if __name__ == '__main__':
        ...     # If the device_target is Ascend, set the device_target to "Ascend"
        ...     ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
        ...     # Create the dataset taking MNIST as an example. Refer to
        ...     # https://gitee.com/mindspore/docs/blob/r2.3.q1/docs/mindspore/code/mnist.py
        ...     ds_train = create_dataset()
        ...     # Define the network structure of LeNet5. Refer to
        ...     # https://gitee.com/mindspore/docs/blob/r2.3.q1/docs/mindspore/code/lenet.py
        ...     network = LeNet5()
        ...     net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
        ...     net_opt = nn.Momentum(network.trainable_params(), 0.01, 0.9)
        ...     model = Model(network, net_loss, net_opt, metrics={"Accuracy": Accuracy()})
        ...     # Simple usage for collecting landscape information:
        ...     interval_1 = [1, 2, 3, 4, 5]
        ...     summary_collector = SummaryCollector(summary_dir='./summary/lenet_interval_1',
        ...                                          collect_specified_data={'collect_landscape':
        ...                                              {"landscape_size": 4,
        ...                                               "unit": "step",
        ...                                               "create_landscape": {"train": True,
        ...                                                                    "result": False},
        ...                                               "num_samples": 2048,
        ...                                               "intervals": [interval_1]}
        ...                                          })
        ...     model.train(1, ds_train, callbacks=[summary_collector], dataset_sink_mode=False)
        ...
        ...     # Simple usage for visualizing the landscape:
        ...     def callback_fn():
        ...         # Define the network structure of LeNet5. Refer to
        ...         # https://gitee.com/mindspore/docs/blob/r2.3.q1/docs/mindspore/code/lenet.py
        ...         network = LeNet5()
        ...         net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
        ...         metrics = {"Loss": Loss()}
        ...         model = Model(network, net_loss, metrics=metrics)
        ...         # Create the dataset taking MNIST as an example. Refer to
        ...         # https://gitee.com/mindspore/docs/blob/r2.3.q1/docs/mindspore/code/mnist.py
        ...         ds_eval = create_dataset()
        ...         return model, network, ds_eval, metrics
        ...
        ...     summary_landscape = SummaryLandscape('./summary/lenet_interval_1')
        ...     # parameters of collect_landscape can be modified or unchanged
        ...     summary_landscape.gen_landscapes_with_multi_process(callback_fn,
        ...                                                         collect_landscape={
        ...                                                             "landscape_size": 4,
        ...                                                             "create_landscape": {"train": False,
        ...                                                                                  "result": False},
        ...                                                             "num_samples": 2048,
        ...                                                             "intervals": [interval_1]},
        ...                                                         device_ids=[1])
    """

    def __init__(self, summary_dir):
        self._summary_dir = os.path.realpath(summary_dir)
        self._ckpt_dir = os.path.join(self._summary_dir, 'ckpt_dir')
        _make_directory(self._ckpt_dir)

        # save the model params file, key is epoch, value is the ckpt file path
        self._model_params_file_map = {}
        self._epoch_group = defaultdict(list)
        self._metric_fns = None

    def _get_model_params(self, epochs):
        """Get the model params."""
        parameters = []
        for epoch in epochs:
            file_path = self._model_params_file_map.get(str(epoch))
            parameters.append(list(load_checkpoint(file_path).values()))
        return parameters

    def _create_epoch_group(self, intervals):
        for i, interval in enumerate(intervals):
            for j in interval:
                self._epoch_group[i].append(j)
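
    # Shape sketch (illustration only): _create_epoch_group([[1, 2, 3], [4, 5, 6]]) fills
    # self._epoch_group as {0: [1, 2, 3], 1: [4, 5, 6]}.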
    def clean_ckpt(self):
        """
        Clean the checkpoint.

        Tutorial Examples:
            - `Training Optimization Process Visualization
              <https://www.mindspore.cn/mindinsight/docs/en/master/landscape.html>`_
        """
        shutil.rmtree(self._ckpt_dir, ignore_errors=True)
    def gen_landscapes_with_multi_process(self, callback_fn, collect_landscape=None,
                                          device_ids=None, output=None):
        """
        Use multiple processes to generate the landscape.

        Args:
            callback_fn (python function): A python function object. The user needs to write a function
                that takes no input, and the return requirements are as follows.

                - mindspore.train.Model: User's model object.
                - mindspore.nn.Cell: User's network object.
                - mindspore.dataset: User's dataset object for creating the loss landscape.
                - mindspore.train.Metrics: User's metrics object.
            collect_landscape (Union[dict, None]): The meaning of the parameters when creating the loss landscape
                is consistent with the fields with the same name in SummaryCollector. The purpose of setting them
                here is to allow users to freely modify the creation parameters. Default: ``None`` .

                - landscape_size (int): Specify the image resolution of the generated loss landscape.
                  For example, if it is set to ``128`` , the resolution of the landscape is 128 * 128.
                  The calculation time increases with the increase of resolution.
                  Default: ``40`` . Optional values: between 3 and 256.
                - create_landscape (dict): Select how to create the loss landscape.
                  Training process loss landscape(train) and training result loss landscape(result).
                  Default: ``{"train": True, "result": True}``. Optional: ``True`` / ``False`` .
                - num_samples (int): The size of the dataset used to create the loss landscape.
                  For example, for an image dataset, you can set num_samples to 2048,
                  which means that 2048 images are used to create the loss landscape.
                  Default: ``2048`` .
                - intervals (List[List[int]]): Specifies the intervals in which the loss landscape is created.
                  For example, if the user wants to create loss landscapes of two training processes, covering
                  epochs 1-5 and 6-10 respectively, they can set [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]].
                  Note: Each interval must contain at least three epochs.
            device_ids (List(int)): Specifies which devices are used to create the loss landscape.
                For example, [0, 1] refers to creating the loss landscape with device 0 and device 1.
                Default: ``None`` .
            output (str): Specifies the path to save the loss landscape.
                Default: ``None`` . The default save path is the same as the summary file.
        """
        executor = None
        if len(device_ids) > 1:
            executor = ProcessPoolExecutor(len(device_ids))
            futures = [executor.submit(self._set_context, i) for i in device_ids]
            wait(futures, return_when=ALL_COMPLETED)

        output_path = os.path.realpath(output) if output is not None else self._summary_dir
        summary_record = SummaryRecord(output_path)
        self._check_device_ids(device_ids)
        if collect_landscape is not None:
            try:
                self._check_collect_landscape_data(collect_landscape)
            except (ValueError, TypeError) as err:
                summary_record.close()
                raise err
            json_path = os.path.join(self._ckpt_dir, 'train_metadata.json')
            if not os.path.exists(json_path):
                summary_record.close()
                raise FileNotFoundError(f'For "{self.__class__.__name__}", '
                                        f'train_metadata.json file path of {json_path} not exists.')
            with open(json_path, 'r') as file:
                data = json.load(file)
            for key, value in collect_landscape.items():
                if key in data.keys():
                    data[key] = value
            if "intervals" in collect_landscape.keys():
                self._create_epoch_group(collect_landscape.get("intervals"))
                data["epoch_group"] = self._epoch_group
            with open(json_path, 'w') as file:
                json.dump(data, file)
            os.chmod(json_path, stat.S_IRUSR)

        for interval, landscape in self._list_landscapes(callback_fn=callback_fn, executor=executor,
                                                         device_ids=device_ids):
            summary_record.add_value(PluginEnum.LANDSCAPE.value, f'landscape_{str(interval)}', landscape)
            summary_record.record(0)
            summary_record.flush()
        summary_record.close()
    def _list_landscapes(self, callback_fn, executor=None, device_ids=None):
        """Create landscape with single device and list all landscapes."""
        if not os.path.exists(os.path.join(self._ckpt_dir, 'train_metadata.json')):
            raise FileNotFoundError(f'For "{self.__class__.__name__}", train_metadata.json file does not exist '
                                    f'under the path, please use summary_collector to collect information to '
                                    f'create the json file')
        with open(os.path.join(self._ckpt_dir, 'train_metadata.json'), 'r') as file:
            data = json.load(file)
        self._check_json_file_data(data)

        self._epoch_group = data['epoch_group']
        self._model_params_file_map = data['model_params_file_map']

        kwargs = dict(proz=0.2, landscape_size=data['landscape_size'],
                      device_ids=device_ids, callback_fn=callback_fn)

        start = time.time()
        kwargs['executor'] = executor
        if data['create_landscape']['train']:
            for i, epochs in enumerate(self._epoch_group.values()):
                self._log_message(data['create_landscape'], index=i, interval=epochs)
                kwargs['epochs'] = epochs
                mid_time = time.time()
                landscape_data = self._create_landscape_by_pca(**kwargs)
                logger.info("Create landscape end, use time: %s s." % (round(time.time() - mid_time, 6)))
                landscape_data.unit = data['unit']
                landscape_data.step_per_epoch = data['step_per_epoch']
                landscape_data.num_samples = data['num_samples']
                yield [epochs[0], epochs[-1]], landscape_data.transform_to_loss_landscape_msg(landscape_data)

        if data['create_landscape']['result']:
            final_epochs = [list(self._epoch_group.values())[-1][-1]]
            self._log_message(data['create_landscape'], final_epochs=final_epochs)
            kwargs['epochs'] = final_epochs
            mid_time = time.time()
            landscape_data = self._create_landscape_by_random(**kwargs)
            logger.info("Create landscape end, use time: %s s." % (round(time.time() - mid_time, 6)))
            landscape_data.unit = data['unit']
            landscape_data.step_per_epoch = data['step_per_epoch']
            landscape_data.num_samples = data['num_samples']
            yield final_epochs, landscape_data.transform_to_loss_landscape_msg(landscape_data)
        logger.info("Total use time: %s s." % (round(time.time() - start, 6)))

    def _log_message(self, create_landscape, index=None, interval=None, final_epochs=None):
        """Generate drawing information using log."""
        if final_epochs is None:
            if create_landscape['result']:
                msg = f"Start to create the {index + 1}/{len(self._epoch_group) + 1} landscapes, " \
                      f"checkpoint is {interval}, decomposition is PCA."
            else:
                msg = f"Start to create the {index + 1}/{len(self._epoch_group)} landscapes, " \
                      f"checkpoint is {interval}, decomposition is PCA."
        else:
            if create_landscape['train']:
                msg = f"Start to create the {len(self._epoch_group) + 1}/{len(self._epoch_group) + 1} landscapes, " \
                      f"checkpoint is {final_epochs}, decomposition is Random. "
            else:
                msg = f"Start to create the {1}/{1} landscapes, " \
                      f"checkpoint is {final_epochs}, decomposition is Random."
        logger.info(msg)

    @staticmethod
    def _set_context(device_id):
        """Set context."""
        context.set_context(device_id=device_id)
        context.set_context(mode=context.GRAPH_MODE)

    def _create_landscape_by_pca(self, epochs, proz, landscape_size, device_ids=None,
                                 callback_fn=None, executor=None):
        """Create landscape by PCA."""
        multi_parameters = self._get_model_params(epochs)
        param_matrixs = []
        for parameters in multi_parameters:
            parlis = []
            for param in parameters:
                if ("weight" in param.name or "bias" in param.name) and ("moment" not in param.name):
                    data = param.data.asnumpy()
                    parlis = np.concatenate((parlis, data), axis=None)
                else:
                    continue
            param_matrixs.append(parlis)
        param_matrixs = np.vstack(param_matrixs)
        param_matrixs = param_matrixs[:-1] - param_matrixs[-1]

        # Only 2 are needed, as we have to reduce high dimensions into 2D. And we reserve one for the loss value.
        pca = _PCA(n_comps=2)
        principal_components = pca.compute(param_matrixs.T)
        v_ori, w_ori = np.array(principal_components[:, 0]), np.array(principal_components[:, -1])
        final_params = list(multi_parameters[-1])

        # Reshape PCA directions(include dimensions of all parameters) into original shape of Model parameters
        v_ndarray = self._reshape_vector(v_ori, final_params)
        w_ndarray = self._reshape_vector(w_ori, final_params)

        # Reshape PCA directions(include dimensions of only weights) into original shape of Model parameters
        final_params_filtered = self._filter_weight_and_bias(final_params)
        v_ndarray_filtered = self._reshape_vector(v_ori, final_params_filtered)
        w_ndarray_filtered = self._reshape_vector(w_ori, final_params_filtered)

        v_ndarray, w_ndarray = self._normalize_vector(final_params, v_ndarray, w_ndarray)
        v_ndarray_filtered, w_ndarray_filtered = self._normalize_vector(final_params_filtered, v_ndarray_filtered,
                                                                        w_ndarray_filtered)

        # Flat to a single vector and calc alpha, beta
        v_param = self._flat_ndarray(v_ndarray_filtered)
        w_param = self._flat_ndarray(w_ndarray_filtered)
        final_params_numpy = [param.data.asnumpy() for param in final_params]
        final_params_filtered_numpy = [param.data.asnumpy() for param in final_params_filtered]
        coefs = self._calc_coefs(multi_parameters, final_params_filtered_numpy, v_param, w_param)

        # generate coordinates of loss landscape
        coefs_x = coefs[:, 0][np.newaxis]
        coefs_y = coefs[:, 1][np.newaxis]

        x_axis = np.linspace(min(coefs_x[0]) - proz * (max(coefs_x[0]) - min(coefs_x[0])),
                             max(coefs_x[0]) + proz * (max(coefs_x[0]) - min(coefs_x[0])), landscape_size)
        y_axis = np.linspace(min(coefs_y[0]) - proz * (max(coefs_y[0]) - min(coefs_y[0])),
                             max(coefs_y[0]) + proz * (max(coefs_y[0]) - min(coefs_y[0])), landscape_size)
        x_points, y_points = np.meshgrid(x_axis, y_axis)

        test_final_params = dict()
        for param in final_params:
            test_final_params[param.name] = param.data.asnumpy()
        if executor is not None:
            coefs_parts, y_points_parts = [], []
            count_per_parts = len(coefs) // len(device_ids)
            start = 0
            for i in range(len(device_ids)):
                if i != len(device_ids) - 1:
                    coefs_parts.append(coefs[start:start + count_per_parts])
                    start = start + count_per_parts
                else:
                    coefs_parts.append(coefs[start:])

            count_per_parts = len(y_points) // len(device_ids)
            start = 0
            logger.info("Use multi process, device_id: %s." % (device_ids))
            for i in range(len(device_ids)):
                if i != len(device_ids) - 1:
                    y_points_parts.append(y_points[start:start + count_per_parts])
                    start = start + count_per_parts
                else:
                    y_points_parts.append(y_points[start:])

            futures = []
            for i, _ in enumerate(device_ids):
                future = executor.submit(self._cont_loss_wrapper, callback_fn, test_final_params,
                                         final_params_numpy, v_ndarray, w_ndarray, x_points,
                                         y_points_parts[i], coefs=coefs_parts[i])
                futures.append(future)
            wait(futures, return_when=ALL_COMPLETED)

            z_points, paths = [], []
            for future in futures:
                paths += future.result()[0]
                z_points += future.result()[1]
        else:
            paths, z_points = self._cont_loss_wrapper(callback_fn, test_final_params, final_params_numpy,
                                                      v_ndarray, w_ndarray, x_points, y_points, coefs=coefs)

        paths = np.array(paths)
        landscape_points = Points(x_points, y_points, np.vstack(z_points))
        path_points = Points(coefs_x[0], coefs_y[0], paths.T[0])
        zero_index = int(np.argwhere(path_points.x == 0))
        convergence_point = Points(np.array([0]), np.array([0]), np.array([path_points.z[zero_index]]))
        landscape = Landscape(intervals=epochs, decomposition='PCA', landscape_points=landscape_points,
                              path_points=path_points, convergence_point=convergence_point)
        return landscape

    def _cont_loss_wrapper(self, callback_fn, test_final_params, final_params_numpy,
                           v_ndarray, w_ndarray, x_points, y_points, coefs=None):
        """Compute loss wrapper."""
        model, network, valid_dataset, metrics = callback_fn()
        with open(os.path.join(self._ckpt_dir, 'train_metadata.json'), 'r') as file:
            data = json.load(file)
            self._check_json_file_data(data)
        num_samples = data['num_samples']
        batch_size = valid_dataset.get_batch_size()
        num_batches = num_samples // batch_size
        valid_dataset = valid_dataset.take(num_batches)

        paths, final_params = [], []
        for (key, value) in test_final_params.items():
            parameter = Parameter(Tensor(value), name=key, requires_grad=True)
            final_params.append(parameter)
        if coefs is not None:
            for i, coef in enumerate(coefs):
                loss_data = self._cont_loss(valid_dataset, network, model, metrics, final_params,
                                            final_params_numpy, [coef[0]], coef[1], v_ndarray, w_ndarray, path=True)
                paths.append(loss_data)
                print("Drawing landscape path total progress is %s/%s, landscape path loss is %s."
                      % (i+1, len(coefs), loss_data[0]))

        # Start to calc loss landscape
        z_points = list()

        # Compute loss landscape
        for i, _ in enumerate(y_points):
            print("Drawing landscape total progress: %s/%s." % (i+1, len(y_points)))
            vals = self._cont_loss(valid_dataset, network, model, metrics, final_params,
                                   final_params_numpy, x_points[i], y_points[i][0], v_ndarray, w_ndarray)
            z_points.append(vals)

        return paths, z_points

    def _create_landscape_by_random(self, epochs, proz, landscape_size,
                                    device_ids=None, callback_fn=None, executor=None):
        """Create landscape by Random."""
        multi_parameters = self._get_model_params(epochs)
        final_params = list(multi_parameters[-1])
        final_params_numpy = [param.data.asnumpy() for param in final_params]
        total_params = sum(np.size(p) for p in final_params_numpy)
        v_rand = np.random.normal(size=total_params)
        w_rand = np.random.normal(size=total_params)

        # Reshape Random directions(include dimensions of all parameters) into original shape of Model parameters
        v_ndarray = self._reshape_random_vector(v_rand, final_params_numpy)
        w_ndarray = self._reshape_random_vector(w_rand, final_params_numpy)
        v_ndarray, w_ndarray = self._normalize_vector(final_params, v_ndarray, w_ndarray)

        boundaries_x, boundaries_y = 5, 5
        x_axis = np.linspace(-proz * boundaries_x, proz * boundaries_x, landscape_size)
        y_axis = np.linspace(-proz * boundaries_y, proz * boundaries_y, landscape_size)
        x_points, y_points = np.meshgrid(x_axis, y_axis)

        test_final_params = dict()
        for param in final_params:
            test_final_params[param.name] = param.data.asnumpy()
        if executor is not None:
            logger.info("Use multi process, device_id: %s." % (device_ids))
            y_points_parts = []
            count_per_parts = len(y_points) // len(device_ids)
            start = 0
            for i in range(len(device_ids)):
                if i != len(device_ids) - 1:
                    y_points_parts.append(y_points[start:start + count_per_parts])
                    start = start + count_per_parts
                else:
                    y_points_parts.append(y_points[start:])

            futures = []
            for i in range(len(device_ids)):
                future = executor.submit(self._cont_loss_wrapper, callback_fn, test_final_params,
                                         final_params_numpy, v_ndarray, w_ndarray, x_points, y_points_parts[i])
                futures.append(future)
            wait(futures, return_when=ALL_COMPLETED)

            z_points = []
            for future in futures:
                z_points += future.result()[1]
        else:
            _, z_points = self._cont_loss_wrapper(callback_fn, test_final_params, final_params_numpy,
                                                  v_ndarray, w_ndarray, x_points, y_points)

        landscape_points = Points(x_points, y_points, np.vstack(z_points))
        convergence_point = Points(np.array([x_axis[len(x_axis)//2]]), np.array([y_axis[len(y_axis)//2]]),
                                   np.array([z_points[len(x_axis)//2][len(y_axis)//2]]))
        landscape = Landscape(intervals=epochs, decomposition='Random',
                              landscape_points=landscape_points, convergence_point=convergence_point)
        return landscape

    @staticmethod
    def _filter_weight_and_bias(parameters):
        """Filter the weight and bias of parameters."""
        filter_params = []
        for param in parameters:
            if ('weight' not in param.name and 'bias' not in param.name) or ('moment' in param.name):
                continue
            filter_params.append(param)
        return filter_params

    @staticmethod
    def _reshape_vector(vector, parameters):
        """Reshape vector into model shape."""
        ndarray = list()
        index = 0
        for param in parameters:
            data = param.data.asnumpy()
            if ("weight" not in param.name and "bias" not in param.name) or ("moment" in param.name):
                ndarray.append(np.array(data, dtype=np.float32))
                continue

            vec_it = vector[index:(index + data.size)].reshape(data.shape)
            ndarray.append(np.array(vec_it, dtype=np.float32))
            index += data.size
        return ndarray

    @staticmethod
    def _reshape_random_vector(vector, params_numpy):
        """Reshape random vector into model shape."""
        ndarray = list()
        index = 0
        for param in params_numpy:
            len_p = np.size(param)
            p_size = np.shape(param)
            vec_it = vector[index:(index + len_p)].reshape(p_size)
            ndarray.append(np.array(vec_it, dtype=np.float32))
            index += len_p
        return ndarray

    @staticmethod
    def _normalize_vector(parameters, get_v, get_w):
        """
        Normalizes the vectors spanning the 2D space, to make trajectories comparable between each other.
        """
        for i, param in enumerate(parameters):
            # Here as MindSpore ckpt has hyperparameters, we should skip them to make sure
            # PCA calculation is correct.
            data = param.data.asnumpy()
            if ("weight" in param.name or "bias" in param.name) and ("moment" not in param.name):
                factor_v = np.linalg.norm(data) / np.linalg.norm(get_v[i])
                factor_w = np.linalg.norm(data) / np.linalg.norm(get_w[i])
                get_v[i] = get_v[i] * factor_v
                get_w[i] = get_w[i] * factor_w
            else:
                get_v[i] = get_v[i] * 0
                get_w[i] = get_w[i] * 0
        return get_v, get_w

    @staticmethod
    def _flat_ndarray(ndarray_vector):
        """Concatenates a python array of numpy arrays into a single, flat numpy array."""
        return np.concatenate([item.flatten() for item in ndarray_vector], axis=None)

    def _calc_coefs(self, parameter_group, final_param_ndarray, v_vector, w_vector):
        """
        Calculates the scale factors for plotting points in the 2D space spanned by the vectors v and w.
        """
        matris = [v_vector, w_vector]
        matris = np.vstack(matris)
        matris = matris.T

        pas = self._flat_ndarray(final_param_ndarray)
        coefs = list()
        for parameters in parameter_group:
            testi = list()
            for param in parameters:
                # Here as MindSpore ckpt has hyperparameters,
                # we should skip them to make sure PCA calculation is correct
                if ('weight' not in param.name and 'bias' not in param.name) or ('moment' in param.name):
                    continue
                testi.append(param.data.asnumpy())

            st_vec = self._flat_ndarray(testi)
            b_vec = st_vec - pas
            # Here using the least squares method to get solutions of an equation system to generate alpha and beta.
            coefs.append(np.hstack(np.linalg.lstsq(matris, b_vec, rcond=None)[0]))

        return np.array(coefs)

    def _cont_loss(self, ds_eval, network, model, metrics, parameters,
                   final_params_numpy, alph, beta, get_v, get_w, path=False):
        """
        Calculates the loss landscape based on vectors v and w (which can be principal components).
        Changes the internal state of model. Executes model.
        """
        logger.info("start to cont loss")
        vals = list()

        al_item = 0
        for i, _ in enumerate(alph):
            # calculate new parameters for model
            parameters_dict = dict()
            for j, param in enumerate(parameters):
                parameters_dict[param.name] = self._change_parameter(j, param, final_params_numpy,
                                                                     alph[al_item], beta, get_v, get_w)
            al_item += 1

            # load parameters into model and calculate loss
            load_param_into_net(network, parameters_dict)
            del parameters_dict

            loss = self._loss_compute(model, ds_eval, metrics)
            if path is False:
                print("Current local landscape progress is %s/%s, landscape loss is %s."
                      % (i+1, len(alph), loss.get('Loss')))
            vals = np.append(vals, loss.get('Loss'))

        return vals

    @staticmethod
    def _change_parameter(index, parameter, final_params_numpy, alpha, beta, get_v, get_w):
        """Function for changing parameter value with map and lambda."""
        data = final_params_numpy[index]
        data_target = data + alpha * get_v[index] + beta * get_w[index]
        data_target = Tensor(data_target.astype(np.float32))
        parameter.set_data(Tensor(data_target))
        return parameter

    def _loss_compute(self, model, data, metrics):
        """Compute loss."""
        dataset_sink_mode = False
        self._metric_fns = get_metrics(metrics)
        for metric in self._metric_fns.values():
            metric.clear()

        network = model.train_network
        dataset_helper = DatasetHelper(data, dataset_sink_mode)

        network.set_train(True)
        network.phase = 'train'

        for inputs in dataset_helper:
            inputs = transfer_tensor_to_tuple(inputs)
            outputs = network(*inputs)
            self._update_metrics(outputs)

        metrics = self._get_metrics()
        return metrics

    def _update_metrics(self, outputs):
        """Update metrics local values."""
        if isinstance(outputs, Tensor):
            outputs = (outputs,)
        if not isinstance(outputs, tuple):
            raise ValueError(f"The argument 'outputs' should be tuple, but got {type(outputs)}. "
                             f"Modify 'output' to Tensor or tuple. ")

        for metric in self._metric_fns.values():
            metric.update(outputs[0])

    def _get_metrics(self):
        """Get metrics local values."""
        metrics = dict()
        for key, value in self._metric_fns.items():
            metrics[key] = value.eval()
        return metrics

    def _check_unit(self, unit):
        """Check unit type and value."""
        check_value_type('unit', unit, str)
        if unit not in ["step", "epoch"]:
            raise ValueError(f'For "{self.__class__.__name__}", the "unit" in train_metadata.json should be '
                             f'step or epoch, but got the: {unit}')

    def _check_landscape_size(self, landscape_size):
        """Check landscape size type and value."""
        check_value_type('landscape_size', landscape_size, int)
        # landscape size should be between 3 and 256.
        if landscape_size < 3 or landscape_size > 256:
            raise ValueError(f'For "{self.__class__.__name__}", "landscape_size" in train_metadata.json should be '
                             f'between 3 and 256, but got the: {landscape_size}')

    def _check_create_landscape(self, create_landscape):
        """Check create landscape type and value."""
        check_value_type('create_landscape', create_landscape, dict)
        for param, value in create_landscape.items():
            if param not in ["train", "result"]:
                raise ValueError(f'For "{self.__class__.__name__}", the key of "create_landscape" should be in '
                                 f'["train", "result"], but got the: {param}.')
            if len(create_landscape) < 2:
                raise ValueError(f'For "{self.__class__.__name__}", the key of "create_landscape" should be train '
                                 f'and result, but only got the: {param}')
            check_value_type(param, value, bool)

    def _check_intervals(self, intervals):
        """Check intervals type and value."""
        check_value_type('intervals', intervals, list)
        for _, interval in enumerate(intervals):
            check_value_type('each interval in intervals', interval, list)
            # Each interval must have at least three epochs.
            if len(interval) < 3:
                raise ValueError(f'For "{self.__class__.__name__}", the length of each list in "intervals" '
                                 f'should not be less than three, but got the: {interval}.')
            for j in interval:
                if not isinstance(j, int):
                    raise TypeError(f'For "{self.__class__.__name__}", the type of each value in "intervals" '
                                    f'should be int, but got the: {type(j)}.')

    def _check_device_ids(self, device_ids):
        """Check device_ids type and value."""
        check_value_type('device_ids', device_ids, list)
        for i in device_ids:
            if not isinstance(i, int):
                raise TypeError(f'For "{self.__class__.__name__}.gen_landscapes_with_multi_process", the parameter '
                                f'"device_ids" type should be int, but got the: {type(i)}.')
            # device_id should be between 0 and 7.
            if i < 0 or i > 7:
                raise ValueError(f'For "{self.__class__.__name__}.gen_landscapes_with_multi_process", the parameter '
                                 f'"device_ids" should be between 0 and 7, but got {i}.')

    def _check_collect_landscape_data(self, collect_landscape):
        """Check collect landscape data type and value."""
        for param in collect_landscape.keys():
            if param not in ["landscape_size", "unit", "num_samples", "create_landscape", "intervals"]:
                raise ValueError(f'For "{self.__class__.__name__}", the key of collect landscape should be '
                                 f'landscape_size, unit, num_samples create_landscape or intervals, '
                                 f'but got the: {param}. ')
        if "landscape_size" in collect_landscape:
            landscape_size = collect_landscape.get("landscape_size")
            self._check_landscape_size(landscape_size)
        if "unit" in collect_landscape:
            unit = collect_landscape.get("unit")
            self._check_unit(unit)
        if "num_samples" in collect_landscape:
            num_samples = collect_landscape.get("num_samples")
            check_value_type("num_samples", num_samples, int)
        if "create_landscape" in collect_landscape:
            create_landscape = collect_landscape.get("create_landscape")
            self._check_create_landscape(create_landscape)
        if "intervals" in collect_landscape:
            intervals = collect_landscape.get("intervals")
            self._check_intervals(intervals)

    def _check_json_file_data(self, json_file_data):
        """Check json file data."""
        file_key = ["epoch_group", "model_params_file_map", "step_per_epoch", "unit",
                    "num_samples", "landscape_size", "create_landscape"]
        for key in json_file_data.keys():
            if key not in file_key:
                raise ValueError(f'"train_metadata" json file should be {file_key}, but got the: {key}')
        epoch_group = json_file_data["epoch_group"]
        model_params_file_map = json_file_data["model_params_file_map"]
        step_per_epoch = json_file_data["step_per_epoch"]
        unit = json_file_data["unit"]
        num_samples = json_file_data["num_samples"]
        landscape_size = json_file_data["landscape_size"]
        create_landscape = json_file_data["create_landscape"]
        for _, epochs in enumerate(epoch_group.values()):
            # Each epoch_group must have at least three epochs.
            if len(epochs) < 3:
                raise ValueError(f'For "{self.__class__.__name__}", the "epoch_group" in train_metadata.json, '
                                 f'length of each list in "epoch_group" should not be less than 3, '
                                 f'but got: {len(epochs)}. ')
            for epoch in epochs:
                if str(epoch) not in model_params_file_map.keys():
                    raise ValueError(f'For "{self.__class__.__name__}", the "model_params_file_map" in '
                                     f'train_metadata.json does not exist {epoch}th checkpoint in intervals.')
        check_value_type('step_per_epoch', step_per_epoch, int)
        self._check_landscape_size(landscape_size)
        self._check_unit(unit)
        check_value_type("num_samples", num_samples, int)
        self._check_create_landscape(create_landscape)
class _PCA:
    r"""
    The internal class for computing PCA vectors.

    .. math::

        u, s, vt = svd(x - mean(x)),
        u_i = u_i * s_i,

    where :math:`mean` is the mean operator, :math:`svd` is the singular value decomposition operator.
    :math:`u_i` is line :math:`i` of the :math:`u`, :math:`s_i` is column :math:`i` of the :math:`s`,
    :math:`i` ranges from :math:`0` to :math:`n\_comps`.

    Args:
        n_comps (int): Number of principal components needed.
    """
    def __init__(self, n_comps):
        self._n_comps = n_comps
        self._random_status = None
        self._iterated_power = "auto"
        self._n_oversamples = 10

    @staticmethod
    def _safe_dot(a, b):
        """Dot product that handles the matrix case correctly."""
        if a.ndim > 2 or b.ndim > 2:
            if sparse.issparse(b):
                # Sparse is always 2 dimensional. Implies a is above 3 dimensional.
                # [n, ..., o, p] @ [l, m] -> [n, ..., o, m]
                a_2d = a.reshape(-1, a.shape[-1])
                ret = a_2d @ b
                ret = ret.reshape(*a.shape[:-1], b.shape[1])
            elif sparse.issparse(a):
                # Sparse is always 2 dimensional. Implies b is above 3 dimensional.
                # [l, m] @ [n, ..., o, p, q] -> [l, n, ..., o, q]
                b_ = np.rollaxis(b, -2)
                b_2d = b_.reshape((b.shape[-2], -1))
                ret = a @ b_2d
                ret = ret.reshape(a.shape[0], *b_.shape[1:])
            else:
                ret = np.dot(a, b)
        else:
            ret = a @ b
        return ret

    @staticmethod
    def _svd_turn(u, v, u_decision=True):
        """Confirm correction to ensure deterministic output from SVD."""
        if u_decision:
            # rows of v, columns of u
            max_cols = np.argmax(np.abs(u), axis=0)
            signs = np.sign(u[max_cols, list(range(u.shape[1]))])
            v *= signs[:, np.newaxis]
            u *= signs
        else:
            # rows of u, columns of v
            max_rows = np.argmax(np.abs(v), axis=1)
            signs = np.sign(v[list(range(v.shape[0])), max_rows])
            v *= signs[:, np.newaxis]
            u *= signs
        return u, v

    @staticmethod
    def _check_random_status(seed):
        """Transform seed into a np.random.RandomState instance."""
        if isinstance(seed, np.random.RandomState):
            return seed
        if seed is None or seed is np.random:
            return np.random.RandomState()
        if isinstance(seed, numbers.Integral):
            return np.random.RandomState(seed)
        raise ValueError(
            "%r cannot be used to seed a numpy.random.RandomState instance" % seed
        )

    def compute(self, x):
        """Main method for computing principal components."""
        n_components = self._n_comps
        # Small dimension (the shape is less than 500), so the full SVD is calculated.
        if max(x.shape) <= 500:
            u, s, _ = self._fit_few(x)
        # When the dimension of x is large, truncated SVD is used for the calculation.
        elif 1 <= n_components < 0.8 * min(x.shape):
            u, s, _ = self._fit_much(x, n_components)
        # A case of n_components in (0, 1)
        else:
            u, s, _ = self._fit_few(x)

        for i, _ in enumerate(s):
            # To prevent s from being equal to 0, a small fixed noise is added.
            # 1e-19 was found to be a good compromise for s.
            if s[i] == 0:
                s[i] = 1e-19

        u = u[:, :self._n_comps]
        u *= s[:self._n_comps]
        return u

    def _fit_few(self, x):
        """Compute principal components with full SVD on x, when the dimension of x is small."""
        mean_ = np.mean(x, axis=0)
        x -= mean_

        u, s, vt = linalg.svd(x, full_matrices=False)
        u, vt = self._svd_turn(u, vt)

        return u, s, vt

    def _fit_much(self, x, n_components):
        """Compute principal components with truncated SVD on x, when the dimension of x is large."""
        random_state = self._check_random_status(self._random_status)
        mean_ = np.mean(x, axis=0)
        x -= mean_
        u, s, vt = self._random_svd(x, n_components, n_oversamples=self._n_oversamples, random_state=random_state)
        return u, s, vt

    def _random_svd(self, m, n_components, n_oversamples=10, random_state="warn"):
        """Compute a truncated randomized SVD."""
        n_random = n_components + n_oversamples
        n_samples, n_features = m.shape
        # 7 or 4 iterations were found to be a good compromise for randomized SVD.
        n_iter = 7 if n_components < 0.1 * min(m.shape) else 4
        if n_samples < n_features:
            m = m.T

        q = self._random_range_finder(m, size=n_random, n_iter=n_iter, random_state=random_state)

        # Project m to the low dimensional space using the basis vectors (q vector).
        b = self._safe_dot(q.T, m)

        # Compute the svd on this matrix (b matrix)
        uhat, s, vt = linalg.svd(b, full_matrices=False)

        del b
        u = np.dot(q, uhat)

        if n_samples < n_features:
            u, vt = self._svd_turn(u, vt, u_decision=False)
        else:
            u, vt = self._svd_turn(u, vt)

        if n_samples < n_features:
            return vt[:n_components, :].T, s[:n_components], u[:, :n_components].T

        return u[:, :n_components], s[:n_components], vt[:n_components, :]

    def _random_range_finder(self, a, size, n_iter, random_state=None):
        """Computes an orthonormal matrix whose range approximates the range of A."""
        random_state = self._check_random_status(random_state)

        # Generate normal random vectors.
        q = random_state.normal(size=(a.shape[1], size))
        if a.dtype.kind == "f":
            # Ensure f32 is retained as f32
            q = q.astype(a.dtype, copy=False)

        if n_iter <= 2:
            power_iteration_normalizer = "none"
        else:
            power_iteration_normalizer = "LU"

        # use power iterations with q to further compute the top singular vectors of a in q
        for _ in range(n_iter):
            if power_iteration_normalizer == "none":
                q = self._safe_dot(a, q)
                q = self._safe_dot(a.T, q)
            elif power_iteration_normalizer == "LU":
                q, _ = linalg.lu(self._safe_dot(a, q), permute_l=True)
                q, _ = linalg.lu(self._safe_dot(a.T, q), permute_l=True)

        # The orthogonal basis is extracted by the linear projection of Q, and the range of a is sampled.
        q, _ = linalg.qr(self._safe_dot(a, q), mode="economic")
        return q
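
# A rough numpy-only sketch (illustration, not used by this module) of the decomposition that
# _PCA.compute performs in the small-matrix case:
#   x_centered = x - np.mean(x, axis=0)
#   u, s, vt = np.linalg.svd(x_centered, full_matrices=False)
#   components = u[:, :2] * s[:2]   # first two principal components, as returned by compute()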