mindspore.dataset.audio.transforms 源代码

# Copyright 2021-2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
The module audio.transforms is inherited from _c_dataengine and is
implemented based on C++. It's a high performance module to process
audio. Users can apply suitable augmentations on audio data to improve
their training models.
"""

import numpy as np

import mindspore._c_dataengine as cde
from .utils import BorderType, DensityFunction, FadeShape, GainType, Interpolation, MelType, Modulation, NormType, \
    ResampleMethod, ScaleType, WindowType
from .validators import check_allpass_biquad, check_amplitude_to_db, check_band_biquad, check_bandpass_biquad, \
    check_bandreject_biquad, check_bass_biquad, check_biquad, check_complex_norm, check_compute_deltas, \
    check_contrast, check_db_to_amplitude, check_dc_shift, check_deemph_biquad, check_detect_pitch_frequency, \
    check_dither, check_equalizer_biquad, check_fade, check_flanger, check_gain, check_griffin_lim, \
    check_highpass_biquad, check_inverse_mel_scale, check_lfilter, check_lowpass_biquad, check_magphase, \
    check_mask_along_axis, check_mask_along_axis_iid, check_masking, check_mel_scale, check_mu_law_coding, \
    check_overdrive, check_phase_vocoder, check_phaser, check_resample, check_riaa_biquad, check_sliding_window_cmn, \
    check_spectral_centroid, check_spectrogram, check_time_stretch, check_treble_biquad, check_vad, check_vol
from ..transforms.py_transforms_util import Implementation
from ..transforms.transforms import TensorOperation


class AudioTensorOperation(TensorOperation):
    """
    Base class of Audio Tensor Ops.
    """

    def __init__(self):
        super().__init__()
        self.implementation = Implementation.C

    def __call__(self, *input_tensor_list):
        for tensor in input_tensor_list:
            if not isinstance(tensor, (np.ndarray,)):
                raise TypeError("Input should be NumPy audio, got {}.".format(type(tensor)))
        return super().__call__(*input_tensor_list)

    def parse(self):
        raise NotImplementedError("AudioTensorOperation has to implement parse() method.")


[文档]class AllpassBiquad(AudioTensorOperation): r""" Design two-pole all-pass filter with central frequency and bandwidth for audio waveform. An all-pass filter changes the audio's frequency to phase relationship without changing its frequency to amplitude relationship. The system function is: .. math:: H(s) = \frac{s^2 - \frac{s}{Q} + 1}{s^2 + \frac{s}{Q} + 1} Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. central_freq (float): Central frequency (in Hz). Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `central_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.AllpassBiquad(44100, 200.0)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_allpass_biquad def __init__(self, sample_rate, central_freq, Q=0.707): super().__init__() self.sample_rate = sample_rate self.central_freq = central_freq self.quality_factor = Q def parse(self): return cde.AllpassBiquadOperation(self.sample_rate, self.central_freq, self.quality_factor)
DE_C_SCALE_TYPE = {ScaleType.POWER: cde.ScaleType.DE_SCALE_TYPE_POWER, ScaleType.MAGNITUDE: cde.ScaleType.DE_SCALE_TYPE_MAGNITUDE}
[文档]class AmplitudeToDB(AudioTensorOperation): r""" Turn the input audio waveform from the amplitude/power scale to decibel scale. Note: The dimension of the audio waveform to be processed needs to be (..., freq, time). Args: stype (ScaleType, optional): Scale of the input waveform, which can be ScaleType.POWER or ScaleType.MAGNITUDE. Default: ScaleType.POWER. ref_value (float, optional): Multiplier reference value for generating `db_multiplier` . Default: 1.0. The formula is :math:`\text{db_multiplier} = Log10(max(\text{ref_value}, amin))` . amin (float, optional): Lower bound to clamp the input waveform, which must be greater than zero. Default: 1e-10. top_db (float, optional): Minimum cut-off decibels, which must be non-negative. Default: 80.0. Raises: TypeError: If `stype` is not of type :class:`mindspore.dataset.audio.ScaleType` . TypeError: If `ref_value` is not of type float. ValueError: If `ref_value` is not a positive number. TypeError: If `amin` is not of type float. ValueError: If `amin` is not a positive number. TypeError: If `top_db` is not of type float. ValueError: If `top_db` is not a positive number. RuntimeError: If input tensor is not in shape of <..., freq, time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> from mindspore.dataset.audio import ScaleType >>> >>> waveform = np.random.random([1, 400 // 2 + 1, 30]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.AmplitudeToDB(stype=ScaleType.POWER)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_amplitude_to_db def __init__(self, stype=ScaleType.POWER, ref_value=1.0, amin=1e-10, top_db=80.0): super().__init__() self.stype = stype self.ref_value = ref_value self.amin = amin self.top_db = top_db def parse(self): return cde.AmplitudeToDBOperation(DE_C_SCALE_TYPE.get(self.stype), self.ref_value, self.amin, self.top_db)
[文档]class Angle(AudioTensorOperation): """ Calculate the angle of complex number sequence. Note: The dimension of the audio waveform to be processed needs to be (..., complex=2). The first dimension represents the real part while the second represents the imaginary. Raises: RuntimeError: If input tensor is not in shape of <..., complex=2>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[1.43, 5.434], [23.54, 89.38]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Angle()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ def parse(self): return cde.AngleOperation()
[文档]class BandBiquad(AudioTensorOperation): """ Design two-pole band-pass filter for audio waveform. The frequency response drops logarithmically around the center frequency. The bandwidth gives the slope of the drop. The frequencies at band edge will be half of their original amplitudes. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. central_freq (float): Central frequency (in Hz). Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. noise (bool, optional) : If True, uses the alternate mode for un-pitched audio (e.g. percussion). If False, uses mode oriented to pitched audio, i.e. voice, singing, or instrumental music. Default: False. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `central_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. TypeError: If `noise` is not of type bool. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.BandBiquad(44100, 200.0)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_band_biquad def __init__(self, sample_rate, central_freq, Q=0.707, noise=False): super().__init__() self.sample_rate = sample_rate self.central_freq = central_freq self.quality_factor = Q self.noise = noise def parse(self): return cde.BandBiquadOperation(self.sample_rate, self.central_freq, self.quality_factor, self.noise)
[文档]class BandpassBiquad(AudioTensorOperation): r""" Design two-pole Butterworth band-pass filter for audio waveform. The frequency response of the Butterworth filter is maximally flat (i.e. has no ripples) in the passband and rolls off towards zero in the stopband. The system function of Butterworth band-pass filter is: .. math:: H(s) = \begin{cases} \frac{s}{s^2 + \frac{s}{Q} + 1}, &\text{if const_skirt_gain=True}; \cr \frac{\frac{s}{Q}}{s^2 + \frac{s}{Q} + 1}, &\text{if const_skirt_gain=False}. \end{cases} Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. central_freq (float): Central frequency (in Hz). Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. const_skirt_gain (bool, optional) : If True, uses a constant skirt gain (peak gain = Q); If False, uses a constant 0dB peak gain. Default: False. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `central_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. TypeError: If `const_skirt_gain` is not of type bool. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.BandpassBiquad(44100, 200.0)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_bandpass_biquad def __init__(self, sample_rate, central_freq, Q=0.707, const_skirt_gain=False): super().__init__() self.sample_rate = sample_rate self.central_freq = central_freq self.quality_factor = Q self.const_skirt_gain = const_skirt_gain def parse(self): return cde.BandpassBiquadOperation(self.sample_rate, self.central_freq, self.quality_factor, self.const_skirt_gain)
[文档]class BandrejectBiquad(AudioTensorOperation): r""" Design two-pole Butterworth band-reject filter for audio waveform. The frequency response of the Butterworth filter is maximally flat (i.e. has no ripples) in the passband and rolls off towards zero in the stopband. The system function of Butterworth band-reject filter is: .. math:: H(s) = \frac{s^2 + 1}{s^2 + \frac{s}{Q} + 1} Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. central_freq (float): Central frequency (in Hz). Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `central_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03],[9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.BandrejectBiquad(44100, 200.0)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_bandreject_biquad def __init__(self, sample_rate, central_freq, Q=0.707): super().__init__() self.sample_rate = sample_rate self.central_freq = central_freq self.quality_factor = Q def parse(self): return cde.BandrejectBiquadOperation(self.sample_rate, self.central_freq, self.quality_factor)
[文档]class BassBiquad(AudioTensorOperation): r""" Design a bass tone-control effect, also known as two-pole low-shelf filter for audio waveform. A low-shelf filter passes all frequencies, but increase or reduces frequencies below the shelf frequency by specified amount. The system function is: .. math:: H(s) = A\frac{s^2 + \frac{\sqrt{A}}{Q}s + A}{As^2 + \frac{\sqrt{A}}{Q}s + 1} Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. gain (float): Desired gain at the boost (or attenuation) in dB. central_freq (float, optional): Central frequency (in Hz). Default: 100.0. Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `gain` is not of type float. TypeError: If `central_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.BassBiquad(44100, 100.0)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_bass_biquad def __init__(self, sample_rate, gain, central_freq=100.0, Q=0.707): super().__init__() self.sample_rate = sample_rate self.gain = gain self.central_freq = central_freq self.quality_factor = Q def parse(self): return cde.BassBiquadOperation(self.sample_rate, self.gain, self.central_freq, self.quality_factor)
[文档]class Biquad(TensorOperation): """ Perform a biquad filter of input audio. Mathematical fomulas refer to: `Digital_biquad_filter <https://en.wikipedia.org/wiki/Digital_biquad_filter>`_ . Args: b0 (float): Numerator coefficient of current input, x[n]. b1 (float): Numerator coefficient of input one time step ago x[n-1]. b2 (float): Numerator coefficient of input two time steps ago x[n-2]. a0 (float): Denominator coefficient of current output y[n], the value can't be zero, typically 1. a1 (float): Denominator coefficient of current output y[n-1]. a2 (float): Denominator coefficient of current output y[n-2]. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> biquad_op = audio.Biquad(0.01, 0.02, 0.13, 1, 0.12, 0.3) >>> waveform_filtered = biquad_op(waveform) """ @check_biquad def __init__(self, b0, b1, b2, a0, a1, a2): super().__init__() self.b0 = b0 self.b1 = b1 self.b2 = b2 self.a0 = a0 self.a1 = a1 self.a2 = a2 def parse(self): return cde.BiquadOperation(self.b0, self.b1, self.b2, self.a0, self.a1, self.a2)
[文档]class ComplexNorm(AudioTensorOperation): """ Compute the norm of complex number sequence. Note: The dimension of the audio waveform to be processed needs to be (..., complex=2). The first dimension represents the real part while the second represents the imaginary. Args: power (float, optional): Power of the norm, which must be non-negative. Default: 1.0. Raises: TypeError: If `power` is not of type float. ValueError: If `power` is a negative number. RuntimeError: If input tensor is not in shape of <..., complex=2>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.random.random([2, 4, 2]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.ComplexNorm()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_complex_norm def __init__(self, power=1.0): super().__init__() self.power = power def parse(self): return cde.ComplexNormOperation(self.power)
DE_C_BORDER_TYPE = { BorderType.CONSTANT: cde.BorderType.DE_BORDER_CONSTANT, BorderType.EDGE: cde.BorderType.DE_BORDER_EDGE, BorderType.REFLECT: cde.BorderType.DE_BORDER_REFLECT, BorderType.SYMMETRIC: cde.BorderType.DE_BORDER_SYMMETRIC, }
[文档]class ComputeDeltas(AudioTensorOperation): r""" Compute delta coefficients, also known as differential coefficients, of a spectrogram. Delta coefficients help to understand the dynamics of the power spectrum. It can be computed using the following formula. .. math:: d_{t}=\frac{{\textstyle\sum_{n=1}^{N}}n(c_{t+n}-c_{t-n})}{2{\textstyle\sum_{n=1}^{N}}n^{2}} where :math:`d_{t}` is the deltas at time :math:`t` , :math:`c_{t}` is the spectrogram coefficients at time :math:`t` , :math:`N` is :math:`(\text{win_length} - 1) // 2` . Args: win_length (int, optional): The window length used for computing delta, must be no less than 3. Default: 5. pad_mode (BorderType, optional): Mode parameter passed to padding, can be BorderType.CONSTANT, BorderType.EDGE, BorderType.REFLECT or BorderType.SYMMETRIC. Default: BorderType.EDGE. - BorderType.CONSTANT, pad with a constant value. - BorderType.EDGE, pad with the last value on the edge. - BorderType.REFLECT, reflect the value on the edge while omitting the last one. For example, pad [1, 2, 3, 4] with 2 elements on both sides will result in [3, 2, 1, 2, 3, 4, 3, 2]. - BorderType.SYMMETRIC, reflect the value on the edge while repeating the last one. For example, pad [1, 2, 3, 4] with 2 elements on both sides will result in [2, 1, 1, 2, 3, 4, 4, 3]. Raises: TypeError: If `win_length` is not of type int. ValueError: If `win_length` is less than 3. TypeError: If `pad_mode` is not of type :class:`mindspore.dataset.audio.BorderType` . RuntimeError: If input tensor is not in shape of <..., freq, time>. Examples: >>> import numpy as np >>> from mindspore.dataset.audio import BorderType >>> >>> waveform = np.random.random([1, 400 // 2 + 1, 30]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.ComputeDeltas(win_length=7, pad_mode=BorderType.EDGE)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_compute_deltas def __init__(self, win_length=5, pad_mode=BorderType.EDGE): super().__init__() self.win_len = win_length self.pad_mode = pad_mode def parse(self): return cde.ComputeDeltasOperation(self.win_len, DE_C_BORDER_TYPE.get(self.pad_mode))
[文档]class Contrast(AudioTensorOperation): """ Apply contrast effect for audio waveform. Comparable with compression, this effect modifies an audio signal to make it sound louder. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: enhancement_amount (float, optional): Controls the amount of the enhancement, in range of [0, 100]. Default: 75.0. Note that `enhancement_amount` equal to 0 still gives a significant contrast enhancement. Raises: TypeError: If `enhancement_amount` is not of type float. ValueError: If `enhancement_amount` is not in range [0, 100]. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Contrast()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_contrast def __init__(self, enhancement_amount=75.0): super().__init__() self.enhancement_amount = enhancement_amount def parse(self): return cde.ContrastOperation(self.enhancement_amount)
[文档]class DBToAmplitude(AudioTensorOperation): """ Turn a waveform from the decibel scale to the power/amplitude scale. Args: ref (float): Reference which the output will be scaled by. power (float): If power equals 1, will compute DB to power. If 0.5, will compute DB to amplitude. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.DBToAmplitude(0.5, 0.5)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_db_to_amplitude def __init__(self, ref, power): super().__init__() self.ref = ref self.power = power def parse(self): return cde.DBToAmplitudeOperation(self.ref, self.power)
[文档]class DCShift(AudioTensorOperation): """ Apply a DC shift to the audio. This can be useful to remove DC offset from audio. Args: shift (float): The amount to shift the audio, the value must be in the range [-2.0, 2.0]. limiter_gain (float, optional): Used only on peaks to prevent clipping, the value should be much less than 1, such as 0.05 or 0.02. Examples: >>> import numpy as np >>> >>> waveform = np.array([0.60, 0.97, -1.04, -1.26, 0.97, 0.91, 0.48, 0.93]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.DCShift(0.5, 0.02)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_dc_shift def __init__(self, shift, limiter_gain=None): super().__init__() self.shift = shift self.limiter_gain = limiter_gain if limiter_gain else shift def parse(self): return cde.DCShiftOperation(self.shift, self.limiter_gain)
[文档]class DeemphBiquad(AudioTensorOperation): """ Apply Compact Disc (IEC 60908) de-emphasis (a treble attenuation shelving filter) to the audio waveform. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate of the waveform, must be 44100 or 48000 (Hz). Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is not 44100 or 48000. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.DeemphBiquad(44100)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_deemph_biquad def __init__(self, sample_rate): super().__init__() self.sample_rate = sample_rate def parse(self): return cde.DeemphBiquadOperation(self.sample_rate)
[文档]class DetectPitchFrequency(AudioTensorOperation): """ Detect pitch frequency. It is implemented using normalized cross-correlation function and median smoothing. Args: sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero. frame_time (float, optional): Duration of a frame, the value must be greater than zero. Default: 0.01. win_length (int, optional): The window length for median smoothing (in number of frames), the value must be greater than zero. Default: 30. freq_low (int, optional): Lowest frequency that can be detected (Hz), the value must be greater than zero. Default: 85. freq_high (int, optional): Highest frequency that can be detected (Hz), the value must be greater than zero. Default: 3400. Examples: >>> import numpy as np >>> >>> waveform = np.array([[0.716064e-03, 5.347656e-03, 6.246826e-03, 2.089477e-02, 7.138305e-02], ... [4.156616e-02, 1.394653e-02, 3.550292e-02, 0.614379e-02, 3.840209e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.DetectPitchFrequency(30, 0.1, 3, 5, 25)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_detect_pitch_frequency def __init__(self, sample_rate, frame_time=0.01, win_length=30, freq_low=85, freq_high=3400): super().__init__() self.sample_rate = sample_rate self.frame_time = frame_time self.win_length = win_length self.freq_low = freq_low self.freq_high = freq_high def parse(self): return cde.DetectPitchFrequencyOperation(self.sample_rate, self.frame_time, self.win_length, self.freq_low, self.freq_high)
DE_C_DENSITY_FUNCTION = {DensityFunction.TPDF: cde.DensityFunction.DE_DENSITY_FUNCTION_TPDF, DensityFunction.RPDF: cde.DensityFunction.DE_DENSITY_FUNCTION_RPDF, DensityFunction.GPDF: cde.DensityFunction.DE_DENSITY_FUNCTION_GPDF}
[文档]class Dither(AudioTensorOperation): """ Dither increases the perceived dynamic range of audio stored at a particular bit-depth by eliminating nonlinear truncation distortion. Args: density_function (DensityFunction, optional): The density function of a continuous random variable, can be DensityFunction.TPDF (Triangular Probability Density Function), DensityFunction.RPDF (Rectangular Probability Density Function) or DensityFunction.GPDF (Gaussian Probability Density Function). Default: DensityFunction.TPDF. noise_shaping (bool, optional): A filtering process that shapes the spectral energy of quantisation error. Default: False. Raises: TypeError: If `density_function` is not of type :class:`mindspore.dataset.audio.DensityFunction` . TypeError: If `noise_shaping` is not of type bool. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[1, 2, 3], [4, 5, 6]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Dither()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_dither def __init__(self, density_function=DensityFunction.TPDF, noise_shaping=False): super().__init__() self.density_function = density_function self.noise_shaping = noise_shaping def parse(self): return cde.DitherOperation(DE_C_DENSITY_FUNCTION.get(self.density_function), self.noise_shaping)
[文档]class EqualizerBiquad(AudioTensorOperation): """ Design biquad equalizer filter and perform filtering. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero. center_freq (float): Central frequency (in Hz). gain (float): Desired gain at the boost (or attenuation) in dB. Q (float, optional): https://en.wikipedia.org/wiki/Q_factor, range: (0, 1]. Default: 0.707. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.EqualizerBiquad(44100, 1500, 5.5, 0.7)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_equalizer_biquad def __init__(self, sample_rate, center_freq, gain, Q=0.707): super().__init__() self.sample_rate = sample_rate self.center_freq = center_freq self.gain = gain self.quality_factor = Q def parse(self): return cde.EqualizerBiquadOperation(self.sample_rate, self.center_freq, self.gain, self.quality_factor)
DE_C_FADE_SHAPE = {FadeShape.QUARTER_SINE: cde.FadeShape.DE_FADE_SHAPE_QUARTER_SINE, FadeShape.HALF_SINE: cde.FadeShape.DE_FADE_SHAPE_HALF_SINE, FadeShape.LINEAR: cde.FadeShape.DE_FADE_SHAPE_LINEAR, FadeShape.LOGARITHMIC: cde.FadeShape.DE_FADE_SHAPE_LOGARITHMIC, FadeShape.EXPONENTIAL: cde.FadeShape.DE_FADE_SHAPE_EXPONENTIAL}
[文档]class Fade(AudioTensorOperation): """ Add a fade in and/or fade out to an waveform. Args: fade_in_len (int, optional): Length of fade-in (time frames), which must be non-negative. Default: 0. fade_out_len (int, optional): Length of fade-out (time frames), which must be non-negative. Default: 0. fade_shape (FadeShape, optional): Shape of fade, five different types can be chosen as defined in FadeShape. Default: FadeShape.LINEAR. -FadeShape.QUARTER_SINE, means it tend to 0 in an quarter sin function. -FadeShape.HALF_SINE, means it tend to 0 in an half sin function. -FadeShape.LINEAR, means it linear to 0. -FadeShape.LOGARITHMIC, means it tend to 0 in an logrithmic function. -FadeShape.EXPONENTIAL, means it tend to 0 in an exponential function. Raises: RuntimeError: If fade_in_len exceeds waveform length. RuntimeError: If fade_out_len exceeds waveform length. Examples: >>> import numpy as np >>> from mindspore.dataset.audio import FadeShape >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03, 9.246826171875e-03, 1.0894775390625e-02]]) >>> dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Fade(fade_in_len=3, fade_out_len=2, fade_shape=FadeShape.LINEAR)] >>> dataset = dataset.map(operations=transforms, input_columns=["audio"]) """ @check_fade def __init__(self, fade_in_len=0, fade_out_len=0, fade_shape=FadeShape.LINEAR): super().__init__() self.fade_in_len = fade_in_len self.fade_out_len = fade_out_len self.fade_shape = fade_shape def parse(self): return cde.FadeOperation(self.fade_in_len, self.fade_out_len, DE_C_FADE_SHAPE.get(self.fade_shape))
class Filtfilt(AudioTensorOperation): """ Apply an IIR filter forward and backward to a waveform. Args: a_coeffs (Sequence): denominator coefficients of difference equation of dimension of (n_order + 1). Lower delays coefficients are first, e.g. [a0, a1, a2, ...]. Must be same size as b_coeffs (pad with 0's as necessary). b_coeffs (Sequence): numerator coefficients of difference equation of dimension of (n_order + 1). Lower delays coefficients are first, e.g. [b0, b1, b2, ...]. Must be same size as a_coeffs (pad with 0's as necessary). clamp (bool, optional): If True, clamp the output signal to be in the range [-1, 1]. Default: True. Raises: RuntimeError: If the shape of input audio waveform does not match <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> a_coeffs = [0.1, 0.2, 0.3] >>> b_coeffs = [0.1, 0.2, 0.3] >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Filtfilt(a_coeffs, b_coeffs)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_lfilter def __init__(self, a_coeffs, b_coeffs, clamp=True): super().__init__() self.a_coeffs = a_coeffs self.b_coeffs = b_coeffs self.clamp = clamp def parse(self): return cde.FiltfiltOperation(self.a_coeffs, self.b_coeffs, self.clamp) DE_C_MODULATION = {Modulation.SINUSOIDAL: cde.Modulation.DE_MODULATION_SINUSOIDAL, Modulation.TRIANGULAR: cde.Modulation.DE_MODULATION_TRIANGULAR} DE_C_INTERPOLATION = {Interpolation.LINEAR: cde.Interpolation.DE_INTERPOLATION_LINEAR, Interpolation.QUADRATIC: cde.Interpolation.DE_INTERPOLATION_QUADRATIC}
[文档]class Flanger(AudioTensorOperation): """ Apply a flanger effect to the audio. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz). delay (float, optional): Desired delay in milliseconds, in range of [0, 30]. Default: 0.0. depth (float, optional): Desired delay depth in milliseconds, in range of [0, 10]. Default: 2.0. regen (float, optional): Desired regen (feedback gain) in dB, in range of [-95, 95]. Default: 0.0. width (float, optional): Desired width (delay gain) in dB, in range of [0, 100]. Default: 71.0. speed (float, optional): Modulation speed in Hz, in range of [0.1, 10]. Default: 0.5. phase (float, optional): Percentage phase-shift for multi-channel, in range of [0, 100]. Default: 25.0. modulation (Modulation, optional): Modulation method, can be Modulation.SINUSOIDAL or Modulation.TRIANGULAR. Default: Modulation.SINUSOIDAL. interpolation (Interpolation, optional): Interpolation method, can be Interpolation.LINEAR or Interpolation.QUADRATIC. Default: Interpolation.LINEAR. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is zero. TypeError: If `delay` is not of type float. ValueError: If `delay` is not in range of [0, 30]. TypeError: If `depth` is not of type float. ValueError: If `depth` is not in range of [0, 10]. TypeError: If `regen` is not of type float. ValueError: If `regen` is not in range of [-95, 95]. TypeError: If `width` is not of type float. ValueError: If `width` is not in range of [0, 100]. TypeError: If `speed` is not of type float. ValueError: If `speed` is not in range of [0.1, 10]. TypeError: If `phase` is not of type float. ValueError: If `phase` is not in range of [0, 100]. TypeError: If `modulation` is not of type :class:`mindspore.dataset.audio.Modulation` . TypeError: If `interpolation` is not of type :class:`mindspore.dataset.audio.Interpolation` . RuntimeError: If input tensor is not in shape of <..., channel, time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Flanger(44100)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_flanger def __init__(self, sample_rate, delay=0.0, depth=2.0, regen=0.0, width=71.0, speed=0.5, phase=25.0, modulation=Modulation.SINUSOIDAL, interpolation=Interpolation.LINEAR): super().__init__() self.sample_rate = sample_rate self.delay = delay self.depth = depth self.regen = regen self.width = width self.speed = speed self.phase = phase self.modulation = modulation self.interpolation = interpolation def parse(self): return cde.FlangerOperation(self.sample_rate, self.delay, self.depth, self.regen, self.width, self.speed, self.phase, DE_C_MODULATION.get(self.modulation), DE_C_INTERPOLATION.get(self.interpolation))
[文档]class FrequencyMasking(AudioTensorOperation): """ Apply masking to a spectrogram in the frequency domain. Note: The dimension of the audio waveform to be processed needs to be (..., freq, time). Args: iid_masks (bool, optional): Whether to apply different masks to each example/channel. Default: False. freq_mask_param (int, optional): When `iid_masks` is True, length of the mask will be uniformly sampled from [0, freq_mask_param]; When `iid_masks` is False, directly use it as length of the mask. The value should be in range of [0, freq_length], where `freq_length` is the length of audio waveform in frequency domain. Default: 0. mask_start (int, optional): Starting point to apply mask, only works when `iid_masks` is True. The value should be in range of [0, freq_length - freq_mask_param], where `freq_length` is the length of audio waveform in frequency domain. Default: 0. mask_value (float, optional): Value to assign to the masked columns. Default: 0.0. Raises: TypeError: If `iid_masks` is not of type bool. TypeError: If `freq_mask_param` is not of type int. ValueError: If `freq_mask_param` is greater than the length of audio waveform in frequency domain. TypeError: If `mask_start` is not of type int. ValueError: If `mask_start` is a negative number. TypeError: If `mask_value` is not of type float. ValueError: If `mask_value` is a negative number. RuntimeError: If input tensor is not in shape of <..., freq, time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.random.random([1, 3, 2]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.FrequencyMasking(freq_mask_param=1)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) .. image:: frequency_masking_original.png .. image:: frequency_masking.png """ @check_masking def __init__(self, iid_masks=False, freq_mask_param=0, mask_start=0, mask_value=0.0): super().__init__() self.iid_masks = iid_masks self.frequency_mask_param = freq_mask_param self.mask_start = mask_start self.mask_value = mask_value def parse(self): return cde.FrequencyMaskingOperation(self.iid_masks, self.frequency_mask_param, self.mask_start, self.mask_value)
[文档]class Gain(AudioTensorOperation): """ Apply amplification or attenuation to the whole waveform. Args: gain_db (float): Gain adjustment in decibels (dB). Default: 1.0. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Gain(1.2)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_gain def __init__(self, gain_db=1.0): super().__init__() self.gain_db = gain_db def parse(self): return cde.GainOperation(self.gain_db)
[文档]class GriffinLim(AudioTensorOperation): r""" Compute waveform from a linear scale magnitude spectrogram using the Griffin-Lim transformation. About Griffin-Lim please refer to `A fast Griffin-Lim algorithm <https://doi.org/10.1109/WASPAA.2013.6701851>`_ and `Signal estimation from modified short-time Fourier transform <https://doi.org/10.1109/ICASSP.1983.1172092>`_ . Args: n_fft (int, optional): Size of FFT. Default: 400. n_iter (int, optional): Number of iteration for phase recovery. Default: 32. win_length (int, optional): Window size for GriffinLim. Default: None, will be set to n_fft. hop_length (int, optional): Length of hop between STFT windows. Default: None, will be set to win_length // 2. window_type (WindowType, optional): Window type for GriffinLim, which can be WindowType.BARTLETT, WindowType.BLACKMAN, WindowType.HAMMING, WindowType.HANN or WindowType.KAISER. Default: WindowType.HANN. Currently kaiser window is not supported on macOS. power (float, optional): Exponent for the magnitude spectrogram. Default: 2.0. momentum (float, optional): The momentum for fast Griffin-Lim. Default: 0.99. length (int, optional): Length of the expected output waveform. Default: None, will be set to the value of last dimension of the stft matrix. rand_init (bool, optional): Flag for random phase initialization or all-zero phase initialization. Default: True. Raises: RuntimeError: If `n_fft` is not less than `length` . RuntimeError: If `win_length` is not less than `n_fft` . Examples: >>> import numpy as np >>> >>> waveform = np.random.random([201, 6]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.GriffinLim(n_fft=400)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_griffin_lim def __init__(self, n_fft=400, n_iter=32, win_length=None, hop_length=None, window_type=WindowType.HANN, power=2.0, momentum=0.99, length=None, rand_init=True): super().__init__() self.n_fft = n_fft self.n_iter = n_iter self.win_length = win_length if win_length else self.n_fft self.hop_length = hop_length if hop_length else self.win_length // 2 self.window_type = window_type self.power = power self.momentum = momentum self.length = length if length else 0 self.rand_init = rand_init def parse(self): return cde.GriffinLimOperation(self.n_fft, self.n_iter, self.win_length, self.hop_length, DE_C_WINDOW_TYPE.get(self.window_type), self.power, self.momentum, self.length, self.rand_init)
[文档]class HighpassBiquad(AudioTensorOperation): """ Design biquad highpass filter and perform filtering. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero. cutoff_freq (float): Filter cutoff frequency (in Hz). Q (float, optional): Quality factor, https://en.wikipedia.org/wiki/Q_factor, range: (0, 1]. Default: 0.707. Raises: RuntimeError: If the shape of input audio waveform does not match (..., time). Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.HighpassBiquad(44100, 1500, 0.7)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_highpass_biquad def __init__(self, sample_rate, cutoff_freq, Q=0.707): super().__init__() self.sample_rate = sample_rate self.cutoff_freq = cutoff_freq self.quality_factor = Q def parse(self): return cde.HighpassBiquadOperation(self.sample_rate, self.cutoff_freq, self.quality_factor)
[文档]class InverseMelScale(AudioTensorOperation): """ Solve for a normal STFT from a mel frequency STFT, using a conversion matrix. Args: n_stft (int): Number of bins in STFT. n_mels (int, optional): Number of mel filterbanks. Default: 128. sample_rate (int, optional): Sample rate of audio signal. Default: 16000. f_min (float, optional): Minimum frequency. Default: 0.0. f_max (float, optional): Maximum frequency. Default: None, will be set to `sample_rate // 2` . max_iter (int, optional): Maximum number of optimization iterations. Default: 100000. tolerance_loss (float, optional): Value of loss to stop optimization at. Default: 1e-5. tolerance_change (float, optional): Difference in losses to stop optimization at. Default: 1e-8. sgdargs (dict, optional): Arguments for the SGD optimizer. Default: None, will be set to {'sgd_lr': 0.1, 'sgd_momentum': 0.9}. norm (NormType, optional): Normalization method, can be NormType.SLANEY or NormType.NONE. Default: NormType.NONE. mel_type (MelType, optional): Mel scale to use, can be MelType.SLANEY or MelType.HTK. Default: MelType.HTK. Examples: >>> import numpy as np >>> >>> waveform = np.random.randn(2, 2, 3, 2) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.InverseMelScale(20, 3, 16000, 0, 8000, 10)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_inverse_mel_scale def __init__(self, n_stft, n_mels=128, sample_rate=16000, f_min=0.0, f_max=None, max_iter=100000, tolerance_loss=1e-5, tolerance_change=1e-8, sgdargs=None, norm=NormType.NONE, mel_type=MelType.HTK): super().__init__() self.n_stft = n_stft self.n_mels = n_mels self.sample_rate = sample_rate self.f_min = f_min self.f_max = f_max if f_max is not None else sample_rate // 2 self.max_iter = max_iter self.tolerance_loss = tolerance_loss self.tolerance_change = tolerance_change if sgdargs is None: self.sgdargs = {'sgd_lr': 0.1, 'sgd_momentum': 0.9} else: self.sgdargs = sgdargs self.norm = norm self.mel_type = mel_type def parse(self): return cde.InverseMelScaleOperation(self.n_stft, self.n_mels, self.sample_rate, self.f_min, self.f_max, self.max_iter, self.tolerance_loss, self.tolerance_change, self.sgdargs, DE_C_NORM_TYPE.get(self.norm), DE_C_MEL_TYPE.get(self.mel_type))
[文档]class LFilter(AudioTensorOperation): """ Perform an IIR filter by evaluating different equation. Args: a_coeffs (Sequence[float]): Denominator coefficients of difference equation of dimension. Lower delays coefficients are first, e.g. [a0, a1, a2, ...]. Must be same size as b_coeffs (pad with 0's as necessary). b_coeffs (Sequence[float]): Numerator coefficients of difference equation of dimension. Lower delays coefficients are first, e.g. [b0, b1, b2, ...]. Must be same size as a_coeffs (pad with 0's as necessary). clamp (bool, optional): If True, clamp the output signal to be in the range [-1, 1]. Default: True. Raises: TypeError: If `a_coeffs` is not of type Sequence[float]. TypeError: If `b_coeffs` is not of type Sequence[float]. ValueError: If `a_coeffs` and `b_coeffs` are of different sizes. TypeError: If `clamp` is not of type bool. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]]) >>> a_coeffs = [0.1, 0.2, 0.3] >>> b_coeffs = [0.1, 0.2, 0.3] >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.LFilter(a_coeffs, b_coeffs)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_lfilter def __init__(self, a_coeffs, b_coeffs, clamp=True): super().__init__() self.a_coeffs = a_coeffs self.b_coeffs = b_coeffs self.clamp = clamp def parse(self): return cde.LFilterOperation(self.a_coeffs, self.b_coeffs, self.clamp)
[文档]class LowpassBiquad(AudioTensorOperation): r""" Design two-pole low-pass filter for audio waveform. A low-pass filter passes frequencies lower than a selected cutoff frequency but attenuates frequencies higher than it. The system function is: .. math:: H(s) = \frac{1}{s^2 + \frac{s}{Q} + 1} Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Note: The dimension of the audio waveform to be processed needs to be (..., time). Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. cutoff_freq (float): Filter cutoff frequency (in Hz). Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `cutoff_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.array([[0.8236, 0.2049, 0.3335], [0.5933, 0.9911, 0.2482], ... [0.3007, 0.9054, 0.7598], [0.5394, 0.2842, 0.5634], [0.6363, 0.2226, 0.2288]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.LowpassBiquad(4000, 1500, 0.7)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_lowpass_biquad def __init__(self, sample_rate, cutoff_freq, Q=0.707): super().__init__() self.sample_rate = sample_rate self.cutoff_freq = cutoff_freq self.quality_factor = Q def parse(self): return cde.LowpassBiquadOperation(self.sample_rate, self.cutoff_freq, self.quality_factor)
[文档]class Magphase(AudioTensorOperation): """ Separate a complex-valued spectrogram with shape (..., 2) into its magnitude and phase. Args: power (float): Power of the norm, which must be non-negative. Default: 1.0. Raises: RuntimeError: If the shape of input audio waveform does not match (..., 2). Examples: >>> import numpy as np >>> >>> waveform = np.random.random([2, 4, 2]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Magphase()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_magphase def __init__(self, power=1.0): super().__init__() self.power = power def parse(self): return cde.MagphaseOperation(self.power)
[文档]class MaskAlongAxis(AudioTensorOperation): """ Apply a mask along `axis` . Mask will be applied from indices `[mask_start, mask_start + mask_width)` . Args: mask_start (int): Starting position of the mask, which must be non negative. mask_width (int): The width of the mask, which must be larger than 0. mask_value (float): Value to assign to the masked columns. axis (int): Axis to apply mask on (1 for frequency and 2 for time). Raises: ValueError: If `mask_start` is invalid (< 0). ValueError: If `mask_width` is invalid (< 1). ValueError: If `axis` is not type of int or not within [1, 2]. Examples: >>> import numpy as np >>> >>> waveform = np.random.random([1, 20, 20]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.MaskAlongAxis(0, 10, 0.5, 1)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_mask_along_axis def __init__(self, mask_start, mask_width, mask_value, axis): super().__init__() self.mask_start = mask_start self.mask_width = mask_width self.mask_value = mask_value self.axis = axis def parse(self): return cde.MaskAlongAxisOperation(self.mask_start, self.mask_width, self.mask_value, self.axis)
[文档]class MaskAlongAxisIID(AudioTensorOperation): """ Apply a mask along `axis` . Mask will be applied from indices `[mask_start, mask_start + mask_width)` , where `mask_width` is sampled from `uniform[0, mask_param]` , and `mask_start` from `uniform[0, max_length - mask_width]` , `max_length` is the number of columns of the specified axis of the spectrogram. Args: mask_param (int): Number of columns to be masked, will be uniformly sampled from [0, mask_param], must be non negative. mask_value (float): Value to assign to the masked columns. axis (int): Axis to apply mask on (1 for frequency and 2 for time). Raises: TypeError: If `mask_param` is not of type int. ValueError: If `mask_param` is a negative value. TypeError: If `mask_value` is not of type float. TypeError: If `axis` is not of type int. ValueError: If `axis` is not in range of [1, 2]. RuntimeError: If input tensor is not in shape of <..., freq, time>. Examples: >>> import numpy as np >>> >>> waveform= np.random.random([1, 20, 20]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.MaskAlongAxisIID(5, 0.5, 2)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_mask_along_axis_iid def __init__(self, mask_param, mask_value, axis): super().__init__() self.mask_param = mask_param self.mask_value = mask_value self.axis = axis def parse(self): return cde.MaskAlongAxisIIDOperation(self.mask_param, self.mask_value, self.axis)
DE_C_MEL_TYPE = {MelType.SLANEY: cde.MelType.DE_MEL_TYPE_SLANEY, MelType.HTK: cde.MelType.DE_MEL_TYPE_HTK} DE_C_NORM_TYPE = {NormType.NONE: cde.NormType.DE_NORM_TYPE_NONE, NormType.SLANEY: cde.NormType.DE_NORM_TYPE_SLANEY}
[文档]class MelScale(AudioTensorOperation): """ Convert normal STFT to STFT at the Mel scale. Args: n_mels (int, optional): Number of mel filterbanks. Default: 128. sample_rate (int, optional): Sample rate of audio signal. Default: 16000. f_min (float, optional): Minimum frequency. Default: 0.0. f_max (float, optional): Maximum frequency. Default: None, will be set to `sample_rate // 2` . n_stft (int, optional): Number of bins in STFT. Default: 201. norm (NormType, optional): Type of norm, value should be NormType.SLANEY or NormType::NONE. If norm is NormType.SLANEY, divide the triangular mel weight by the width of the mel band. Default: NormType.NONE. mel_type (MelType, optional): Type to use, value should be MelType.SLANEY or MelType.HTK. Default: MelType.HTK. Examples: >>> import numpy as np >>> >>> waveform = np.array([[0.8236, 0.2049, 0.3335], [0.5933, 0.9911, 0.2482], ... [0.3007, 0.9054, 0.7598], [0.5394, 0.2842, 0.5634], [0.6363, 0.2226, 0.2288]]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.MelScale(4000, 1500, 0.7)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_mel_scale def __init__(self, n_mels=128, sample_rate=16000, f_min=0.0, f_max=None, n_stft=201, norm=NormType.NONE, mel_type=MelType.HTK): super().__init__() self.n_mels = n_mels self.sample_rate = sample_rate self.f_min = f_min self.f_max = f_max if f_max is not None else sample_rate // 2 self.n_stft = n_stft self.norm = norm self.mel_type = mel_type def parse(self): return cde.MelScaleOperation(self.n_mels, self.sample_rate, self.f_min, self.f_max, self.n_stft, DE_C_NORM_TYPE.get(self.norm), DE_C_MEL_TYPE.get(self.mel_type))
[文档]class MuLawDecoding(AudioTensorOperation): """ Decode mu-law encoded signal, refer to `mu-law algorithm <https://en.wikipedia.org/wiki/M-law_algorithm>`_ . Args: quantization_channels (int, optional): Number of channels, which must be positive. Default: 256. Raises: TypeError: If `quantization_channels` is not of type int. ValueError: If `quantization_channels` is not a positive number. RuntimeError: If input tensor is not in shape of <..., time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.random.random([1, 3, 4]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.MuLawDecoding()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_mu_law_coding def __init__(self, quantization_channels=256): super().__init__() self.quantization_channels = quantization_channels def parse(self): return cde.MuLawDecodingOperation(self.quantization_channels)
[文档]class MuLawEncoding(AudioTensorOperation): """ Encode signal based on mu-law companding. Args: quantization_channels (int, optional): Number of channels, which must be positive. Default: 256. Examples: >>> import numpy as np >>> >>> waveform = np.random.random([1, 3, 4]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.MuLawEncoding()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_mu_law_coding def __init__(self, quantization_channels=256): super().__init__() self.quantization_channels = quantization_channels def parse(self): return cde.MuLawEncodingOperation(self.quantization_channels)
[文档]class Overdrive(AudioTensorOperation): """ Apply an overdrive effect to the audio waveform. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: gain (float, optional): Desired gain at the boost (or attenuation) in dB, in range of [0, 100]. Default: 20.0. color (float, optional): Controls the amount of even harmonic content in the over-driven output, in range of [0, 100]. Default: 20.0. Raises: TypeError: If `gain` is not of type float. ValueError: If `gain` is not in range of [0, 100]. TypeError: If `color` is not of type float. ValueError: If `color` is not in range of [0, 100]. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Overdrive()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_overdrive def __init__(self, gain=20.0, color=20.0): super().__init__() self.gain = gain self.color = color def parse(self): return cde.OverdriveOperation(self.gain, self.color)
[文档]class Phaser(AudioTensorOperation): """ Apply a phasing effect to the audio. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz). gain_in (float, optional): Desired input gain at the boost (or attenuation) in dB, in range of [0.0, 1.0]. Default: 0.4. gain_out (float, optional): Desired output gain at the boost (or attenuation) in dB, in range of [0.0, 1e9]. Default: 0.74. delay_ms (float, optional): Desired delay in milliseconds, in range of [0.0, 5.0]. Default: 3.0. decay (float, optional): Desired decay relative to gain-in, in range of [0.0, 0.99]. Default: 0.4. mod_speed (float, optional): Modulation speed in Hz, in range of [0.1, 2.0]. Default: 0.5. sinusoidal (bool, optional): If True, use sinusoidal modulation (preferable for multiple instruments). If False, use triangular modulation (gives single instruments a sharper phasing effect). Default: True. Raises: TypeError: If `sample_rate` is not of type int. TypeError: If `gain_in` is not of type float. ValueError: If `gain_in` is not in range of [0.0, 1.0]. TypeError: If `gain_out` is not of type float. ValueError: If `gain_out` is not in range of [0.0, 1e9]. TypeError: If `delay_ms` is not of type float. ValueError: If `delay_ms` is not in range of [0.0, 5.0]. TypeError: If `decay` is not of type float. ValueError: If `decay` is not in range of [0.0, 0.99]. TypeError: If `mod_speed` is not of type float. ValueError: If `mod_speed` is not in range of [0.1, 2.0]. TypeError: If `sinusoidal` is not of type bool. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Phaser(44100)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_phaser def __init__(self, sample_rate, gain_in=0.4, gain_out=0.74, delay_ms=3.0, decay=0.4, mod_speed=0.5, sinusoidal=True): super().__init__() self.decay = decay self.delay_ms = delay_ms self.gain_in = gain_in self.gain_out = gain_out self.mod_speed = mod_speed self.sample_rate = sample_rate self.sinusoidal = sinusoidal def parse(self): return cde.PhaserOperation(self.sample_rate, self.gain_in, self.gain_out, self.delay_ms, self.decay, self.mod_speed, self.sinusoidal)
[文档]class PhaseVocoder(AudioTensorOperation): """ Given a STFT spectrogram, speed up in time without modifying pitch by a factor of rate. Args: rate (float): Speed-up factor. phase_advance (numpy.ndarray): Expected phase advance in each bin, in shape of (freq, 1). Raises: TypeError: If `rate` is not of type float. ValueError: If `rate` is not a positive number. TypeError: If `phase_advance` is not of type :class:`numpy.ndarray` . RuntimeError: If input tensor is not in shape of <..., freq, num_frame, complex=2>. Examples: >>> import numpy as np >>> >>> waveform = np.random.random([2, 44, 10, 2]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> phase_advance = np.random.random([44, 1]) >>> transforms = [audio.PhaseVocoder(rate=2, phase_advance=phase_advance)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_phase_vocoder def __init__(self, rate, phase_advance): super().__init__() self.rate = rate self.phase_advance = cde.Tensor(phase_advance) def parse(self): return cde.PhaseVocoderOperation(self.rate, self.phase_advance)
DE_C_RESAMPLE_METHOD = {ResampleMethod.SINC_INTERPOLATION: cde.ResampleMethod.DE_RESAMPLE_SINC_INTERPOLATION, ResampleMethod.KAISER_WINDOW: cde.ResampleMethod.DE_RESAMPLE_KAISER_WINDOW}
[文档]class Resample(AudioTensorOperation): """ Resample a signal from one frequency to another. A resample method can be given. Args: orig_freq (float, optional): The original frequency of the signal, must be positive. Default: 16000. new_freq (float, optional): The desired frequency, must be positive. Default: 16000. resample_method (ResampleMethod, optional): The resample method to use, can be ResampleMethod.SINC_INTERPOLATION or ResampleMethod.KAISER_WINDOW. Default: ResampleMethod.SINC_INTERPOLATION. lowpass_filter_width (int, optional): Controls the sharpness of the filter, more means sharper but less efficient, must be positive. Default: 6. rolloff (float, optional): The roll-off frequency of the filter, as a fraction of the Nyquist. Lower values reduce anti-aliasing, but also reduce some of the highest frequencies, in range of (0, 1]. Default: 0.99. beta (float, optional): The shape parameter used for kaiser window. Default: None, will use 14.769656459379492. Raises: TypeError: If `orig_freq` is not of type float. ValueError: If `orig_freq` is not a positive number. TypeError: If `new_freq` is not of type float. ValueError: If `new_freq` is not a positive number. TypeError: If `resample_method` is not of type :class:`mindspore.dataset.audio.ResampleMethod` . TypeError: If `lowpass_filter_width` is not of type int. ValueError: If `lowpass_filter_width` is not a positive number. TypeError: If `rolloff` is not of type float. ValueError: If `rolloff` is not in range of (0, 1]. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> from mindspore.dataset.audio import ResampleMethod >>> >>> waveform = np.random.random([1, 30]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Resample(orig_freq=48000, new_freq=16000, ... resample_method=ResampleMethod.SINC_INTERPOLATION, ... lowpass_filter_width=6, rolloff=0.99, beta=None)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_resample def __init__(self, orig_freq=16000, new_freq=16000, resample_method=ResampleMethod.SINC_INTERPOLATION, lowpass_filter_width=6, rolloff=0.99, beta=None): super().__init__() self.orig_freq = orig_freq self.new_freq = new_freq self.resample_method = resample_method self.lowpass_filter_width = lowpass_filter_width self.rolloff = rolloff kaiser_beta = 14.769656459379492 self.beta = beta if beta is not None else kaiser_beta def parse(self): return cde.ResampleOperation(self.orig_freq, self.new_freq, DE_C_RESAMPLE_METHOD.get(self.resample_method), self.lowpass_filter_width, self.rolloff, self.beta)
[文档]class RiaaBiquad(AudioTensorOperation): """ Apply RIAA vinyl playback equalization. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz), can only be one of 44100, 48000, 88200, 96000. Examples: >>> import numpy as np >>> >>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float64) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.RiaaBiquad(44100)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_riaa_biquad def __init__(self, sample_rate): super().__init__() self.sample_rate = sample_rate def parse(self): return cde.RiaaBiquadOperation(self.sample_rate)
[文档]class SlidingWindowCmn(AudioTensorOperation): """ Apply sliding-window cepstral mean (and optionally variance) normalization per utterance. Args: cmn_window (int, optional): Window in frames for running average CMN computation. Default: 600. min_cmn_window (int, optional): Minimum CMN window used at start of decoding (adds latency only at start). Only applicable if center is False, ignored if center is True. Default: 100. center (bool, optional): If True, use a window centered on the current frame. If False, window is to the left. Default: False. norm_vars (bool, optional): If True, normalize variance to one. Default: False. Examples: >>> import numpy as np >>> >>> waveform = np.array([[[1, 2, 3], [4, 5, 6]]], dtype=np.float64) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.SlidingWindowCmn()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_sliding_window_cmn def __init__(self, cmn_window=600, min_cmn_window=100, center=False, norm_vars=False): super().__init__() self.cmn_window = cmn_window self.min_cmn_window = min_cmn_window self.center = center self.norm_vars = norm_vars def parse(self): return cde.SlidingWindowCmnOperation(self.cmn_window, self.min_cmn_window, self.center, self.norm_vars)
DE_C_WINDOW_TYPE = {WindowType.BARTLETT: cde.WindowType.DE_WINDOW_TYPE_BARTLETT, WindowType.BLACKMAN: cde.WindowType.DE_WINDOW_TYPE_BLACKMAN, WindowType.HAMMING: cde.WindowType.DE_WINDOW_TYPE_HAMMING, WindowType.HANN: cde.WindowType.DE_WINDOW_TYPE_HANN, WindowType.KAISER: cde.WindowType.DE_WINDOW_TYPE_KAISER}
[文档]class SpectralCentroid(TensorOperation): """ Compute the spectral centroid for each channel along the time axis. Args: sample_rate (int): Sampling rate of audio signal, e.g. 44100 (Hz). n_fft (int, optional): Size of FFT, creates `n_fft // 2 + 1` bins. Default: 400. win_length (int, optional): Window size. Default: None, will use `n_fft` . hop_length (int, optional): Length of hop between STFT windows. Default: None, will use `win_length // 2` . pad (int, optional): Two sided padding of signal. Default: 0. window (WindowType, optional): Window function that is applied/multiplied to each frame/window, can be WindowType.BARTLETT, WindowType.BLACKMAN, WindowType.HAMMING, WindowType.HANN or WindowType.KAISER. Default: WindowType.HANN. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is a negative number. TypeError: If `n_fft` is not of type int. ValueError: If `n_fft` is not a positive number. TypeError: If `win_length` is not of type int. ValueError: If `win_length` is not a positive number. ValueError: If `win_length` is greater than `n_fft` . TypeError: If `hop_length` is not of type int. ValueError: If `hop_length` is not a positive number. TypeError: If `pad` is not of type int. ValueError: If `pad` is a negative number. TypeError: If `window` is not of type :class:`mindspore.dataset.audio.WindowType` . RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.random.random([5, 10, 20]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.SpectralCentroid(44100)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_spectral_centroid def __init__(self, sample_rate, n_fft=400, win_length=None, hop_length=None, pad=0, window=WindowType.HANN): super().__init__() self.sample_rate = sample_rate self.pad = pad self.window = window self.n_fft = n_fft self.win_length = win_length if win_length else n_fft self.hop_length = hop_length if hop_length else self.win_length // 2 def parse(self): return cde.SpectralCentroidOperation(self.sample_rate, self.n_fft, self.win_length, self.hop_length, self.pad, DE_C_WINDOW_TYPE.get(self.window))
[文档]class Spectrogram(TensorOperation): """ Create a spectrogram from an audio signal. Args: n_fft (int, optional): Size of FFT, creates `n_fft // 2 + 1` bins. Default: 400. win_length (int, optional): Window size. Default: None, will use `n_fft` . hop_length (int, optional): Length of hop between STFT windows. Default: None, will use `win_length // 2` . pad (int, optional): Two sided padding of signal. Default: 0. window (WindowType, optional): Window function that is applied/multiplied to each frame/window, can be WindowType.BARTLETT, WindowType.BLACKMAN, WindowType.HAMMING, WindowType.HANN or WindowType.KAISER. Currently, Kaiser window is not supported on macOS. Default: WindowType.HANN. power (float, optional): Exponent for the magnitude spectrogram, must be non negative, e.g., 1 for energy, 2 for power, etc. Default: 2.0. normalized (bool, optional): Whether to normalize by magnitude after stft. Default: False. center (bool, optional): Whether to pad waveform on both sides. Default: True. pad_mode (BorderType, optional): Controls the padding method used when `center` is True, can be BorderType.REFLECT, BorderType.CONSTANT, BorderType.EDGE or BorderType.SYMMETRIC. Default: BorderType.REFLECT. onesided (bool, optional): Controls whether to return half of results to avoid redundancy. Default: True. Raises: TypeError: If `n_fft` is not of type int. ValueError: If `n_fft` is not a positive number. TypeError: If `win_length` is not of type int. ValueError: If `win_length` is not a positive number. ValueError: If `win_length` is greater than `n_fft` . TypeError: If `hop_length` is not of type int. ValueError: If `hop_length` is not a positive number. TypeError: If `pad` is not of type int. ValueError: If `pad` is a negative number. TypeError: If `window` is not of type :class:`mindspore.dataset.audio.WindowType` . TypeError: If `power` is not of type float. ValueError: If `power` is a negative number. TypeError: If `normalized` is not of type bool. TypeError: If `center` is not of type bool. TypeError: If `pad_mode` is not of type :class:`mindspore.dataset.audio.BorderType` . TypeError: If `onesided` is not of type bool. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.random.random([5, 10, 20]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Spectrogram()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_spectrogram def __init__(self, n_fft=400, win_length=None, hop_length=None, pad=0, window=WindowType.HANN, power=2.0, normalized=False, center=True, pad_mode=BorderType.REFLECT, onesided=True): super().__init__() self.n_fft = n_fft self.win_length = win_length if win_length else n_fft self.hop_length = hop_length if hop_length else self.win_length // 2 self.pad = pad self.window = window self.power = power self.normalized = normalized self.center = center self.pad_mode = pad_mode self.onesided = onesided def parse(self): return cde.SpectrogramOperation(self.n_fft, self.win_length, self.hop_length, self.pad, DE_C_WINDOW_TYPE.get(self.window), self.power, self.normalized, self.center, DE_C_BORDER_TYPE.get(self.pad_mode), self.onesided)
[文档]class TimeMasking(AudioTensorOperation): """ Apply masking to a spectrogram in the time domain. Note: The dimension of the audio waveform to be processed needs to be (..., freq, time). Args: iid_masks (bool, optional): Whether to apply different masks to each example/channel. Default: False. time_mask_param (int, optional): When `iid_masks` is True, length of the mask will be uniformly sampled from [0, time_mask_param]; When `iid_masks` is False, directly use it as length of the mask. The value should be in range of [0, time_length], where `time_length` is the length of audio waveform in time domain. Default: 0. mask_start (int, optional): Starting point to apply mask, only works when `iid_masks` is True. The value should be in range of [0, time_length - time_mask_param], where `time_length` is the length of audio waveform in time domain. Default: 0. mask_value (float, optional): Value to assign to the masked columns. Default: 0.0. Raises: TypeError: If `iid_masks` is not of type bool. TypeError: If `time_mask_param` is not of type int. ValueError: If `time_mask_param` is greater than the length of audio waveform in time domain. TypeError: If `mask_start` is not of type int. ValueError: If `mask_start` a negative number. TypeError: If `mask_value` is not of type float. ValueError: If `mask_value` is a negative number. RuntimeError: If input tensor is not in shape of <..., freq, time>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.random.random([4, 3, 2]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.TimeMasking(time_mask_param=1)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) .. image:: time_masking_original.png .. image:: time_masking.png """ @check_masking def __init__(self, iid_masks=False, time_mask_param=0, mask_start=0, mask_value=0.0): super().__init__() self.iid_masks = iid_masks self.time_mask_param = time_mask_param self.mask_start = mask_start self.mask_value = mask_value def parse(self): return cde.TimeMaskingOperation(self.iid_masks, self.time_mask_param, self.mask_start, self.mask_value)
[文档]class TimeStretch(AudioTensorOperation): """ Stretch Short Time Fourier Transform (STFT) in time without modifying pitch for a given rate. Note: The dimension of the audio waveform to be processed needs to be (..., freq, time, complex=2). The first dimension represents the real part while the second represents the imaginary. Args: hop_length (int, optional): Length of hop between STFT windows, i.e. the number of samples between consecutive frames. Default: None, will use `n_freq - 1` . n_freq (int, optional): Number of filter banks from STFT. Default: 201. fixed_rate (float, optional): Rate to speed up or slow down by. Default: None, will keep the original rate. Raises: TypeError: If `hop_length` is not of type int. ValueError: If `hop_length` is not a positive number. TypeError: If `n_freq` is not of type int. ValueError: If `n_freq` is not a positive number. TypeError: If `fixed_rate` is not of type float. ValueError: If `fixed_rate` is not a positive number. RuntimeError: If input tensor is not in shape of <..., freq, num_frame, complex=2>. Supported Platforms: ``CPU`` Examples: >>> import numpy as np >>> >>> waveform = np.random.random([44, 10, 2]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.TimeStretch()] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) .. image:: time_stretch_rate1.5.png .. image:: time_stretch_original.png .. image:: time_stretch_rate0.8.png """ @check_time_stretch def __init__(self, hop_length=None, n_freq=201, fixed_rate=None): super().__init__() self.n_freq = n_freq self.fixed_rate = fixed_rate n_fft = (n_freq - 1) * 2 self.hop_length = hop_length if hop_length is not None else n_fft // 2 self.fixed_rate = fixed_rate if fixed_rate is not None else 1 def parse(self): return cde.TimeStretchOperation(self.hop_length, self.n_freq, self.fixed_rate)
[文档]class TrebleBiquad(AudioTensorOperation): """ Design a treble tone-control effect. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate (in Hz), which can't be zero. gain (float): Desired gain at the boost (or attenuation) in dB. central_freq (float, optional): Central frequency (in Hz). Default: 3000. Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ , in range of (0, 1]. Default: 0.707. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is 0. TypeError: If `gain` is not of type float. TypeError: If `central_freq` is not of type float. TypeError: If `Q` is not of type float. ValueError: If `Q` is not in range of (0, 1]. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float64) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.TrebleBiquad(44100, 200.0)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_treble_biquad def __init__(self, sample_rate, gain, central_freq=3000, Q=0.707): super().__init__() self.sample_rate = sample_rate self.gain = gain self.central_freq = central_freq self.quality_factor = Q def parse(self): return cde.TrebleBiquadOperation(self.sample_rate, self.gain, self.central_freq, self.quality_factor)
[文档]class Vad(AudioTensorOperation): """ Voice activity detector. Attempt to trim silence and quiet background sounds from the ends of recordings of speech. Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation. Args: sample_rate (int): Sampling rate of audio signal. trigger_level (float, optional): The measurement level used to trigger activity detection. Default: 7.0. trigger_time (float, optional): The time constant (in seconds) used to help ignore short bursts of sounds. Default: 0.25. search_time (float, optional): The amount of audio (in seconds) to search for quieter/shorter bursts of audio to include prior to the detected trigger point. Default: 1.0. allowed_gap (float, optional): The allowed gap (in seconds) between quieter/shorter bursts of audio to include prior to the detected trigger point. Default: 0.25. pre_trigger_time (float, optional): The amount of audio (in seconds) to preserve before the trigger point and any found quieter/shorter bursts. Default: 0.0. boot_time (float, optional): The time for the initial noise estimate. Default: 0.35. noise_up_time (float, optional): Time constant used by the adaptive noise estimator for when the noise level is increasing. Default: 0.1. noise_down_time (float, optional): Time constant used by the adaptive noise estimator for when the noise level is decreasing. Default: 0.01. noise_reduction_amount (float, optional): Amount of noise reduction to use in the detection algorithm. Default: 1.35. measure_freq (float, optional): Frequency of the algorithm's processing/measurements. Default: 20.0. measure_duration (float, optional): The duration of measurement. Default: None, will use twice the measurement period. measure_smooth_time (float, optional): Time constant used to smooth spectral measurements. Default: 0.4. hp_filter_freq (float, optional): The 'Brick-wall' frequency of high-pass filter applied at the input to the detector algorithm. Default: 50.0. lp_filter_freq (float, optional): The 'Brick-wall' frequency of low-pass filter applied at the input to the detector algorithm. Default: 6000.0. hp_lifter_freq (float, optional): The 'Brick-wall' frequency of high-pass lifter used in the detector algorithm. Default: 150.0. lp_lifter_freq (float, optional): The 'Brick-wall' frequency of low-pass lifter used in the detector algorithm. Default: 2000.0. Raises: TypeError: If `sample_rate` is not of type int. ValueError: If `sample_rate` is not a positive number. TypeError: If `trigger_level` is not of type float. TypeError: If `trigger_time` is not of type float. ValueError: If `trigger_time` is a negative number. TypeError: If `search_time` is not of type float. ValueError: If `search_time` is a negative number. TypeError: If `allowed_gap` is not of type float. ValueError: If `allowed_gap` is a negative number. TypeError: If `pre_trigger_time` is not of type float. ValueError: If `pre_trigger_time` is a negative number. TypeError: If `boot_time` is not of type float. ValueError: If `boot_time` is a negative number. TypeError: If `noise_up_time` is not of type float. ValueError: If `noise_up_time` is a negative number. TypeError: If `noise_down_time` is not of type float. ValueError: If `noise_down_time` is a negative number. ValueError: If `noise_up_time` is less than `noise_down_time` . TypeError: If `noise_reduction_amount` is not of type float. ValueError: If `noise_reduction_amount` is a negative number. TypeError: If `measure_freq` is not of type float. ValueError: If `measure_freq` is not a positive number. TypeError: If `measure_duration` is not of type float. ValueError: If `measure_duration` is a negative number. TypeError: If `measure_smooth_time` is not of type float. ValueError: If `measure_smooth_time` is a negative number. TypeError: If `hp_filter_freq` is not of type float. ValueError: If `hp_filter_freq` is not a positive number. TypeError: If `lp_filter_freq` is not of type float. ValueError: If `lp_filter_freq` is not a positive number. TypeError: If `hp_lifter_freq` is not of type float. ValueError: If `hp_lifter_freq` is not a positive number. TypeError: If `lp_lifter_freq` is not of type float. ValueError: If `lp_lifter_freq` is not a positive number. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> >>> waveform = np.random.random([2, 1000]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Vad(sample_rate=600)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_vad def __init__(self, sample_rate, trigger_level=7.0, trigger_time=0.25, search_time=1.0, allowed_gap=0.25, pre_trigger_time=0.0, boot_time=0.35, noise_up_time=0.1, noise_down_time=0.01, noise_reduction_amount=1.35, measure_freq=20.0, measure_duration=None, measure_smooth_time=0.4, hp_filter_freq=50.0, lp_filter_freq=6000.0, hp_lifter_freq=150.0, lp_lifter_freq=2000.0): super().__init__() self.sample_rate = sample_rate self.trigger_level = trigger_level self.trigger_time = trigger_time self.search_time = search_time self.allowed_gap = allowed_gap self.pre_trigger_time = pre_trigger_time self.boot_time = boot_time self.noise_up_time = noise_up_time self.noise_down_time = noise_down_time self.noise_reduction_amount = noise_reduction_amount self.measure_freq = measure_freq self.measure_duration = measure_duration if measure_duration else 2.0 / measure_freq self.measure_smooth_time = measure_smooth_time self.hp_filter_freq = hp_filter_freq self.lp_filter_freq = lp_filter_freq self.hp_lifter_freq = hp_lifter_freq self.lp_lifter_freq = lp_lifter_freq def parse(self): return cde.VadOperation(self.sample_rate, self.trigger_level, self.trigger_time, self.search_time, self.allowed_gap, self.pre_trigger_time, self.boot_time, self.noise_up_time, self.noise_down_time, self.noise_reduction_amount, self.measure_freq, self.measure_duration, self.measure_smooth_time, self.hp_filter_freq, self.lp_filter_freq, self.hp_lifter_freq, self.lp_lifter_freq)
DE_C_GAIN_TYPE = {GainType.AMPLITUDE: cde.GainType.DE_GAIN_TYPE_AMPLITUDE, GainType.POWER: cde.GainType.DE_GAIN_TYPE_POWER, GainType.DB: cde.GainType.DE_GAIN_TYPE_DB}
[文档]class Vol(AudioTensorOperation): """ Adjust volume of waveform. Args: gain (float): Gain at the boost (or attenuation). If `gain_type` is GainType.AMPLITUDE, it is a non negative amplitude ratio. If `gain_type` is GainType.POWER, it is a power (voltage squared). If `gain_type` is GainType.DB, it is in decibels. gain_type (GainType, optional): Type of gain, can be GainType.AMPLITUDE, GainType.POWER or GainType.DB. Default: GainType.AMPLITUDE. Raises: TypeError: If `gain` is not of type float. TypeError: If `gain_type` is not of type :class:`mindspore.dataset.audio.GainType` . ValueError: If `gain` is a negative number when `gain_type` is GainType.AMPLITUDE. ValueError: If `gain` is not a positive number when `gain_type` is GainType.POWER. RuntimeError: If input tensor is not in shape of <..., time>. Examples: >>> import numpy as np >>> from mindspore.dataset.audio import GainType >>> >>> waveform = np.random.random([20, 30]) >>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"]) >>> transforms = [audio.Vol(gain=10, gain_type=GainType.DB)] >>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"]) """ @check_vol def __init__(self, gain, gain_type=GainType.AMPLITUDE): super().__init__() self.gain = gain self.gain_type = gain_type def parse(self): return cde.VolOperation(self.gain, DE_C_GAIN_TYPE.get(self.gain_type))