Source code for libioc.Filter

# Copyright (c) 2017-2019, Stefan Grönke
# Copyright (c) 2014-2018, iocage
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted providing that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""ioc filters for ListableResource."""
import re
import typing

import libioc.errors
import libioc.helpers
import libioc.ResourceSelector

_ResourceSelector = libioc.ResourceSelector.ResourceSelector
_TermValuesType = typing.Union[
    str,
    typing.List[str],
    libioc.ResourceSelector.ResourceSelector
]

_REGEX_PATTERN_SPLIT_COMMA = re.compile(r"(?<!\\),")


[docs]def match_filter(value: str, filter_string: str) -> bool: """Return True when the value matches the filter string.""" escaped_characters = [".", "$", "^", "(", ")", "?"] for character in escaped_characters: filter_string = filter_string.replace(character, f"\\{character}") filter_string = filter_string.replace("*", ".*") filter_string = filter_string.replace("+", ".+") pattern = f"^{filter_string}$" match = re.match(pattern, value) return match is not None
[docs]class Term(list): """A single filter term.""" glob_characters = ["*", "+"] def __init__( self, key: str, values: typing.Optional[_TermValuesType]=[] ) -> None: self.key = key if isinstance(values, str): data = self._split_filter_values(values) elif isinstance(values, list): data = values elif isinstance(values, _ResourceSelector): data = [values] elif values is None: data = [] list.__init__(self, data) @property def short(self) -> bool: """Return True if the short name of a UUID be used.""" return (self.key == "name") is True
[docs] def matches_resource( self, resource: 'libioc.Resource.Resource' ) -> bool: """Return True if the term matches the resource.""" value = resource.get(self.key) return self.matches(value, self.short)
[docs] def matches(self, value: typing.Any, short: bool=False) -> bool: """ Return True if the value matches the term. Args: value: The value that is matched against the filter term short: When value has a length of 8 characters, this argument allows to match a jail's shortname as well. This is required for selecting jails with UUIDs by the first part of the name """ # match any item of a list if (value is not None) and isinstance(value, list): # `short` not required here return any(map(self.matches, value)) input_value = libioc.helpers.to_string(value) for filter_value in self: if isinstance(filter_value, str): if self._match_filter(input_value, filter_value, short): return True elif isinstance(filter_value, _ResourceSelector): if self._match_filter(input_value, filter_value.name, short): return True elif isinstance(filter_value, list): results = list(map( lambda x: self._match_filter(input_value, x, short), filter_value )) if any(results): return True return False
def _match_filter( self, value: str, filter_string: str, short: bool=False ) -> bool: if match_filter(value, filter_string) is True: return True has_no_globs = not self._filter_string_has_globs(filter_string) if has_no_globs is True: # match against humanreadable names as well has_humanreadble_length = (len(filter_string) == 8) if (has_humanreadble_length is True) and (short is True): shortname = libioc.helpers.to_humanreadable_name(value) if shortname == filter_string: return True _parse_user_input = libioc.helpers.parse_user_input parsed_value = _parse_user_input(value) parsed_filter = _parse_user_input(filter_string) return (parsed_value == parsed_filter) is True return False def _filter_string_has_globs(self, filter_string: str) -> bool: for glob in self.glob_characters: if glob in filter_string: return True return False def _split_filter_values(self, user_input: str) -> typing.List[str]: return re.split(_REGEX_PATTERN_SPLIT_COMMA, user_input) def _validate_name_filter_string(self, filter_string: str) -> bool: globs = self.glob_characters # Allow glob only filters if (len(filter_string) == 1) and (filter_string in globs): return True # replace all glob charaters in user input filter_string_without_globs = "" for i, char in enumerate(filter_string): if char not in globs: filter_string_without_globs += char return libioc.helpers.validate_name( filter_string_without_globs ) is True def __str__(self) -> str: """Return the Filter Term as string.""" output: str = "" if self.key != "name": output += f"{self.key}=" output += ",".join([str(x) for x in self]) return f"{output}" def __repr__(self) -> str: """Return the Term in human and robot friendly format.""" return self.__str__()
[docs]class Terms(list): """ A group of filter terms. Each item in this group must match for a resource to pass the filter. This can be interpreted as logical AND """ def __init__( self, terms: typing.Optional[ typing.Iterable[typing.Union[Term, str]] ]=None, logger: typing.Optional['libioc.Logger.Logger']=None ) -> None: self.logger = logger list.__init__(self, []) Terms.set(self, terms)
[docs] def set( self, terms: typing.Optional[typing.Union[ str, typing.Iterable[typing.Union[Term, str]] ]] ) -> None: """Clear and set all terms from input data.""" self.clear() try: libioc.helpers.parse_none(terms) return except TypeError: pass data: typing.List[typing.Union[Term, str]] = [] if terms is not None: _terms = [terms] if isinstance(terms, str) else terms for term in _terms: if isinstance(term, str): data += self._parse_terms(term) elif isinstance(term, Term): data.append(term) self.extend(data)
[docs] def add( self, term: typing.Union[Term, str] ) -> None: """ Add a Term to the list of Terms. Args: term (libioc.Filter.Term, str): Interface name inside the jail """ _term = term if isinstance(term, Term) else self._parse_term(term) list.append(self, _term)
[docs] def match_resource( self, resource: 'libioc.Resource.Resource' ) -> bool: """Return True if all Terms match the resource.""" for term in self: if term.matches_resource(resource) is False: return False return True
[docs] def match_key(self, key: str, value: str) -> bool: """ Check if a value matches for a given key. Returns True if the given value matches all terms for the specified key Returns Fals if one of the terms does not match """ for term in self: if term.key != key: continue if (key == "name"): short = True if term.matches(value, short) is False: return False return True
[docs] def match_source(self, source_name: str) -> bool: """Check if the source name matches the filter terms.""" for term in self: if term.key != "name": continue # All name terms have been transformed to ResourceSelector resource_selector = term[0] if resource_selector.source_name is None: return True return (resource_selector.source_name == source_name) is True # no name term or none at all return True
def _parse_term(self, user_input: str) -> Term: value: typing.Any try: prop, value = user_input.split("=", maxsplit=1) except ValueError: prop = "name" value = user_input if prop == "name": value = [libioc.ResourceSelector.ResourceSelector( partial_value, logger=self.logger ) for partial_value in re.split( _REGEX_PATTERN_SPLIT_COMMA, value )] else: value = libioc.helpers.to_string( libioc.helpers.parse_user_input(value) ) return Term(prop, value) def _parse_terms(self, user_input: str) -> typing.List[Term]: terms = [] user_input_terms = re.split(r'(?<!\\)\s+', user_input) for user_input_term in user_input_terms: terms.append(self._parse_term(user_input_term)) return terms def __str__(self) -> str: """Return the Filter.Terms as string.""" return " ".join([str(x) for x in self]) def __repr__(self) -> str: """Return the Terms in human and robot friendly format.""" return self.__str__()