SPPAS 4.20

Module sppas.src.annotations

Class sppasLexRep

Description

SPPAS integration of the speaker lexical variation annotation.

Main differences compared to repetitions:
  • The span option is used to fix the max number of continuous tokens to analyze.
  • The span window has a duration limit.

Constructor

Create a new sppasLexRep instance.

Log is used for better communication of the annotation process and its results. If None, logs are redirected to the default logging system.

Parameters
  • log: (sppasLog) Human-readable logs.
View Source
def __init__(self, log=None):
    """Create a new sppasLexVar instance.

    Log is used for a better communication of the annotation process and its
    results. If None, logs are redirected to the default logging system.

    :param log: (sppasLog) Human-readable logs.

    """
    super(sppasLexRep, self).__init__('lexrep.json', log)
    self.__rules = OtherRules(self._stop_words)
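
A minimal usage sketch; the import path is an assumption based on the module name above:

import logging
from sppas.src.annotations import sppasLexRep   # assumed import path

logging.basicConfig(level=logging.INFO)
ann = sppasLexRep()   # log=None: messages go to the default logging system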

Public functions

fix_options

Fix all options.

Parameters
  • options: list of sppasOption instances
View Source
def fix_options(self, options):
    """Fix all options.

        :param options: list of sppasOption instances

        """
    for opt in options:
        key = opt.get_key()
        if 'spandur' == key:
            self.set_span_duration(opt.get_value())
        elif 'span' == key:
            self.set_span(opt.get_value())
        elif 'stopwords' == key:
            self.set_stopwords(opt.get_value())
        elif 'alpha' == key:
            self.set_alpha(opt.get_value())
        elif 'pattern' in key:
            self._options[key] = opt.get_value()
        else:
            raise AnnotationOptionError(key)
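
A sketch of how the dispatch works: fix_options accepts any objects exposing get_key() and get_value(). The _Opt class below is a hypothetical stand-in for sppasOption, used here only for illustration:

class _Opt:
    """Hypothetical stand-in for sppasOption."""
    def __init__(self, key, value):
        self._key, self._value = key, value
    def get_key(self):
        return self._key
    def get_value(self):
        return self._value

ann = sppasLexRep()
ann.fix_options([_Opt('span', 30), _Opt('spandur', 4.0), _Opt('stopwords', True)])
# Any key that is not handled above raises AnnotationOptionError.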
set_span

Set the max span, in number of words.

Parameters
  • value: (int) Max nb of tokens in a span window.
View Source
def set_span(self, value):
    """Set the max span, in number of words.

        :param value: (int) Max nb of tokens in a span window.

        """
    if value < 5 or value > 200:
        raise Exception('Invalid span value')
    self._options['span'] = int(value)
set_span_duration

Set the spandur option.

Parameters
  • value: (float, int) Max duration of a span window.
View Source
def set_span_duration(self, value):
    """Set the spandur option.

        :param value: (float, int) Max duration of a span window.

        """
    self._options['spandur'] = value
set_stopwords

Set the stopwords option.

Parameters
  • value: (bool) If True, add the estimated stopwords to the results
View Source
def set_stopwords(self, value):
    """Set the stopwords option.

        :param value: (bool) Enable the fact to add estimated stopwords

        """
    self._options['stopwords'] = bool(value)
set_alpha

Set the alpha option.

Parameters
  • value: (float) Coefficient used to estimate stopwords
View Source
def set_alpha(self, value):
    """Set the alpha option.

        :param value: (float) Coefficient to estimated stopwords

        """
    if value < 0.1 or value > 2.0:
        raise Exception('Invalid alpha value')
    self._options['alpha'] = float(value)
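
Both setters above validate their input range and raise an Exception otherwise, as in this short sketch:

ann = sppasLexRep()
ann.set_span(20)     # accepted: values in [5, 200]
ann.set_alpha(0.5)   # accepted: values in [0.1, 2.0]
ann.set_span(3)      # raises Exception('Invalid span value')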
tier_to_list

Create a list with the tokens contained in a tier.

Parameters
  • tier: (sppasTier)
  • loc: (bool) If True, create the corresponding list of sppasLocation()
Returns
  • (list, list) list of unicode contents and list of locations
View Source
@staticmethod
def tier_to_list(tier, loc=False):
    """Create a list with the tokens contained in a tier.

        :param tier: (sppasTier)
        :param loc: (bool) if true create the corresponding list of sppasLocation()
        :returns:  (list, list) list of unicode content and list of location

        """
    content = list()
    localiz = list()
    for ann in tier:
        for label in ann.get_labels():
            for tag, score in label:
                if tag.is_speech():
                    content.append(tag.get_content())
                    if loc is True:
                        localiz.append(ann.get_location())
    return (content, localiz)
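
A sketch of the expected shapes, assuming tier is a sppasTier of time-aligned tokens:

content, localiz = sppasLexRep.tier_to_list(tier, loc=True)
# content: one unicode string per speech tag, e.g. ['hello', 'world', ...]
# localiz: the matching sppasLocation of each token (empty list if loc is False)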
get_longest

Return the index of the last token of the longest repeated sequence.

No matter if a non-speech event occurs in the middle of the repeated sequence and no matter if a non-speech event occurs in the middle of the source sequence. No matter if tokens are not repeated in the same order.

Parameters
  • speaker1: (DataSpeaker) Entries of speaker 1
  • speaker2: (DataSpeaker) Entries of speaker 2
Returns
  • (int) Index or -1
View Source
@staticmethod
def get_longest(speaker1, speaker2):
    """Return the index of the last token of the longest repeated sequence.

        No matter if a non-speech event occurs in the middle of the repeated
        sequence and no matter if a non-speech event occurs in the middle of
        the source sequence.
        No matter if tokens are not repeated in the same order.

        :param speaker1: (DataSpeaker) Entries of speaker 1
        :param speaker2: (DataSpeaker) Entries of speaker 2
        :returns: (int) Index or -1

        """
    last_token = -1
    for index1 in range(len(speaker1)):
        if speaker1.is_word(index1) is True:
            param2 = 0
            repet_idx = speaker1.is_word_repeated(index1, param2, speaker2)
            if repet_idx > -1:
                last_token = index1
            else:
                # Stop at the first word that is not repeated.
                break
    return last_token
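
A toy example, assuming DataSpeaker treats all of these tokens as words and matches repetitions by plain token equality:

spk1 = DataSpeaker(['the', 'cat', 'sat'])
spk2 = DataSpeaker(['the', 'cat', 'ran'])
# 'the' and 'cat' are repeated in spk2, 'sat' is not: the scan stops
# there and the index of the last repeated token is returned.
idx = sppasLexRep.get_longest(spk1, spk2)   # expected: 1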
select

Decide whether a repetition should be kept.

Parameters
  • index1: (int) end index of the entry of the source (speaker1)
  • speaker1: (DataSpeaker) Entries of speaker 1
  • speaker2: (DataSpeaker) Entries of speaker 2
Returns
  • (bool)
View Source
def select(self, index1, speaker1, speaker2):
    """Append (or not) a repetition.

        :param index1: (int) end index of the entry of the source (speaker1)
        :param speaker1: (DataSpeaker) Entries of speaker 1
        :param speaker2: (DataSpeaker) Entries of speaker 2
        :returns: (bool)

        """
    keep_me = self.__rules.rule_syntagme(0, index1, speaker1)
    if keep_me is False:
        keep_me = self.__rules.rule_strict(0, index1, speaker1, speaker2)
    return keep_me
create_tier

Create a tier from content and localization lists.

Parameters
  • sources: (list) list of LexReprise sources
  • locations: (list) list of locations corresponding to the tokens
Returns
  • (sppasTier)
View Source
@staticmethod
def create_tier(sources, locations):
    """Create a tier from content end localization lists.

        :param sources: (dict) dict of sources -- in fact, the indexes.
        :param locations: (list) list of location corresponding to the tokens
        :returns: (sppasTier)

        """
    tier_content = sppasTier('LexRepContent')
    for lexreprise in sources:
        start_idx = lexreprise.get_start()
        # The end is stored relative to the start of the source window.
        end_idx = start_idx + lexreprise.get_end()
        loc_begin = locations[start_idx]
        loc_end = locations[end_idx]
        begin_point = loc_begin.get_lowest_localization()
        end_point = loc_end.get_highest_localization()
        location = sppasLocation(sppasInterval(begin_point, end_point))
        tier_content.create_annotation(location, lexreprise.get_labels())
    return tier_content
windowing

Return the list of DataSpeaker matching the given content.

Parameters
  • content: (list) List of entries
  • location: (list) List of locations of the entries
Returns
  • list of DataSpeaker
View Source
def windowing(self, content, location=None):
    """Return the list of DataSpeaker matching the given content.

        :param content: (list) List of entries
        :param location: (list) List of locations of the entries
        :returns: list of DataSpeaker

        """
    span_tok = self._options['span']
    span_dur = self._options['spandur']
    windows = list()
    for i in range(len(content)):
        # Take at most 'span' tokens, fewer at the end of the content.
        end_size = min(span_tok, len(content) - i)
        if location is not None and end_size > 1:
            win_loc = location[i:i + end_size]
            start_point = win_loc[0].get_lowest_localization()
            end_point = win_loc[end_size - 1].get_highest_localization()
            win_dur = end_point.get_midpoint() - start_point.get_midpoint()
            # Shrink the window until its duration fits the 'spandur' limit.
            while win_dur > span_dur and end_size > 0:
                end_point = win_loc[end_size - 1].get_highest_localization()
                win_dur = end_point.get_midpoint() - start_point.get_midpoint()
                if win_dur <= span_dur:
                    break
                end_size -= 1
            if end_size < min(span_tok, len(content) - i):
                logging.debug(' ... window was reduced to {} tokens.'.format(end_size + 1))
        win_tok = content[i:i + end_size]
        windows.append(DataSpeaker(win_tok))
    return windows
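
A sketch of the token-only behavior (no duration check when location is None), assuming the default options were loaded from lexrep.json:

ann = sppasLexRep()
ann.set_span(5)
windows = ann.windowing(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
# One DataSpeaker per start position: tokens[0:5], [1:6], [2:7],
# then shorter windows [3:7] ... [6:7] at the end of the content.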
lexical_variation_detect

Detect the lexical variations between 2 tiers.

Parameters
  • tier1: (sppasTier)
  • tier2: (sppasTier)
Returns
  • (sppasTier, sppasTier)
View Source
def lexical_variation_detect(self, tier1, tier2):
    """Detect the lexical variations between 2 tiers.

        :param tier1: (sppasTier)
        :param tier2: (sppasTier)

        """
    content_tier1, loc_tier1 = self.tier_to_list(tier1, True)
    content_tier2, loc_tier2 = self.tier_to_list(tier2, True)
    window_list1 = self.windowing(content_tier1, loc_tier1)
    window_list2 = self.windowing(content_tier2, loc_tier2)
    sources1 = self._detect_all_sources(window_list1, window_list2)
    sources2 = self._detect_all_sources(window_list2, window_list1)
    tier1 = self.create_tier(sources1, loc_tier1)
    tier2 = self.create_tier(sources2, loc_tier2)
    tier1.set_name(tier1.get_name() + '-1')
    tier2.set_name(tier2.get_name() + '-2')
    return (tier1, tier2)
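
A hedged sketch of the detection step, assuming tier1 and tier2 are tiers of time-aligned tokens already loaded from annotated files:

ann = sppasLexRep()
src, echo = ann.lexical_variation_detect(tier1, tier2)
# src  ('LexRepContent-1'): sequences of speaker 1 repeated by speaker 2
# echo ('LexRepContent-2'): sequences of speaker 2 repeated by speaker 1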
get_inputs

Return 2 tiers with aligned tokens.

Parameters
  • input_files: (list)
Raises
  • NoTierInputError
Returns
  • (sppasTier, sppasTier)
View Source
def get_inputs(self, input_files):
    """Return 2 tiers with aligned tokens.

        :param input_files: (list)
        :raise: NoTierInputError
        :return: (sppasTier)

        """
    if len(input_files) != 2:
        raise Exception('Invalid format of input files.')
    tier_src = None
    for filename in input_files[0]:
        parser = sppasTrsRW(filename)
        trs_input = parser.read()
        if tier_src is None:
            tier_src = sppasFindTier.aligned_tokens(trs_input)
    if tier_src is None:
        logging.error('A source tier with time-aligned tokens was expected but not found.')
        raise NoTierInputError
    tier_echo = None
    for filename in input_files[1]:
        parser = sppasTrsRW(filename)
        trs_input = parser.read()
        if tier_echo is None:
            tier_echo = sppasFindTier.aligned_tokens(trs_input)
    if tier_echo is None:
        logging.error('An echo tier with time-aligned tokens was expected but not found.')
        raise NoTierInputError
    return (tier_src, tier_echo)
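
A sketch of the expected input structure: one list of file names per speaker. The file names below are hypothetical and must point to existing annotated files:

ann = sppasLexRep()
tier_src, tier_echo = ann.get_inputs([
    ['speaker1-palign.xra'],
    ['speaker2-palign.xra'],
])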
run

Run the automatic annotation process on an input.

Parameters
  • input_files: (list of list of str) time-aligned tokens of 2 files
  • output: (str) the output file name
Returns
  • (sppasTranscription), or the list of created file names if output is given
View Source
def run(self, input_files, output=None):
    """Run the automatic annotation process on an input.

        :param input_files: (list of list of str) time-aligned tokens of 2 files
        :param output: (str) the output file name
        :returns: (sppasTranscription)

        """
    tier_tokens_src, tier_tokens_echo = self.get_inputs(input_files)
    tier1, tier2 = self.lexical_variation_detect(tier_tokens_src, tier_tokens_echo)
    trs_output = sppasTranscription(self.name)
    trs_output.set_meta('annotation_result_of', input_files[0][0])
    if len(self._word_strain) > 0:
        tier_tokens_src.set_name(tier_tokens_src.get_name() + '-1')
        trs_output.append(tier_tokens_src)
    if self._options['stopwords'] is True:
        stopwords1 = self.make_stop_words(tier_tokens_src)
        stopwords1.set_name(stopwords1.get_name() + '-1')
        trs_output.append(stopwords1)
    trs_output.append(tier1)
    if len(self._word_strain) > 0:
        tier_tokens_echo.set_name(tier_tokens_echo.get_name() + '-2')
        trs_output.append(tier_tokens_echo)
    if self._options['stopwords'] is True:
        stopwords2 = self.make_stop_words(tier_tokens_echo)
        stopwords2.set_name(stopwords2.get_name() + '-2')
        trs_output.append(stopwords2)
    trs_output.append(tier2)
    if output is not None:
        if len(trs_output) > 0:
            output_file = self.fix_out_file_ext(output)
            parser = sppasTrsRW(output_file)
            parser.write(trs_output)
            return [output_file]
        else:
            raise EmptyOutputError
    return trs_output
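
A minimal end-to-end sketch with hypothetical file names:

ann = sppasLexRep()
result = ann.run([['speaker1-palign.xra'], ['speaker2-palign.xra']],
                 output='dialogue-lexrep')
# With output set, the list of written file names is returned;
# without it, the sppasTranscription itself is returned.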
get_output_pattern

Pattern this annotation uses in an output filename.

View Source
def get_output_pattern(self):
    """Pattern this annotation uses in an output filename."""
    return self._options.get('outputpattern', '-rms')

Private functions

_get_longest_selected

Return the end-index of the longest selected sequence.

Parameters
  • data_spk1
  • data_spk2
View Source
def _get_longest_selected(self, data_spk1, data_spk2):
    """Return the end-index of the longest selected sequence."""
    spk2_echo_idx = sppasLexRep.get_longest(data_spk1, data_spk2)
    if spk2_echo_idx != -1:
        if self.select(spk2_echo_idx, data_spk1, data_spk2):
            return spk2_echo_idx
    return -1
_add_source

Add the source to the list of sources.

Parameters
  • sources
  • win_idx
  • end
  • dataspk
View Source
@staticmethod
def _add_source(sources, win_idx, end, dataspk):
    """Add the source in the list of sources."""
    if (win_idx, end) not in sources:
        lex_reprise = LexReprise(win_idx, end)
        lex_reprise.set_content(dataspk)
        sources.append(lex_reprise)
_detect_all_sources

Return all reprises of speaker1 in speaker2.

Returns
  • (list) list of LexReprise sources
Parameters
  • win_spk1
  • win_spk2
View Source
def _detect_all_sources(self, win_spk1, win_spk2):
    """Return all reprises of speaker1 in speaker2.

        :return: (dict) dict of sources

        - key: (index_start, index_end)
        - value: the number of time the source is repeated

        """
    sources = list()
    prev_max_index = -1
    spk1_widx = 0
    while spk1_widx < len(win_spk1):
        data_spk1 = win_spk1[spk1_widx]
        max_index = -1
        spk2_widx = 0
        # Scan every window of speaker2 for the longest selected echo.
        while spk2_widx < len(win_spk2):
            data_spk2 = win_spk2[spk2_widx]
            spk2_echo_idx = self._get_longest_selected(data_spk1, data_spk2)
            if spk2_echo_idx > -1 and spk1_widx + spk2_echo_idx > prev_max_index:
                if spk2_echo_idx > max_index:
                    max_index = spk2_echo_idx
                    # The whole span window is repeated: stop searching.
                    if max_index == self._options['span']:
                        break
            spk2_widx += 1
        if max_index > -1:
            sppasLexRep._add_source(sources, win_idx=spk1_widx, end=max_index, dataspk=data_spk1)
            prev_max_index = spk1_widx + max_index
        spk1_widx += 1
    return sources
_merge_sources

Merge sources if content is the same.

Parameters
  • sources
View Source
def _merge_sources(self, sources):
    """Merge sources if content is the same."""
    # The current implementation returns the sources unchanged.
    return sources