# Copyright (c) 2017-2019, Stefan Grönke
# Copyright (c) 2014-2018, iocage
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted providing that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""ioc module for host devfs rule configuration."""
import typing
import os.path
import re
import libioc.errors
import libioc.helpers
[docs]class DevfsRuleset(list):
"""
Representation of a devfs ruleset in the devfs.rules file.
DevfsRuleset instances behave like standard lists and can store strings
that multiple lines (string)
"""
PATTERN = re.compile(r"""^\[(?P<name>[a-z](?:[a-z0-9\-_]*[a-z0-9])?)=
(?P<number>[0-9]+)\]\s*(?:\#\s*(?P<comment>.*))?$""", re.X)
name: typing.Optional[str]
number: int
comment: typing.Optional[str]
def __init__(
self,
value: typing.Optional[str]=None,
number: typing.Optional[int]=None,
comment: typing.Optional[str]=None
) -> None:
"""
Initialize the DevfsRuleset.
Args:
value (string): (optional)
If specified in combination with a number, this parameter is
interpreted as the ruleset name. Otherwise it is parsed with
the expectation to find a [<name>=<number>] ruleset definition.
When value is not specified or None, DevfsRuleset is assumed
to be new or unspecified (cannot be exported before name and
number were assigned at a later time)
number (int): (optional)
The number of the ruleset. Must be specified to export the
ruleset, but is like the name not required to compare the
ruleset with another
"""
name: typing.Optional[str]
# when only one argument is passed, it's a line that need to be parsed
if value is None and number is None:
# name and number will be assigned later
name = None
elif (number is None) and (isinstance(value, str) is True):
name, number, comment = self._parse_line(str(value))
self.name = name
self.comment = comment
if number is not None:
self.number = number
list.__init__(self)
[docs] def has_rule(self, rule: str) -> bool:
"""
Return True if the rule is part of the current ruleset.
Args:
rule (string):
The rule string to be compared with current rules of the
ruleset instance
"""
return (rule in self) is True
[docs] def append(self, rule: str) -> None:
"""Append a rule to the devfs rules."""
if rule not in self:
list.append(self, rule)
[docs] def clone(self, source_ruleset: 'DevfsRuleset') -> None:
"""
Clone the rules from another ruleset.
Args:
source_ruleset (libioc.DevfsRules.DevfsRuleset):
Ruleset to copy all rules from
"""
for rule in source_ruleset:
self.append(rule)
def _parse_line(
self,
line: str
) -> typing.Tuple[str, int, typing.Optional[str]]:
# marks beginning of a new ruleset
ruleset_match = re.search(DevfsRuleset.PATTERN, line)
if ruleset_match is not None:
name = str(ruleset_match.group("name"))
number = int(ruleset_match.group("number"))
comment = ruleset_match.group("comment")
return name, number, comment
raise SyntaxError("DevfsRuleset line parsing failed")
def __str__(self) -> str:
"""Return the devfs ruleset as string."""
ruleset_line = f"[{self.name}={self.number}]"
if self.comment is not None:
ruleset_line += f" # {self.comment}"
output = [ruleset_line] + [str(x) for x in self]
return "\n".join(output) + "\n"
[docs]class DevfsRules(list):
"""
Abstraction for the hosts /etc/devfs.rules.
Read and edit devfs rules in a programmatic way.
Restart the devfs service after applying changes.
"""
_rules_file: str
_ruleset_number_index: typing.Dict[int, int]
_ruleset_name_index: typing.Dict[str, int]
_system_rule_lines: typing.List[int]
def __init__(
self,
rules_file: str="/etc/devfs.rules",
logger: typing.Optional['libioc.Logger.Logger']=None
) -> None:
"""
Initialize a DevfsRules manager for devfs.rules files.
Args:
rules_file (string): (default=/etc/devfs.rules)
Path of the devfs.rules file
logger (libioc.Logger): (optional)
Instance of the logger that is passed to occuring errors
"""
self.logger = logger
# index rulesets to find duplicated and provide easy access
self._ruleset_number_index = {}
self._ruleset_name_index = {}
# remember all lines that were loaded from defaults (system)
self._system_rule_lines = []
list.__init__(self)
# will automatically read from file - needs to be the last item
self.rules_file = rules_file
[docs] def append( # noqa: T484
self,
ruleset: typing.Union[DevfsRuleset, str],
is_system_rule: bool=False
) -> typing.Union[DevfsRuleset, str]:
"""
Add a DevfsRuleset to the list.
The rulesets added become indexed, so that lookups and duplication
checks are easy and fast
Args:
ruleset (libioc.DevfsRules.DevfsRuleset|string):
The ruleset that gets added if it is not already in the list
"""
next_line_index = len(self)
if ruleset is None or isinstance(ruleset, str):
list.append(self, ruleset)
if is_system_rule is True:
self._system_rule_lines.append(next_line_index)
return ruleset
if ruleset.name in self._ruleset_name_index.keys():
raise libioc.errors.DuplicateDevfsRuleset(
reason=f"Ruleset named '{ruleset.name}' already present",
devfs_rules_file=self.rules_file,
logger=self.logger
)
if ruleset.number in self._ruleset_number_index.keys():
raise libioc.errors.DuplicateDevfsRuleset(
reason=f"Ruleset number '{ruleset.number}' already present",
devfs_rules_file=self.rules_file,
logger=self.logger
)
if ruleset.name is None:
raise libioc.errors.MissingDevfsRulesetName(
devfs_rules_file=self.rules_file,
logger=self.logger
)
# build indexes
self._ruleset_number_index[ruleset.number] = next_line_index
self._ruleset_name_index[ruleset.name] = next_line_index
if is_system_rule is True:
self._system_rule_lines.append(next_line_index)
list.append(self, ruleset)
return ruleset
[docs] def new_ruleset(self, ruleset: DevfsRuleset) -> int:
"""
Append a new ruleset.
Similar to append(), but automatically assigns a new number
Args:
ruleset (libioc.DevfsRules.DevfsRuleset):
The new devfs ruleset that is going to be added
Returns:
int: The devfs ruleset number of the created ruleset
"""
ruleset.number = self.next_number
if ruleset.name is None:
ruleset.name = f"iocage_auto_{ruleset.number}"
self.append(ruleset)
return ruleset.number
[docs] def find_by_name(self, rule_name: str) -> DevfsRuleset:
"""Find a devfs rule by its name."""
index = self._ruleset_name_index
ruleset = self[index[rule_name]] # type: DevfsRuleset
return ruleset
[docs] def find_by_number(self, rule_number: int) -> DevfsRuleset:
"""Find a devfs rule by its rule number."""
index = self._ruleset_number_index
ruleset = self[index[rule_number]] # type: DevfsRuleset
return ruleset
@property
def default_rules_file(self) -> str:
"""Return the default path to the devfs rules file."""
return "/etc/defaults/devfs.rules"
@property
def rules_file(self) -> str:
"""Path of the devfs.rules file."""
return self._rules_file
@rules_file.setter
def rules_file(self, devfs_rules_path: str) -> None:
"""
Set different devfs rules file.
When setting a new devfs.rules source, it is read automatically
"""
self._rules_file = devfs_rules_path
try:
self.read_rules()
except FileNotFoundError:
pass
@property
def next_number(self) -> int:
"""
Return the next highest ruleset number that is available.
This counting includes the systems default devfs rulesets.
"""
return len(self._ruleset_name_index.keys()) + 1
[docs] def read_rules(self) -> None:
"""
Read existing devfs.rules file.
Existing devfs rules get reset and read from the rules_file
"""
if self.logger:
self.logger.debug(f"Reading devfs.rules from {self.rules_file}")
self.clear()
self._read_rules_file(self.default_rules_file, system=True)
self._read_rules_file(self.rules_file)
def _read_rules_file(
self,
file: str,
system: bool=False
) -> None:
f = open(file, "r", encoding="utf-8")
current_ruleset = None
for line in f.readlines():
line = line.strip().rstrip("\n")
# add comments and empty lines as string
if line.startswith("#") or (line == ""):
self.append(line, is_system_rule=system)
continue
try:
current_ruleset = DevfsRuleset(line)
self.append(current_ruleset, is_system_rule=system)
continue
except SyntaxError:
pass
# the first item must be a ruleset
if current_ruleset is None:
raise libioc.errors.InvalidDevfsRulesSyntax(
devfs_rules_file=self.rules_file,
reason="Rules must follow a ruleset declaration",
logger=self.logger
)
current_ruleset.append(line)
f.close()
[docs] def save(self) -> None:
"""
Apply changes to the devfs.rules file.
Automatically restarts devfs service when the file was changed.
"""
content_before = None
if os.path.isfile(self.rules_file):
f = open(self.rules_file, "r+")
content_before = f.read()
f.seek(0)
else:
f = open(self.rules_file, "w")
new_content = self.__str__()
if content_before == new_content:
if self.logger is not None:
self.logger.verbose(
f"devfs.rules file {self.rules_file} unchanged"
)
else:
if self.logger is not None:
self.logger.verbose(
f"Writing devfs.rules to {self.rules_file}"
)
self.logger.spam(new_content, indent=1)
f.write(new_content)
f.truncate()
self._restart_devfs_service()
f.close()
def _restart_devfs_service(self) -> None:
"""Restart devfs service after changing devfs.rules."""
if self.logger is not None:
self.logger.debug("Restarting devfs service")
libioc.helpers.exec(["service", "devfs", "restart"])
def __str__(self) -> str:
"""Return the devfs.rules content as string."""
out_lines = []
for i, line in enumerate(self):
if i not in self._system_rule_lines:
out_lines.append(str(line))
return "\n".join(out_lines)