SPPAS 4.20

Module sppas.src.annotations

Class sppasSearchIPUs

Description

SPPAS integration of the IPUs detection.

Constructor

Create a new sppasSearchIPUs instance.

Parameters
  • log: (sppasLog) Human-readable logs.
View Source
def __init__(self, log=None):
    """Instantiate the SPPAS integration of the IPUs detection.

    The parent constructor is called with the 'searchipus.json' file
    name and the given log.

    :param log: (sppasLog) Human-readable logs.

    """
    config_filename = 'searchipus.json'
    super(sppasSearchIPUs, self).__init__(config_filename, log)

Public functions

fix_options

Fix all options.

Available options are:

  • threshold: volume threshold used to decide whether a window is silence or not
  • win_length: length of the window used for the estimation of volume values
  • min_sil: minimum duration of a silence
  • min_ipu: minimum duration of an ipu
  • shift_start: start boundary shift value.
  • shift_end: end boundary shift value.
Parameters
  • options: (sppasOption)
View Source
def fix_options(self, options):
    """Apply each given option to this annotation.

    Options handled by a dedicated setter are:

        - threshold: volume threshold used to decide whether a window
          is silence or not
        - win_length: length of the window used for the estimation of
          volume values
        - min_sil: minimum duration of a silence
        - min_ipu: minimum duration of an ipu
        - shift_start: start boundary shift value.
        - shift_end: end boundary shift value.

    Any other option whose key contains 'pattern' is stored directly
    in the options of this annotation.

    :param options: (sppasOption)
    :raises: AnnotationOptionError if an option key is not recognized

    """
    # Keys for which a 'set_<key>' method of this instance is called.
    keys_with_setter = (
        'threshold',
        'win_length',
        'min_sil',
        'min_ipu',
        'shift_start',
        'shift_end',
    )
    for option in options:
        name = option.get_key()
        if name in keys_with_setter:
            getattr(self, 'set_' + name)(option.get_value())
            continue
        if 'pattern' not in name:
            raise AnnotationOptionError(name)
        self._options[name] = option.get_value()
get_threshold
View Source
def get_threshold(self):
    """Return the value stored for the 'threshold' option."""
    options = self._options
    return options['threshold']
get_win_length
View Source
def get_win_length(self):
    """Return the value stored for the 'win_length' option."""
    options = self._options
    return options['win_length']
get_min_sil
View Source
def get_min_sil(self):
    """Return the value stored for the 'min_sil' option."""
    options = self._options
    return options['min_sil']
get_min_ipu
View Source
def get_min_ipu(self):
    """Return the value stored for the 'min_ipu' option."""
    options = self._options
    return options['min_ipu']
get_shift_start
View Source
def get_shift_start(self):
    """Return the value stored for the 'shift_start' option."""
    options = self._options
    return options['shift_start']
get_shift_end
View Source
def get_shift_end(self):
    """Return the value stored for the 'shift_end' option."""
    options = self._options
    return options['shift_end']
set_threshold

Fix the threshold volume.

Parameters
  • value: (int) RMS value used as volume threshold
View Source
def set_threshold(self, value):
    """Store the volume threshold in the 'threshold' option.

    The value is stored as given, without any check or conversion.

    :param value: (int) RMS value used as volume threshold

    """
    options = self._options
    options['threshold'] = value
set_win_length

Set a new window length for the estimation of volume values.

TAKE CARE:

it cancels any previous estimation of volume and silence search.

Parameters
  • value: (float) generally between 0.01 and 0.04 seconds.
View Source
def set_win_length(self, value):
    """Store the window length in the 'win_length' option.

    This is the length of the window used for the estimation of volume
    values. The value is stored as given, without any check.

    TAKE CARE:
    it cancels any previous estimation of volume and silence search.

    :param value: (float) generally between 0.01 and 0.04 seconds.

    """
    options = self._options
    options['win_length'] = value
set_min_sil

Fix the default minimum duration of a silence.

Parameters
  • value: (float) Duration in seconds.
View Source
def set_min_sil(self, value):
    """Store the minimum duration of a silence in the 'min_sil' option.

    The value is stored as given, without any check or conversion.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['min_sil'] = value
set_min_ipu

Fix the default minimum duration of an IPU.

Parameters
  • value: (float) Duration in seconds.
View Source
def set_min_ipu(self, value):
    """Store the minimum duration of an IPU in the 'min_ipu' option.

    The value is stored as given, without any check or conversion.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['min_ipu'] = value
set_shift_start

Fix the start boundary shift value.

Parameters
  • value: (float) Duration in seconds.
