# Copyright 2021-2022 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
The module audio.transforms is inherited from _c_dataengine and is
implemented based on C++. It's a high performance module to process
audio. Users can apply suitable augmentations on audio data to improve
their training models.
"""
import numpy as np
import mindspore._c_dataengine as cde
from ..transforms.c_transforms import TensorOperation
from .utils import BorderType, DensityFunction, FadeShape, GainType, Interpolation, MelType, Modulation, NormType, \
ScaleType, WindowType
from .validators import check_allpass_biquad, check_amplitude_to_db, check_band_biquad, check_bandpass_biquad, \
check_bandreject_biquad, check_bass_biquad, check_biquad, check_complex_norm, check_compute_deltas, \
check_contrast, check_db_to_amplitude, check_dc_shift, check_deemph_biquad, check_detect_pitch_frequency, \
check_dither, check_equalizer_biquad, check_fade, check_flanger, check_gain, check_highpass_biquad, \
check_lfilter, check_lowpass_biquad, check_magphase, check_mask_along_axis, check_mask_along_axis_iid, \
check_masking, check_mel_scale, check_mu_law_coding, check_overdrive, check_phase_vocoder, check_phaser, \
check_riaa_biquad, check_sliding_window_cmn, check_spectral_centroid, check_spectrogram, check_time_stretch, \
check_treble_biquad, check_vol
class AudioTensorOperation(TensorOperation):
"""
Base class of Audio Tensor Ops.
"""
def __call__(self, *input_tensor_list):
for tensor in input_tensor_list:
if not isinstance(tensor, (np.ndarray,)):
raise TypeError("Input should be NumPy audio, got {}.".format(type(tensor)))
return super().__call__(*input_tensor_list)
def parse(self):
raise NotImplementedError("AudioTensorOperation has to implement parse() method.")
[docs]class AllpassBiquad(AudioTensorOperation):
r"""
Design two-pole all-pass filter with central frequency and bandwidth for audio waveform.
An all-pass filter changes the audio's frequency to phase relationship without changing
its frequency to amplitude relationship. The system function is:
.. math::
H(s) = \frac{s^2 - \frac{s}{Q} + 1}{s^2 + \frac{s}{Q} + 1}
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
sample_rate (int): Sampling rate (in Hz), which can't be zero.
central_freq (float): Central frequency (in Hz).
Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ ,
in range of (0, 1]. Default: 0.707.
Raises:
TypeError: If `sample_rate` is not of type integer.
ValueError: If `sample_rate` is 0.
TypeError: If `central_freq` is not of type float.
TypeError: If `Q` is not of type float.
ValueError: If `Q` is not in range of (0, 1].
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.AllpassBiquad(44100, 200.0)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_allpass_biquad
def __init__(self, sample_rate, central_freq, Q=0.707):
self.sample_rate = sample_rate
self.central_freq = central_freq
self.Q = Q
def parse(self):
return cde.AllpassBiquadOperation(self.sample_rate, self.central_freq, self.Q)
DE_C_SCALE_TYPE = {ScaleType.POWER: cde.ScaleType.DE_SCALE_TYPE_POWER,
ScaleType.MAGNITUDE: cde.ScaleType.DE_SCALE_TYPE_MAGNITUDE}
[docs]class AmplitudeToDB(AudioTensorOperation):
r"""
Turn the input audio waveform from the amplitude/power scale to decibel scale.
Note:
The dimension of the audio waveform to be processed needs to be (..., freq, time).
Args:
stype (ScaleType, optional): Scale of the input waveform, which can be
ScaleType.POWER or ScaleType.MAGNITUDE. Default: ScaleType.POWER.
ref_value (float, optional): Multiplier reference value for generating
`db_multiplier`. Default: 1.0. The formula is
:math:`\text{db_multiplier} = Log10(max(\text{ref_value}, amin))`.
amin (float, optional): Lower bound to clamp the input waveform, which must
be greater than zero. Default: 1e-10.
top_db (float, optional): Minimum cut-off decibels, which must be non-negative. Default: 80.0.
Raises:
TypeError: If `stype` is not of type :class:`mindspore.dataset.audio.utils.ScaleType`.
TypeError: If `ref_value` is not of type float.
ValueError: If `ref_value` is not a positive number.
TypeError: If `amin` is not of type float.
ValueError: If `amin` is not a positive number.
TypeError: If `top_db` is not of type float.
ValueError: If `top_db` is not a positive number.
RuntimeError: If input tensor is not in shape of <..., freq, time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>> from mindspore.dataset.audio import ScaleType
>>>
>>> waveform = np.random.random([1, 400 // 2 + 1, 30])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.AmplitudeToDB(stype=ScaleType.POWER)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_amplitude_to_db
def __init__(self, stype=ScaleType.POWER, ref_value=1.0, amin=1e-10, top_db=80.0):
self.stype = stype
self.ref_value = ref_value
self.amin = amin
self.top_db = top_db
def parse(self):
return cde.AmplitudeToDBOperation(DE_C_SCALE_TYPE[self.stype], self.ref_value, self.amin, self.top_db)
[docs]class Angle(AudioTensorOperation):
"""
Calculate the angle of complex number sequence.
Note:
The dimension of the audio waveform to be processed needs to be (..., complex=2).
The first dimension represents the real part while the second represents the imaginary.
Raises:
RuntimeError: If input tensor is not in shape of <..., complex=2>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[1.43, 5.434], [23.54, 89.38]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Angle()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
def parse(self):
return cde.AngleOperation()
[docs]class BandBiquad(AudioTensorOperation):
"""
Design two-pole band-pass filter for audio waveform.
The frequency response drops logarithmically around the center frequency. The
bandwidth gives the slope of the drop. The frequencies at band edge will be
half of their original amplitudes.
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
sample_rate (int): Sampling rate (in Hz), which can't be zero.
central_freq (float): Central frequency (in Hz).
Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ ,
in range of (0, 1]. Default: 0.707.
noise (bool, optional) : If True, uses the alternate mode for un-pitched audio (e.g. percussion).
If False, uses mode oriented to pitched audio, i.e. voice, singing, or instrumental music. Default: False.
Raises:
TypeError: If `sample_rate` is not of type integer.
ValueError: If `sample_rate` is 0.
TypeError: If `central_freq` is not of type float.
TypeError: If `Q` is not of type float.
ValueError: If `Q` is not in range of (0, 1].
TypeError: If `noise` is not of type bool.
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.BandBiquad(44100, 200.0)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_band_biquad
def __init__(self, sample_rate, central_freq, Q=0.707, noise=False):
self.sample_rate = sample_rate
self.central_freq = central_freq
self.Q = Q
self.noise = noise
def parse(self):
return cde.BandBiquadOperation(self.sample_rate, self.central_freq, self.Q, self.noise)
[docs]class BandpassBiquad(AudioTensorOperation):
r"""
Design two-pole Butterworth band-pass filter for audio waveform.
The frequency response of the Butterworth filter is maximally flat (i.e. has no ripples)
in the passband and rolls off towards zero in the stopband.
The system function of Butterworth band-pass filter is:
.. math::
H(s) = \begin{cases}
\frac{s}{s^2 + \frac{s}{Q} + 1}, &\text{if const_skirt_gain=True}; \cr
\frac{\frac{s}{Q}}{s^2 + \frac{s}{Q} + 1}, &\text{if const_skirt_gain=False}.
\end{cases}
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
sample_rate (int): Sampling rate (in Hz), which can't be zero.
central_freq (float): Central frequency (in Hz).
Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ ,
in range of (0, 1]. Default: 0.707.
const_skirt_gain (bool, optional) : If True, uses a constant skirt gain (peak gain = Q);
If False, uses a constant 0dB peak gain. Default: False.
Raises:
TypeError: If `sample_rate` is not of type integer.
ValueError: If `sample_rate` is 0.
TypeError: If `central_freq` is not of type float.
TypeError: If `Q` is not of type float.
ValueError: If `Q` is not in range of (0, 1].
TypeError: If `const_skirt_gain` is not of type bool.
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.BandpassBiquad(44100, 200.0)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_bandpass_biquad
def __init__(self, sample_rate, central_freq, Q=0.707, const_skirt_gain=False):
self.sample_rate = sample_rate
self.central_freq = central_freq
self.Q = Q
self.const_skirt_gain = const_skirt_gain
def parse(self):
return cde.BandpassBiquadOperation(self.sample_rate, self.central_freq, self.Q, self.const_skirt_gain)
[docs]class BandrejectBiquad(AudioTensorOperation):
r"""
Design two-pole Butterworth band-reject filter for audio waveform.
The frequency response of the Butterworth filter is maximally flat (i.e. has no ripples)
in the passband and rolls off towards zero in the stopband.
The system function of Butterworth band-reject filter is:
.. math::
H(s) = \frac{s^2 + 1}{s^2 + \frac{s}{Q} + 1}
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
sample_rate (int): Sampling rate (in Hz), which can't be zero.
central_freq (float): Central frequency (in Hz).
Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ ,
in range of (0, 1]. Default: 0.707.
Raises:
TypeError: If `sample_rate` is not of type integer.
ValueError: If `sample_rate` is 0.
TypeError: If `central_freq` is not of type float.
TypeError: If `Q` is not of type float.
ValueError: If `Q` is not in range of (0, 1].
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03],[9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.BandrejectBiquad(44100, 200.0)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_bandreject_biquad
def __init__(self, sample_rate, central_freq, Q=0.707):
self.sample_rate = sample_rate
self.central_freq = central_freq
self.Q = Q
def parse(self):
return cde.BandrejectBiquadOperation(self.sample_rate, self.central_freq, self.Q)
[docs]class BassBiquad(AudioTensorOperation):
r"""
Design a bass tone-control effect, also known as two-pole low-shelf filter for audio waveform.
A low-shelf filter passes all frequencies, but increase or reduces frequencies below the shelf
frequency by specified amount. The system function is:
.. math::
H(s) = A\frac{s^2 + \frac{\sqrt{A}}{Q}s + A}{As^2 + \frac{\sqrt{A}}{Q}s + 1}
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
sample_rate (int): Sampling rate (in Hz), which can't be zero.
gain (float): Desired gain at the boost (or attenuation) in dB.
central_freq (float, optional): Central frequency (in Hz). Default: 100.0.
Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ ,
in range of (0, 1]. Default: 0.707.
Raises:
TypeError: If `sample_rate` is not of type integer.
ValueError: If `sample_rate` is 0.
TypeError: If `gain` is not of type float.
TypeError: If `central_freq` is not of type float.
TypeError: If `Q` is not of type float.
ValueError: If `Q` is not in range of (0, 1].
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.BassBiquad(44100, 100.0)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_bass_biquad
def __init__(self, sample_rate, gain, central_freq=100.0, Q=0.707):
self.sample_rate = sample_rate
self.gain = gain
self.central_freq = central_freq
self.Q = Q
def parse(self):
return cde.BassBiquadOperation(self.sample_rate, self.gain, self.central_freq, self.Q)
class Biquad(TensorOperation):
"""
Perform a biquad filter of input audio.
Args:
b0 (float): Numerator coefficient of current input, x[n].
b1 (float): Numerator coefficient of input one time step ago x[n-1].
b2 (float): Numerator coefficient of input two time steps ago x[n-2].
a0 (float): Denominator coefficient of current output y[n], the value can't be zero, typically 1.
a1 (float): Denominator coefficient of current output y[n-1].
a2 (float): Denominator coefficient of current output y[n-2].
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> biquad_op = audio.Biquad(0.01, 0.02, 0.13, 1, 0.12, 0.3)
>>> waveform_filtered = biquad_op(waveform)
"""
@check_biquad
def __init__(self, b0, b1, b2, a0, a1, a2):
self.b0 = b0
self.b1 = b1
self.b2 = b2
self.a0 = a0
self.a1 = a1
self.a2 = a2
def parse(self):
return cde.BiquadOperation(self.b0, self.b1, self.b2, self.a0, self.a1, self.a2)
[docs]class ComplexNorm(AudioTensorOperation):
"""
Compute the norm of complex number sequence.
Note:
The dimension of the audio waveform to be processed needs to be (..., complex=2).
The first dimension represents the real part while the second represents the imaginary.
Args:
power (float, optional): Power of the norm, which must be non-negative. Default: 1.0.
Raises:
TypeError: If `power` is not of type float.
ValueError: If `power` is a negative number.
RuntimeError: If input tensor is not in shape of <..., complex=2>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([2, 4, 2])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.ComplexNorm()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_complex_norm
def __init__(self, power=1.0):
self.power = power
def parse(self):
return cde.ComplexNormOperation(self.power)
DE_C_BORDER_TYPE = {
BorderType.CONSTANT: cde.BorderType.DE_BORDER_CONSTANT,
BorderType.EDGE: cde.BorderType.DE_BORDER_EDGE,
BorderType.REFLECT: cde.BorderType.DE_BORDER_REFLECT,
BorderType.SYMMETRIC: cde.BorderType.DE_BORDER_SYMMETRIC,
}
class ComputeDeltas(AudioTensorOperation):
"""
Compute delta coefficients of a spectrogram.
Args:
win_length (int): The window length used for computing delta, must be no less than 3 (default=5).
pad_mode (BorderType): Mode parameter passed to padding (default=BorderType.EDGE).It can be any of
[BorderType.CONSTANT, BorderType.EDGE, BorderType.REFLECT, BordBorderTypeer.SYMMETRIC].
- BorderType.CONSTANT, means it fills the border with constant values.
- BorderType.EDGE, means it pads with the last value on the edge.
- BorderType.REFLECT, means it reflects the values on the edge omitting the last
value of edge.
- BorderType.SYMMETRIC, means it reflects the values on the edge repeating the last
value of edge.
Examples:
>>> import numpy as np
>>> from mindspore.dataset.audio import BorderType
>>>
>>> waveform = np.random.random([1, 400//2+1, 30])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.ComputeDeltas(win_length=7, pad_mode = BorderType.EDGE)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_compute_deltas
def __init__(self, win_length=5, pad_mode=BorderType.EDGE):
self.win_len = win_length
self.pad_mode = pad_mode
def parse(self):
return cde.ComputeDeltasOperation(self.win_len, DE_C_BORDER_TYPE[self.pad_mode])
[docs]class Contrast(AudioTensorOperation):
"""
Apply contrast effect for audio waveform.
Comparable with compression, this effect modifies an audio signal to make it sound louder.
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
enhancement_amount (float, optional): Controls the amount of the enhancement,
in range of [0, 100]. Default: 75.0. Note that `enhancement_amount` equal
to 0 still gives a significant contrast enhancement.
Raises:
TypeError: If `enhancement_amount` is not of type float.
ValueError: If `enhancement_amount` is not in range [0, 100].
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Contrast()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_contrast
def __init__(self, enhancement_amount=75.0):
self.enhancement_amount = enhancement_amount
def parse(self):
return cde.ContrastOperation(self.enhancement_amount)
class DBToAmplitude(AudioTensorOperation):
"""
Turn a waveform from the decibel scale to the power/amplitude scale.
Args:
ref (float): Reference which the output will be scaled by.
power (float): If power equals 1, will compute DB to power. If 0.5, will compute DB to amplitude.
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.DBToAmplitude(0.5, 0.5)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_db_to_amplitude
def __init__(self, ref, power):
self.ref = ref
self.power = power
def parse(self):
return cde.DBToAmplitudeOperation(self.ref, self.power)
class DCShift(AudioTensorOperation):
"""
Apply a DC shift to the audio.
Args:
shift (float): The amount to shift the audio, the value must be in the range [-2.0, 2.0].
limiter_gain (float, optional): Used only on peaks to prevent clipping,
the value should be much less than 1, such as 0.05 or 0.02.
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([0.60, 0.97, -1.04, -1.26, 0.97, 0.91, 0.48, 0.93])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.DCShift(0.5, 0.02)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_dc_shift
def __init__(self, shift, limiter_gain=None):
self.shift = shift
self.limiter_gain = limiter_gain if limiter_gain else shift
def parse(self):
return cde.DCShiftOperation(self.shift, self.limiter_gain)
class DeemphBiquad(AudioTensorOperation):
"""
Design two-pole deemph filter for audio waveform of dimension of (..., time).
Args:
sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz),
the value must be 44100 or 48000.
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.DeemphBiquad(44100)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_deemph_biquad
def __init__(self, sample_rate):
self.sample_rate = sample_rate
def parse(self):
return cde.DeemphBiquadOperation(self.sample_rate)
class DetectPitchFrequency(AudioTensorOperation):
"""
Detect pitch frequency.
It is implemented using normalized cross-correlation function and median smoothing.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero.
frame_time (float, optional): Duration of a frame, the value must be greater than zero (default=0.01).
win_length (int, optional): The window length for median smoothing (in number of frames), the value must be
greater than zero (default=30).
freq_low (int, optional): Lowest frequency that can be detected (Hz), the value must be greater than zero
(default=85).
freq_high (int, optional): Highest frequency that can be detected (Hz), the value must be greater than zero
(default=3400).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[0.716064e-03, 5.347656e-03, 6.246826e-03, 2.089477e-02, 7.138305e-02],
... [4.156616e-02, 1.394653e-02, 3.550292e-02, 0.614379e-02, 3.840209e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.DetectPitchFrequency(30, 0.1, 3, 5, 25)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_detect_pitch_frequency
def __init__(self, sample_rate, frame_time=0.01, win_length=30, freq_low=85, freq_high=3400):
self.sample_rate = sample_rate
self.frame_time = frame_time
self.win_length = win_length
self.freq_low = freq_low
self.freq_high = freq_high
def parse(self):
return cde.DetectPitchFrequencyOperation(self.sample_rate, self.frame_time,
self.win_length, self.freq_low, self.freq_high)
DE_C_DENSITY_FUNCTION = {DensityFunction.TPDF: cde.DensityFunction.DE_DENSITY_FUNCTION_TPDF,
DensityFunction.RPDF: cde.DensityFunction.DE_DENSITY_FUNCTION_RPDF,
DensityFunction.GPDF: cde.DensityFunction.DE_DENSITY_FUNCTION_GPDF}
class Dither(AudioTensorOperation):
"""
Dither increases the perceived dynamic range of audio stored at a
particular bit-depth by eliminating nonlinear truncation distortion.
Args:
density_function (DensityFunction, optional): The density function of a continuous
random variable. Can be one of DensityFunction.TPDF (Triangular Probability Density Function),
DensityFunction.RPDF (Rectangular Probability Density Function) or
DensityFunction.GPDF (Gaussian Probability Density Function)
(default=DensityFunction.TPDF).
noise_shaping (bool, optional): A filtering process that shapes the spectral
energy of quantisation error (default=False).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[1, 2, 3], [4, 5, 6]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Dither()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_dither
def __init__(self, density_function=DensityFunction.TPDF, noise_shaping=False):
self.density_function = density_function
self.noise_shaping = noise_shaping
def parse(self):
return cde.DitherOperation(DE_C_DENSITY_FUNCTION[self.density_function], self.noise_shaping)
class EqualizerBiquad(AudioTensorOperation):
"""
Design biquad equalizer filter and perform filtering. Similar to SoX implementation.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero.
center_freq (float): Central frequency (in Hz).
gain (float): Desired gain at the boost (or attenuation) in dB.
Q (float, optional): https://en.wikipedia.org/wiki/Q_factor, range: (0, 1] (default=0.707).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.EqualizerBiquad(44100, 1500, 5.5, 0.7)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_equalizer_biquad
def __init__(self, sample_rate, center_freq, gain, Q=0.707):
self.sample_rate = sample_rate
self.center_freq = center_freq
self.gain = gain
self.Q = Q
def parse(self):
return cde.EqualizerBiquadOperation(self.sample_rate, self.center_freq, self.gain, self.Q)
DE_C_FADE_SHAPE = {FadeShape.QUARTER_SINE: cde.FadeShape.DE_FADE_SHAPE_QUARTER_SINE,
FadeShape.HALF_SINE: cde.FadeShape.DE_FADE_SHAPE_HALF_SINE,
FadeShape.LINEAR: cde.FadeShape.DE_FADE_SHAPE_LINEAR,
FadeShape.LOGARITHMIC: cde.FadeShape.DE_FADE_SHAPE_LOGARITHMIC,
FadeShape.EXPONENTIAL: cde.FadeShape.DE_FADE_SHAPE_EXPONENTIAL}
class Fade(AudioTensorOperation):
"""
Add a fade in and/or fade out to an waveform.
Args:
fade_in_len (int, optional): Length of fade-in (time frames), which must be non-negative (default=0).
fade_out_len (int, optional): Length of fade-out (time frames), which must be non-negative (default=0).
fade_shape (FadeShape, optional): Shape of fade (default=FadeShape.LINEAR). Can be one of
FadeShape.QUARTER_SINE, FadeShape.HALF_SINE, FadeShape.LINEAR, FadeShape.LOGARITHMIC or
FadeShape.EXPONENTIAL.
-FadeShape.QUARTER_SINE, means it tend to 0 in an quarter sin function.
-FadeShape.HALF_SINE, means it tend to 0 in an half sin function.
-FadeShape.LINEAR, means it linear to 0.
-FadeShape.LOGARITHMIC, means it tend to 0 in an logrithmic function.
-FadeShape.EXPONENTIAL, means it tend to 0 in an exponential function.
Raises:
RuntimeError: If fade_in_len exceeds waveform length.
RuntimeError: If fade_out_len exceeds waveform length.
Examples:
>>> import numpy as np
>>> from mindspore.dataset.audio import FadeShape
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03, 9.246826171875e-03, 1.0894775390625e-02]])
>>> dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Fade(fade_in_len=3, fade_out_len=2, fade_shape=FadeShape.LINEAR)]
>>> dataset = dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_fade
def __init__(self, fade_in_len=0, fade_out_len=0, fade_shape=FadeShape.LINEAR):
self.fade_in_len = fade_in_len
self.fade_out_len = fade_out_len
self.fade_shape = fade_shape
def parse(self):
return cde.FadeOperation(self.fade_in_len, self.fade_out_len, DE_C_FADE_SHAPE[self.fade_shape])
DE_C_MODULATION = {Modulation.SINUSOIDAL: cde.Modulation.DE_MODULATION_SINUSOIDAL,
Modulation.TRIANGULAR: cde.Modulation.DE_MODULATION_TRIANGULAR}
DE_C_INTERPOLATION = {Interpolation.LINEAR: cde.Interpolation.DE_INTERPOLATION_LINEAR,
Interpolation.QUADRATIC: cde.Interpolation.DE_INTERPOLATION_QUADRATIC}
class Flanger(AudioTensorOperation):
"""
Apply a flanger effect to the audio.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz).
delay (float, optional): Desired delay in milliseconds (ms), range: [0, 30] (default=0.0).
depth (float, optional): Desired delay depth in milliseconds (ms), range: [0, 10] (default=2.0).
regen (float, optional): Desired regen (feedback gain) in dB, range: [-95, 95] (default=0.0).
width (float, optional): Desired width (delay gain) in dB, range: [0, 100] (default=71.0).
speed (float, optional): Modulation speed in Hz, range: [0.1, 10] (default=0.5).
phase (float, optional): Percentage phase-shift for multi-channel, range: [0, 100] (default=25.0).
modulation (Modulation, optional): Modulation of the input tensor (default=Modulation.SINUSOIDAL).
It can be one of Modulation.SINUSOIDAL or Modulation.TRIANGULAR.
interpolation (Interpolation, optional): Interpolation of the input tensor (default=Interpolation.LINEAR).
It can be one of Interpolation.LINEAR or Interpolation.QUADRATIC.
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Flanger(44100)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_flanger
def __init__(self, sample_rate, delay=0.0, depth=2.0, regen=0.0, width=71.0, speed=0.5,
phase=25.0, modulation=Modulation.SINUSOIDAL, interpolation=Interpolation.LINEAR):
self.sample_rate = sample_rate
self.delay = delay
self.depth = depth
self.regen = regen
self.width = width
self.speed = speed
self.phase = phase
self.modulation = modulation
self.interpolation = interpolation
def parse(self):
return cde.FlangerOperation(self.sample_rate, self.delay, self.depth, self.regen, self.width, self.speed,
self.phase, DE_C_MODULATION[self.modulation],
DE_C_INTERPOLATION[self.interpolation])
[docs]class FrequencyMasking(AudioTensorOperation):
"""
Apply masking to a spectrogram in the frequency domain.
Note:
The dimension of the audio waveform to be processed needs to be (..., freq, time).
Args:
iid_masks (bool, optional): Whether to apply different masks to each example/channel. Default: False.
freq_mask_param (int, optional): When `iid_masks` is True, length of the mask will be uniformly sampled
from [0, freq_mask_param]; When `iid_masks` is False, directly use it as length of the mask.
The value should be in range of [0, freq_length], where `freq_length` is the length of audio waveform
in frequency domain. Default: 0.
mask_start (int, optional): Starting point to apply mask, only works when `iid_masks` is True. The value should
be in range of [0, freq_length - freq_mask_param], where `freq_length` is the length of audio waveform
in frequency domain. Default: 0.
mask_value (float, optional): Value to assign to the masked columns. Default: 0.0.
Raises:
TypeError: If `iid_masks` is not of type bool.
TypeError: If `freq_mask_param` is not of type integer.
ValueError: If `freq_mask_param` is greater than the length of audio waveform in frequency domain.
TypeError: If `mask_start` is not of type integer.
ValueError: If `mask_start` is a negative number.
TypeError: If `mask_value` is not of type float.
ValueError: If `mask_value` is a negative number.
RuntimeError: If input tensor is not in shape of <..., freq, time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([1, 3, 2])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.FrequencyMasking(freq_mask_param=1)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
.. image:: frequency_masking_original.png
.. image:: frequency_masking.png
"""
@check_masking
def __init__(self, iid_masks=False, freq_mask_param=0, mask_start=0, mask_value=0.0):
self.iid_masks = iid_masks
self.frequency_mask_param = freq_mask_param
self.mask_start = mask_start
self.mask_value = mask_value
def parse(self):
return cde.FrequencyMaskingOperation(self.iid_masks, self.frequency_mask_param, self.mask_start,
self.mask_value)
class Gain(AudioTensorOperation):
"""
Apply amplification or attenuation to the whole waveform.
Args:
gain_db (float): Gain adjustment in decibels (dB) (default=1.0).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Gain(1.2)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_gain
def __init__(self, gain_db=1.0):
self.gain_db = gain_db
def parse(self):
return cde.GainOperation(self.gain_db)
class HighpassBiquad(AudioTensorOperation):
"""
Design biquad highpass filter and perform filtering. Similar to SoX implementation.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero.
cutoff_freq (float): Filter cutoff frequency (in Hz).
Q (float, optional): Quality factor, https://en.wikipedia.org/wiki/Q_factor, range: (0, 1] (default=0.707).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.HighpassBiquad(44100, 1500, 0.7)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_highpass_biquad
def __init__(self, sample_rate, cutoff_freq, Q=0.707):
self.sample_rate = sample_rate
self.cutoff_freq = cutoff_freq
self.Q = Q
def parse(self):
return cde.HighpassBiquadOperation(self.sample_rate, self.cutoff_freq, self.Q)
class LFilter(AudioTensorOperation):
"""
Design two-pole filter for audio waveform of dimension of (..., time).
Args:
a_coeffs (sequence): denominator coefficients of difference equation of dimension of (n_order + 1).
Lower delays coefficients are first, e.g. [a0, a1, a2, ...].
Must be same size as b_coeffs (pad with 0's as necessary).
b_coeffs (sequence): numerator coefficients of difference equation of dimension of (n_order + 1).
Lower delays coefficients are first, e.g. [b0, b1, b2, ...].
Must be same size as a_coeffs (pad with 0's as necessary).
clamp (bool, optional): If True, clamp the output signal to be in the range [-1, 1] (default=True).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[2.716064453125e-03, 6.34765625e-03], [9.246826171875e-03, 1.0894775390625e-02]])
>>> a_coeffs = [0.1, 0.2, 0.3]
>>> b_coeffs = [0.1, 0.2, 0.3]
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.LFilter(a_coeffs, b_coeffs)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_lfilter
def __init__(self, a_coeffs, b_coeffs, clamp=True):
self.a_coeffs = a_coeffs
self.b_coeffs = b_coeffs
self.clamp = clamp
def parse(self):
return cde.LFilterOperation(self.a_coeffs, self.b_coeffs, self.clamp)
[docs]class LowpassBiquad(AudioTensorOperation):
r"""
Design two-pole low-pass filter for audio waveform.
A low-pass filter passes frequencies lower than a selected cutoff frequency
but attenuates frequencies higher than it. The system function is:
.. math::
H(s) = \frac{1}{s^2 + \frac{s}{Q} + 1}
Similar to `SoX <http://sox.sourceforge.net/sox.html>`_ implementation.
Note:
The dimension of the audio waveform to be processed needs to be (..., time).
Args:
sample_rate (int): Sampling rate (in Hz), which can't be zero.
cutoff_freq (float): Filter cutoff frequency (in Hz).
Q (float, optional): `Quality factor <https://en.wikipedia.org/wiki/Q_factor>`_ ,
in range of (0, 1]. Default: 0.707.
Raises:
TypeError: If `sample_rate` is not of type integer.
ValueError: If `sample_rate` is 0.
TypeError: If `cutoff_freq` is not of type float.
TypeError: If `Q` is not of type float.
ValueError: If `Q` is not in range of (0, 1].
RuntimeError: If input tensor is not in shape of <..., time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[0.8236, 0.2049, 0.3335], [0.5933, 0.9911, 0.2482],
... [0.3007, 0.9054, 0.7598], [0.5394, 0.2842, 0.5634], [0.6363, 0.2226, 0.2288]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.LowpassBiquad(4000, 1500, 0.7)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_lowpass_biquad
def __init__(self, sample_rate, cutoff_freq, Q=0.707):
self.sample_rate = sample_rate
self.cutoff_freq = cutoff_freq
self.Q = Q
def parse(self):
return cde.LowpassBiquadOperation(self.sample_rate, self.cutoff_freq, self.Q)
class Magphase(AudioTensorOperation):
"""
Separate a complex-valued spectrogram with shape (..., 2) into its magnitude and phase.
Args:
power (float): Power of the norm, which must be non-negative (default=1.0).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([2, 4, 2])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Magphase()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_magphase
def __init__(self, power=1.0):
self.power = power
def parse(self):
return cde.MagphaseOperation(self.power)
class MaskAlongAxis(AudioTensorOperation):
"""
Apply a mask along `axis`. Mask will be applied from indices `[mask_start, mask_start + mask_width)`.
Args:
mask_start (int): Starting position of the mask, which must be non negative.
mask_width (int): The width of the mask, which must be non negative.
mask_value (float): Value to assign to the masked columns.
axis (int): Axis to apply masking on (1 for frequency and 2 for time).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([1, 20, 20])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.MaskAlongAxis(0, 10, 0.5, 1)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_mask_along_axis
def __init__(self, mask_start, mask_width, mask_value, axis):
self.mask_start = mask_start
self.mask_width = mask_width
self.mask_value = mask_value
self.axis = axis
def parse(self):
return cde.MaskAlongAxisOperation(self.mask_start, self.mask_width, self.mask_value, self.axis)
class MaskAlongAxisIID(AudioTensorOperation):
"""
Apply a mask along `axis`. Mask will be applied from indices `[mask_start, mask_start + mask_width)`, where
`mask_width` is sampled from `uniform[0, mask_param]`, and `mask_start` from `uniform[0, max_length - mask_width]`,
`max_length` is the number of columns of the specified axis of the spectrogram.
Args:
mask_param (int): Number of columns to be masked, will be uniformly sampled from
[0, mask_param], must be non negative.
mask_value (float): Value to assign to the masked columns.
axis (int): Axis to apply masking on (1 for frequency and 2 for time).
Examples:
>>> import numpy as np
>>>
>>> waveform= np.random.random([1, 20, 20])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.MaskAlongAxisIID(5, 0.5, 2)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_mask_along_axis_iid
def __init__(self, mask_param, mask_value, axis):
self.mask_param = mask_param
self.mask_value = mask_value
self.axis = axis
def parse(self):
return cde.MaskAlongAxisIIDOperation(self.mask_param, self.mask_value, self.axis)
DE_C_MEL_TYPE = {MelType.SLANEY: cde.MelType.DE_MEL_TYPE_SLANEY,
MelType.HTK: cde.MelType.DE_MEL_TYPE_HTK}
DE_C_NORM_TYPE = {NormType.NONE: cde.NormType.DE_NORM_TYPE_NONE,
NormType.SLANEY: cde.NormType.DE_NORM_TYPE_SLANEY}
class MelScale(AudioTensorOperation):
"""
Convert normal STFT to STFT at the Mel scale.
Args:
n_mels (int, optional): Number of mel filterbanks (default=128).
sample_rate (int, optional): Sample rate of audio signal (default=16000).
f_min (float, optional): Minimum frequency (default=0).
f_max (float, optional): Maximum frequency (default=None, will be set to sample_rate // 2).
n_stft (int, optional): Number of bins in STFT (default=201).
norm (NormType, optional): Type of norm, value should be NormType.SLANEY or NormType::NONE.
If norm is NormType.SLANEY, divide the triangular mel weight by the width of the mel band.
(default=NormType.NONE).
mel_type (MelType, optional): Type to use, value should be MelType.SLANEY or MelType.HTK (default=MelType.HTK).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[0.8236, 0.2049, 0.3335], [0.5933, 0.9911, 0.2482],
... [0.3007, 0.9054, 0.7598], [0.5394, 0.2842, 0.5634], [0.6363, 0.2226, 0.2288]])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.MelScale(4000, 1500, 0.7)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_mel_scale
def __init__(self, n_mels=128, sample_rate=16000, f_min=0, f_max=None, n_stft=201, norm=NormType.NONE,
mel_type=MelType.HTK):
self.n_mels = n_mels
self.sample_rate = sample_rate
self.f_min = f_min
self.f_max = f_max if f_max is not None else sample_rate // 2
self.n_stft = n_stft
self.norm = norm
self.mel_type = mel_type
def parse(self):
return cde.MelScaleOperation(self.n_mels, self.sample_rate, self.f_min, self.f_max, self.n_stft,
DE_C_NORM_TYPE[self.norm], DE_C_MEL_TYPE[self.mel_type])
class MuLawDecoding(AudioTensorOperation):
"""
Decode mu-law encoded signal.
Args:
quantization_channels (int): Number of channels, which must be positive (Default: 256).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([1, 3, 4])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.MuLawDecoding()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_mu_law_coding
def __init__(self, quantization_channels=256):
self.quantization_channels = quantization_channels
def parse(self):
return cde.MuLawDecodingOperation(self.quantization_channels)
class MuLawEncoding(AudioTensorOperation):
"""
Encode signal based on mu-law companding.
Args:
quantization_channels (int): Number of channels, which must be positive (Default: 256).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([1, 3, 4])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.MuLawEncoding()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_mu_law_coding
def __init__(self, quantization_channels=256):
self.quantization_channels = quantization_channels
def parse(self):
return cde.MuLawEncodingOperation(self.quantization_channels)
class Overdrive(AudioTensorOperation):
"""
Apply overdrive on input audio.
Args:
gain (float): Desired gain at the boost (or attenuation) in dB, in range of [0, 100] (default=20.0).
color (float): Controls the amount of even harmonic content in the over-driven output,
in range of [0, 100] (default=20.0).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32)
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Overdrive()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_overdrive
def __init__(self, gain=20.0, color=20.0):
self.gain = gain
self.color = color
def parse(self):
return cde.OverdriveOperation(self.gain, self.color)
class Phaser(AudioTensorOperation):
"""
Apply a phasing effect to the audio.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz).
gain_in (float): Desired input gain at the boost (or attenuation) in dB.
Allowed range of values is [0, 1] (default=0.4).
gain_out (float): Desired output gain at the boost (or attenuation) in dB.
Allowed range of values is [0, 1e9] (default=0.74).
delay_ms (float): Desired delay in milli seconds. Allowed range of values is [0, 5] (default=3.0).
decay (float): Desired decay relative to gain-in. Allowed range of values is [0, 0.99] (default=0.4).
mod_speed (float): Modulation speed in Hz. Allowed range of values is [0.1, 2] (default=0.5).
sinusoidal (bool): If True, use sinusoidal modulation (preferable for multiple instruments).
If False, use triangular modulation (gives single instruments a sharper
phasing effect) (default=True).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float32)
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Phaser(44100)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_phaser
def __init__(self, sample_rate, gain_in=0.4, gain_out=0.74,
delay_ms=3.0, decay=0.4, mod_speed=0.5, sinusoidal=True):
self.decay = decay
self.delay_ms = delay_ms
self.gain_in = gain_in
self.gain_out = gain_out
self.mod_speed = mod_speed
self.sample_rate = sample_rate
self.sinusoidal = sinusoidal
def parse(self):
return cde.PhaserOperation(self.sample_rate, self.gain_in, self.gain_out,
self.delay_ms, self.decay, self.mod_speed, self.sinusoidal)
class PhaseVocoder(AudioTensorOperation):
"""
Given a STFT tensor, speed up in time without modifying pitch by a factor of rate.
Args:
rate (float): Speed-up factor.
phase_advance (numpy.ndarray): Expected phase advance in each bin in shape of (freq, 1).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.randn(2, 44, 10, 2)
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> phase_advance = np.random.randn(44, 1)
>>> transforms = [audio.PhaseVocoder(rate=2, phase_advance=phase_advance)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_phase_vocoder
def __init__(self, rate, phase_advance):
self.rate = rate
self.phase_advance = cde.Tensor(phase_advance)
def parse(self):
return cde.PhaseVocoderOperation(self.rate, self.phase_advance)
class RiaaBiquad(AudioTensorOperation):
"""
Apply RIAA vinyl playback equalization. Similar to SoX implementation.
Args:
sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz),
can only be one of 44100, 48000, 88200, 96000.
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float64)
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.RiaaBiquad(44100)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_riaa_biquad
def __init__(self, sample_rate):
self.sample_rate = sample_rate
def parse(self):
return cde.RiaaBiquadOperation(self.sample_rate)
class SlidingWindowCmn(AudioTensorOperation):
"""
Apply sliding-window cepstral mean (and optionally variance) normalization per utterance.
Args:
cmn_window (int, optional): Window in frames for running average CMN computation (default=600).
min_cmn_window (int, optional): Minimum CMN window used at start of decoding (adds latency only at start).
Only applicable if center is False, ignored if center is True (default=100).
center (bool, optional): If True, use a window centered on the current frame. If False, window is
to the left. (default=False).
norm_vars (bool, optional): If True, normalize variance to one. (default=False).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[[1, 2, 3], [4, 5, 6]]], dtype=np.float64)
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.SlidingWindowCmn()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_sliding_window_cmn
def __init__(self, cmn_window=600, min_cmn_window=100, center=False, norm_vars=False):
self.cmn_window = cmn_window
self.min_cmn_window = min_cmn_window
self.center = center
self.norm_vars = norm_vars
def parse(self):
return cde.SlidingWindowCmnOperation(self.cmn_window, self.min_cmn_window, self.center, self.norm_vars)
DE_C_WINDOW_TYPE = {WindowType.BARTLETT: cde.WindowType.DE_WINDOW_TYPE_BARTLETT,
WindowType.BLACKMAN: cde.WindowType.DE_WINDOW_TYPE_BLACKMAN,
WindowType.HAMMING: cde.WindowType.DE_WINDOW_TYPE_HAMMING,
WindowType.HANN: cde.WindowType.DE_WINDOW_TYPE_HANN,
WindowType.KAISER: cde.WindowType.DE_WINDOW_TYPE_KAISER}
class SpectralCentroid(TensorOperation):
"""
Create a spectral centroid from an audio signal.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz).
n_fft (int, optional): Size of FFT, creates n_fft // 2 + 1 bins (default=400).
win_length (int, optional): Window size (default=None, will use n_fft).
hop_length (int, optional): Length of hop between STFT windows (default=None, will use win_length // 2).
pad (int, optional): Two sided padding of signal (default=0).
window (WindowType, optional): Window function that is applied/multiplied to each frame/window,
which can be WindowType.BARTLETT, WindowType.BLACKMAN, WindowType.HAMMING, WindowType.HANN
or WindowType.KAISER (default=WindowType.HANN).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([5, 10, 20])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.SpectralCentroid(44100)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_spectral_centroid
def __init__(self, sample_rate, n_fft=400, win_length=None, hop_length=None, pad=0, window=WindowType.HANN):
self.sample_rate = sample_rate
self.pad = pad
self.window = window
self.n_fft = n_fft
self.win_length = win_length if win_length else n_fft
self.hop_length = hop_length if hop_length else self.win_length // 2
def parse(self):
return cde.SpectralCentroidOperation(self.sample_rate, self.n_fft, self.win_length, self.hop_length,
self.pad, DE_C_WINDOW_TYPE[self.window])
class Spectrogram(TensorOperation):
"""
Create a spectrogram from an audio signal.
Args:
n_fft (int, optional): Size of FFT, creates n_fft // 2 + 1 bins (default=400).
win_length (int, optional): Window size (default=None, will use n_fft).
hop_length (int, optional): Length of hop between STFT windows (default=None, will use win_length // 2).
pad (int): Two sided padding of signal (default=0).
window (WindowType, optional): Window function that is applied/multiplied to each frame/window,
which can be WindowType.BARTLETT, WindowType.BLACKMAN, WindowType.HAMMING, WindowType.HANN
or WindowType.KAISER (default=WindowType.HANN). Currently kaiser window is not supported on macOS.
power (float, optional): Exponent for the magnitude spectrogram, which must be greater
than or equal to 0, e.g., 1 for energy, 2 for power, etc. (default=2.0).
normalized (bool, optional): Whether to normalize by magnitude after stft (default=False).
center (bool, optional): Whether to pad waveform on both sides (default=True).
pad_mode (BorderType, optional): Controls the padding method used when center is True,
which can be BorderType.REFLECT, BorderType.CONSTANT, BorderType.EDGE, BorderType.SYMMETRIC
(default=BorderType.REFLECT).
onesided (bool, optional): Controls whether to return half of results to avoid redundancy (default=True).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([5, 10, 20])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Spectrogram()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_spectrogram
def __init__(self, n_fft=400, win_length=None, hop_length=None, pad=0, window=WindowType.HANN, power=2.0,
normalized=False, center=True, pad_mode=BorderType.REFLECT, onesided=True):
self.n_fft = n_fft
self.win_length = win_length if win_length else n_fft
self.hop_length = hop_length if hop_length else self.win_length // 2
self.pad = pad
self.window = window
self.power = power
self.normalized = normalized
self.center = center
self.pad_mode = pad_mode
self.onesided = onesided
def parse(self):
return cde.SpectrogramOperation(self.n_fft, self.win_length, self.hop_length, self.pad,
DE_C_WINDOW_TYPE[self.window], self.power, self.normalized,
self.center, DE_C_BORDER_TYPE[self.pad_mode], self.onesided)
[docs]class TimeMasking(AudioTensorOperation):
"""
Apply masking to a spectrogram in the time domain.
Note:
The dimension of the audio waveform to be processed needs to be (..., freq, time).
Args:
iid_masks (bool, optional): Whether to apply different masks to each example/channel. Default: False.
time_mask_param (int, optional): When `iid_masks` is True, length of the mask will be uniformly sampled
from [0, time_mask_param]; When `iid_masks` is False, directly use it as length of the mask.
The value should be in range of [0, time_length], where `time_length` is the length of audio waveform
in time domain. Default: 0.
mask_start (int, optional): Starting point to apply mask, only works when `iid_masks` is True. The value should
be in range of [0, time_length - time_mask_param], where `time_length` is the length of audio waveform
in time domain. Default: 0.
mask_value (float, optional): Value to assign to the masked columns. Default: 0.0.
Raises:
TypeError: If `iid_masks` is not of type bool.
TypeError: If `time_mask_param` is not of type integer.
ValueError: If `time_mask_param` is greater than the length of audio waveform in time domain.
TypeError: If `mask_start` is not of type integer.
ValueError: If `mask_start` a negative number.
TypeError: If `mask_value` is not of type float.
ValueError: If `mask_value` is a negative number.
RuntimeError: If input tensor is not in shape of <..., freq, time>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([1, 3, 2])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.TimeMasking(time_mask_param=1)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
.. image:: time_masking_original.png
.. image:: time_masking.png
"""
@check_masking
def __init__(self, iid_masks=False, time_mask_param=0, mask_start=0, mask_value=0.0):
self.iid_masks = iid_masks
self.time_mask_param = time_mask_param
self.mask_start = mask_start
self.mask_value = mask_value
def parse(self):
return cde.TimeMaskingOperation(self.iid_masks, self.time_mask_param, self.mask_start, self.mask_value)
[docs]class TimeStretch(AudioTensorOperation):
"""
Stretch Short Time Fourier Transform (STFT) in time without modifying pitch for a given rate.
Note:
The dimension of the audio waveform to be processed needs to be (..., freq, time, complex=2).
The first dimension represents the real part while the second represents the imaginary.
Args:
hop_length (int, optional): Length of hop between STFT windows, i.e. the number of samples
between consecutive frames. Default: None, will use `n_freq - 1`.
n_freq (int, optional): Number of filter banks from STFT. Default: 201.
fixed_rate (float, optional): Rate to speed up or slow down by. Default: None, will keep
the original rate.
Raises:
TypeError: If `hop_length` is not of type integer.
ValueError: If `hop_length` is not a positive number.
TypeError: If `n_freq` is not of type integer.
ValueError: If `n_freq` is not a positive number.
TypeError: If `fixed_rate` is not of type float.
ValueError: If `fixed_rate` is not a positive number.
RuntimeError: If input tensor is not in shape of <..., freq, num_frame, complex=2>.
Supported Platforms:
``CPU``
Examples:
>>> import numpy as np
>>>
>>> waveform = np.random.random([1, 30])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.TimeStretch()]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
.. image:: time_stretch_rate1.5.png
.. image:: time_stretch_original.png
.. image:: time_stretch_rate0.8.png
"""
@check_time_stretch
def __init__(self, hop_length=None, n_freq=201, fixed_rate=None):
self.n_freq = n_freq
self.fixed_rate = fixed_rate
n_fft = (n_freq - 1) * 2
self.hop_length = hop_length if hop_length is not None else n_fft // 2
self.fixed_rate = fixed_rate if fixed_rate is not None else 1
def parse(self):
return cde.TimeStretchOperation(self.hop_length, self.n_freq, self.fixed_rate)
class TrebleBiquad(AudioTensorOperation):
"""
Design a treble tone-control effect. Similar to SoX implementation.
Args:
sample_rate (int): Sampling rate of the waveform, e.g. 44100 (Hz), the value can't be zero.
gain (float): Desired gain at the boost (or attenuation) in dB.
central_freq (float, optional): Central frequency (in Hz) (default=3000).
Q(float, optional): Quality factor, https://en.wikipedia.org/wiki/Q_factor, range: (0, 1] (default=0.707).
Examples:
>>> import numpy as np
>>>
>>> waveform = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.float64)
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.TrebleBiquad(44100, 200.0)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_treble_biquad
def __init__(self, sample_rate, gain, central_freq=3000, Q=0.707):
self.sample_rate = sample_rate
self.gain = gain
self.central_freq = central_freq
self.Q = Q
def parse(self):
return cde.TrebleBiquadOperation(self.sample_rate, self.gain, self.central_freq, self.Q)
DE_C_GAIN_TYPE = {GainType.AMPLITUDE: cde.GainType.DE_GAIN_TYPE_AMPLITUDE,
GainType.POWER: cde.GainType.DE_GAIN_TYPE_POWER,
GainType.DB: cde.GainType.DE_GAIN_TYPE_DB}
class Vol(AudioTensorOperation):
"""
Apply amplification or attenuation to the whole waveform.
Args:
gain (float): Value of gain adjustment.
If gain_type = amplitude, gain stands for nonnegative amplitude ratio.
If gain_type = power, gain stands for power.
If gain_type = db, gain stands for decibels.
gain_type (GainType, optional): Type of gain, contains the following three enumeration values
GainType.AMPLITUDE, GainType.POWER and GainType.DB (default=GainType.AMPLITUDE).
Examples:
>>> import numpy as np
>>> from mindspore.dataset.audio import GainType
>>>
>>> waveform = np.random.random([20, 30])
>>> numpy_slices_dataset = ds.NumpySlicesDataset(data=waveform, column_names=["audio"])
>>> transforms = [audio.Vol(gain=10, gain_type=GainType.DB)]
>>> numpy_slices_dataset = numpy_slices_dataset.map(operations=transforms, input_columns=["audio"])
"""
@check_vol
def __init__(self, gain, gain_type=GainType.AMPLITUDE):
self.gain = gain
self.gain_type = gain_type
def parse(self):
return cde.VolOperation(self.gain, DE_C_GAIN_TYPE[self.gain_type])