View Source
def set_shift_start(self, value):
    """Store the start boundary shift in the 'shift_start' option.

    The value is stored as given, without any check or conversion.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['shift_start'] = value
set_shift_end

Fix the end boundary shift value.

Parameters
  • value: (float) Duration in seconds.
View Source
def set_shift_end(self, value):
    """Store the end boundary shift in the 'shift_end' option.

    The value is stored as given, without any check or conversion.

    :param value: (float) Duration in seconds.

    """
    options = self._options
    options['shift_end'] = value
tracks_to_tier

Create a sppasTier object from tracks.

Parameters
  • tracks: (List of tuple) with (from, to) values in seconds
  • end_time: (float) End-time of the tier
  • vagueness: (float) vagueness used for silence search
View Source
@staticmethod
def tracks_to_tier(tracks, end_time, vagueness):
    """Create a sppasTier object from tracks.

        :param tracks: (List of tuple) with (from, to) values in seconds
        :param end_time: (float) End-time of the tier
        :param vagueness: (float) vagueness used for silence search

        """
    if len(tracks) == 0:
        raise IOError('No IPUs to write.\n')
    tier = sppasTier('IPUs')
    tier.set_meta('number_of_ipus', str(len(tracks)))
    i = 0
    to_prec = 0.0
    for from_time, to_time in tracks:
        if from_time == 0.0 or to_time == end_time:
            radius = 0.0
        else:
            radius = vagueness / 2.0
        if to_prec < from_time:
            tier.create_annotation(sppasLocation(sppasInterval(sppasPoint(to_prec, radius), sppasPoint(from_time, radius))), sppasLabel(sppasTag(SIL_ORTHO)))
        tier.create_annotation(sppasLocation(sppasInterval(sppasPoint(from_time, radius), sppasPoint(to_time, radius))), sppasLabel(sppasTag('ipu_%d' % (i + 1))))
        i += 1
        to_prec = to_time
    begin = sppasPoint(to_prec, vagueness / 2.0)
    if begin < end_time:
        tier.create_annotation(sppasLocation(sppasInterval(begin, sppasPoint(end_time))), sppasLabel(sppasTag(SIL_ORTHO)))
    return tier
convert

Search for IPUs in the given channel.

Parameters
  • channel: (Channel) Input channel
Returns
  • (sppasTier)
View Source
def convert(self, channel):
    """Search for the IPUs of a channel and return them as a tier.

    A SearchIPUs is created on the channel and configured with the
    options of this annotation. Its tracks are then turned into a tier,
    to which the meta values of the search are added.

    :param channel: (Channel) Input channel
    :returns: (sppasTier)

    """
    searcher = SearchIPUs(channel=channel)

    # (setter of the searcher, key of the option holding its value)
    bindings = (
        ('set_vol_threshold', 'threshold'),
        ('set_win_length', 'win_length'),
        ('set_min_sil', 'min_sil'),
        ('set_min_ipu', 'min_ipu'),
        ('set_shift_start', 'shift_start'),
        ('set_shift_end', 'shift_end'),
    )
    for setter_name, option_key in bindings:
        getattr(searcher, setter_name)(self._options[option_key])

    tracks = searcher.get_tracks(time_domain=True)
    to_tier = self.tracks_to_tier
    duration = channel.get_duration()
    vagueness = searcher.get_vagueness()
    tier = to_tier(tracks, duration, vagueness)
    self._set_meta(searcher, tier)
    return tier
run

Run the automatic annotation process on an input.

Parameters
  • input_files: (list of str) Audio
  • output: (str) the output file name
Returns
  • (sppasTranscription) if no output is given; otherwise (list of str) with the name of the written file
View Source
def run(self, input_files, output=None):
    """Run the automatic annotation process on an input.

    Only the first item of input_files is used: it is opened as an
    audio file and its single channel is searched for IPUs.

    :param input_files: (list of str) Audio
    :param output: (str) the output file name
    :returns: (sppasTranscription) if output is None, otherwise
        (list of str) with the name of the written file
    :raises: IOError if the audio does not have exactly one channel

    """
    audio_filename = input_files[0]

    audio_speech = audioopy.aio.open(audio_filename)
    try:
        framerate = audio_speech.get_framerate()
        n = audio_speech.get_nchannels()
        if n != 1:
            raise IOError('An audio file with only one channel is expected. Got {:d} channels.'.format(n))
        idx = audio_speech.extract_channel(0)
        channel = audio_speech.get_channel(idx)
    finally:
        # Always release the audio file, including when the number of
        # channels is rejected.
        # NOTE(review): assumes the extracted channel holds its own
        # frames and remains usable once the audio is closed -- confirm.
        audio_speech.close()

    tier = self.convert(channel)

    trs_output = sppasTranscription(self.name)
    trs_output.set_meta('media_sample_rate', str(framerate))
    trs_output.set_meta('annotation_result_of', audio_filename)
    trs_output.append(tier)

    # The mime type is built from the lowercased file extension,
    # without its leading dot.
    extm = os.path.splitext(audio_filename)[1].lower()[1:]
    media = sppasMedia(os.path.abspath(audio_filename), mime_type='audio/' + extm)
    tier.set_media(media)

    if output is not None:
        output_file = self.fix_out_file_ext(output)
        parser = sppasTrsRW(output_file)
        parser.write(trs_output)
        return [output_file]
    return trs_output
run_for_batch_processing

Perform the annotation on a file.

This method is called by 'batch_processing'. It fixes the name of the output file. If a file with that name already exists, the annotation is cancelled (the file won't be overwritten). If the output exists with another annotation file extension, it is converted to the expected one. Otherwise, it calls the run method.

Parameters
  • input_files: (list of str) the inputs to perform a run
Returns
  • (list of str) names of the created files; an empty list if no file was created
View Source
def run_for_batch_processing(self, input_files):
    """Perform the annotation on a file.

    This method is called by 'batch_processing'. It fixes the name of the
    output file, then:

        - if a file with this name already exists (case-insensitive
          comparison), the annotation is cancelled: the file won't be
          overwritten;
        - if the output exists with another annotation file extension,
          it is read and written again with the expected name;
        - otherwise, or if that conversion failed, the run method is
          called.

    :param input_files: (list of str) the inputs to perform a run
    :returns: (list of str) names of the created files; an empty list
        if no file was created

    """
    root_pattern = self.get_out_name(input_files[0])

    # Any annotation file extension except '.txt' is a candidate for an
    # already existing output.
    ext = [e for e in sppasTrsRW.annot_extensions() if e not in ('.txt',)]
    exist_out_name = sppasBaseAnnotation._get_filename(root_pattern, ext)

    if exist_out_name is not None:
        out_name = self.fix_out_file_ext(root_pattern)
        if exist_out_name.lower() == out_name.lower():
            # The expected output is already there: nothing to create.
            return list()

        try:
            parser = sppasTrsRW(exist_out_name)
            t = parser.read()
            parser.set_filename(out_name)
            parser.write(t)
            self.logfile.print_message(_info(1302).format(out_name), indent=2, status=annots.warning)
            return [out_name]
        except Exception as e:
            # Best-effort conversion: report the failure, then fall back
            # to a regular run. KeyboardInterrupt and SystemExit are not
            # caught, so the user can still stop a batch.
            self.logfile.print_message('{:s}\n'.format(str(e)), indent=2, status=annots.warning)

    try:
        new_files = self.run(input_files, root_pattern)
    except Exception as e:
        new_files = list()
        self.logfile.print_message('{:s}\n'.format(str(e)), indent=2, status=-1)
    return new_files
get_input_extensions

Extensions that the annotation expects for its input filename.

View Source
@staticmethod
def get_input_extensions():
    """Return the extensions the annotation expects for its input file.

    :returns: (list) one item: the result of
        sppasFiles.get_informat_extensions for 'AUDIO'

    """
    audio_extensions = sppasFiles.get_informat_extensions('AUDIO')
    return [audio_extensions]

Private functions

_set_meta

Set meta values to the tier.

Parameters
  • searcher
  • tier
View Source
def _set_meta(self, searcher, tier):
    """Store the settings and statistics of a search in a tier, then log.

    All values are stored as strings in the meta of the tier. The
    number of IPUs is read from the 'number_of_ipus' meta of the tier
    and printed in the log file.

    :param searcher: object the settings and RMS statistics are read from
    :param tier: object the meta values are set to

    """
    # (meta key, getter of the searcher giving its value)
    settings = (
        ('required_threshold_volume', searcher.get_vol_threshold),
        ('estimated_threshold_volume', searcher.get_effective_threshold),
        ('minimum_silence_duration', searcher.get_min_sil_dur),
        ('minimum_ipus_duration', searcher.get_min_ipu_dur),
        ('shift_ipus_start', searcher.get_shift_start),
        ('shift_ipus_end', searcher.get_shift_end),
    )
    for meta_key, getter in settings:
        tier.set_meta(meta_key, str(getter()))

    stat_names = ('rms_min', 'rms_max', 'rms_mean', 'rms_median', 'rms_coefvar')
    for stat_name, stat_value in zip(stat_names, searcher.get_rms_stats()):
        tier.set_meta(str(stat_name), str(stat_value))

    log = self.logfile
    log.print_message('Information: ', indent=1)
    # A required threshold of 0 is reported as automatically estimated.
    if searcher.get_vol_threshold() == 0:
        estimated = searcher.get_effective_threshold()
        log.print_message('Automatically estimated threshold volume value: {:d}'.format(estimated), indent=2)
    nb_ipus = tier.get_meta('number_of_ipus')
    log.print_message('Number of IPUs found: {:s}'.format(nb_ipus), indent=2)