Source code for libioc.Release

# Copyright (c) 2017-2019, Stefan Grönke
# Copyright (c) 2014-2018, iocage
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted providing that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""ioc release module."""
import typing
import hashlib
import os
import urllib.request
import urllib.error
import urllib.parse
import re

import libzfs

import libioc.ZFS
import libioc.errors
import libioc.helpers
import libioc.helpers_object
import libioc.events
import libioc.LaunchableResource
import libioc.ResourceSelector
import libioc.Jail
import libioc.SecureTarfile

# MyPy
import libioc.Resource
import libioc.Host
import libioc.Logger
import libioc.Config.Host
import libioc.Config.Jail.File.RCConf
import libioc.Config.Jail.File.SysctlConf


[docs]class ReleaseResource(libioc.LaunchableResource.LaunchableResource): """Resource that represents an iocage release.""" _release: typing.Optional['ReleaseGenerator'] _hashes: typing.Optional[typing.Dict[str, str]] host: libioc.Host.HostGenerator root_datasets_name: typing.Optional[str] def __init__( self, dataset: typing.Optional[libzfs.ZFSDataset]=None, dataset_name: typing.Optional[str]=None, config_type: str="auto", config_file: typing.Optional[str]=None, logger: typing.Optional['libioc.Logger.Logger']=None, zfs: typing.Optional[libioc.ZFS.ZFS]=None, host: typing.Optional['libioc.Host.HostGenerator']=None, release: typing.Optional['ReleaseGenerator']=None, root_datasets_name: typing.Optional[str]=None, ) -> None: self.host = libioc.helpers_object.init_host(self, host) self.root_datasets_name = root_datasets_name libioc.LaunchableResource.LaunchableResource.__init__( self, dataset=dataset, dataset_name=dataset_name, config_type=config_type, config_file=config_file, logger=logger, zfs=zfs ) self._release = release self._hashes = None @property def release(self) -> 'ReleaseGenerator': """ Return the release instance that belongs to the resource. Usually the resource becomes inherited from the Release itself. It can still be used linked to a foreign ReleaseGenerator by passing release as named attribute to the __init__ function """ if self._release is not None: return self._release elif isinstance(self, ReleaseGenerator): return self raise Exception( "Resource is not a valid release itself and has no linked release" ) @property def full_name(self) -> str: """ Return the full identifier of a jail. When more than one root dataset is managed by iocage, the full source and name are returned. Otherwise just the name. For example `mydataset/jailname` or just `jailname`. """ if len(self.host.datasets) > 1: return f"{self.source}/{self.name}" else: return str(self.name) @property def dataset_name(self) -> str: """ Return the name of the releases ZFS dataset. If the resource has no dataset or dataset_name assigned yet, the release id is used to find name the dataset """ try: return str(self._assigned_dataset_name) except AttributeError: pass return self._dataset_name_from_release_name @dataset_name.setter def dataset_name(self, value: str) -> None: """Set the releases dataset name.""" self._dataset_name = value @property def base_dataset(self) -> libzfs.ZFSDataset: """ Return the ZFS basejail dataset belonging to the release. base datasets are created from releases. They are required to start zfs-basejails. """ ds: libzfs.ZFSDataset = self.zfs.get_dataset(self.base_dataset_name) return ds @property def base_dataset_name(self) -> str: """Return the ZFS basejail datasets name belonging to the release.""" return f"{self._dataset_name_from_base_name}/root" @property def _dataset_name_from_release_name(self) -> str: return f"{self.source_dataset.releases.name}/{self.name}" @property def _dataset_name_from_base_name(self) -> str: return f"{self.source_dataset.base.name}/{self.name}" @property def source_dataset(self) -> 'libioc.Datasets.RootDatasets': """ Return the releases source root dataset. iocage can manage multiple source datasets (on different ZFS pools for instance). This property returns the RootDatasets instance that belongs to the release. """ try: assigned_name = str(self._assigned_dataset_name) return self.host.datasets.find_root_datasets(assigned_name) except AttributeError: pass if self.root_datasets_name is None: return self.host.datasets.main else: try: return self.host.datasets.__getitem__(self.root_datasets_name) except KeyError: raise libioc.errors.SourceNotFound(logger=self.logger) @property def source(self) -> str: """Return the name of the releases source root datasets.""" try: assigned_name = str(self._assigned_dataset_name) return str( self.host.datasets.find_root_datasets_name(assigned_name) ) except AttributeError: pass if self.root_datasets_name is None: return str(self.host.datasets.main_datasets_name) else: return str(self.root_datasets_name)
[docs]class ReleaseGenerator(ReleaseResource): """Release with generator interfaces.""" DEFAULT_RC_CONF: typing.Dict[str, typing.Union[str, bool]] = { "netif_enable": False, "sendmail_enable": False, "sendmail_submit_enable": False, "sendmail_msp_queue_enable": False, "sendmail_outbound_enable": False, "cron_flags": "-m ''", "syslogd_flags": "-ss" } DEFAULT_PERIODIC_CONF: typing.Dict[str, typing.Union[str, bool]] = { "daily_clean_hoststat_enable": False, "daily_status_mail_rejects_enable": False, "daily_status_include_submit_mailq": False, "daily_submit_queuerun": False, } DEFAULT_SYSCTL_CONF: typing.Dict[str, int] = { "net.inet.ip.fw.enable": 0 } _name: str patchlevel: typing.Optional[int] check_eol: bool logger: 'libioc.Logger.Logger' zfs: libioc.ZFS.ZFS host: 'libioc.Host.HostGenerator' _resource: libioc.Resource.Resource _assets: typing.List[str] _mirror_url: typing.Optional[str] def __init__( self, name: str, dataset: typing.Optional[libzfs.ZFSDataset]=None, dataset_name: typing.Optional[str]=None, config_type: str="auto", config_file: typing.Optional[str]=None, root_datasets_name: typing.Optional[str]=None, host: typing.Optional['libioc.Host.HostGenerator']=None, zfs: typing.Optional[libioc.ZFS.ZFS]=None, logger: typing.Optional['libioc.Logger.Logger']=None, check_hashes: bool=True, check_eol: bool=True, ) -> None: self.logger = libioc.helpers_object.init_logger(self, logger) self.zfs = libioc.helpers_object.init_zfs(self, zfs) self.host = libioc.helpers_object.init_host(self, host) resource_selector = libioc.ResourceSelector.ResourceSelector( name, logger=self.logger ) if resource_selector.source_name is not None: is_different = resource_selector.source_name != root_datasets_name if (root_datasets_name is not None) and (is_different is True): # ToDo: omit root_datasets_name at all ? raise libioc.errors.ConflictingResourceSelection( source_a=resource_selector.source_name, source_b=root_datasets_name, logger=self.logger ) else: root_datasets_name = resource_selector.source_name if libioc.helpers.validate_name(resource_selector.name) is False: raise NameError(f"Invalid 'name' for Release: '{name}'") self.name = name self._hbsd_release_branch = None self._mirror_url = None self._hashes = None self.check_hashes = check_hashes is True self.check_eol = check_eol is True ReleaseResource.__init__( self, host=self.host, logger=self.logger, zfs=self.zfs, root_datasets_name=root_datasets_name, dataset_name=dataset_name, config_type=config_type, config_file=config_file, release=self ) self._assets = ["base"] if self.host.distribution.name != "HardenedBSD": self._assets.append("lib32") @property def name(self) -> str: """Return the releases identifier.""" return self._name @name.setter def name(self, value: str) -> None: """Set the releases identifier (optionally including patchlevel).""" match = re.match(( r"^(?:(?P<source_dataset_name>.*)/)?" r"(?P<release_name>.*?)" r"(?:-p(?P<patchlevel>[0-9]+))?$" ), value) if match is None: raise libioc.errors.InvalidReleaseName( name=value, logger=self.logger ) self._name = match.group("release_name") patchlevel = match.group("patchlevel") self.patchlevel = None if (patchlevel is None) else int(patchlevel) if match.group("source_dataset_name") is not None: self.root_datasets_name = match.group("source_dataset_name") @property def full_name(self) -> str: """Return the release name including the patchlevel.""" if self.patchlevel is None: return self._name return f"{self._name}-p{self.current_snapshot_patchlevel}" @property def resource(self) -> 'libioc.Resource.Resource': """Return the releases resource.""" return self._resource @resource.setter def resource(self, value: 'libioc.Resource.Resource') -> None: """Set the releases resource.""" if value is None: self._resource = ReleaseResource( release=self, host=self.host, logger=self.logger, zfs=self.zfs ) else: self._resource = value @property def releases_folder(self) -> str: """Return the mountpoint of the iocage/releases dataset.""" return str(self.source_dataset.releases.mountpoint) @property def download_directory(self) -> str: """Return the download directory.""" return str(self.dataset.mountpoint) @property def root_dir(self) -> str: """Return the main directory of the release.""" try: if self.root_dataset.mountpoint: return str(self.root_dataset.mountpoint) except AttributeError: pass return f"{self.releases_folder}/{self.name}/root" @property def assets(self) -> typing.List[str]: """Return a list of release assets.""" return self._assets @assets.setter def assets(self, value: typing.Union[typing.List[str], str]) -> None: """Set the list of release assets.""" value = [value] if isinstance(value, str) else value self._assets = list(map( lambda x: x if not x.endswith(".txz") else x[:-4], value )) @property def real_name(self) -> str: """Map the release name on HardenedBSD.""" if self.host.distribution.name == "HardenedBSD": return f"HardenedBSD-{self.name}-{self.host.processor}-LATEST" return self.name @property def annotated_name(self) -> str: """ Return the release name with annotations. Annotations inform whether a release is newer then the host or EOL. """ annotations = set() if self.eol is True: annotations.add("EOL") if self.newer_than_host is True: annotations.add("Newer than Host") if len(annotations) > 0: return f"{self.name} ({', '.join(annotations)})" return f"{self.name}" @property def eol(self) -> typing.Optional[bool]: """ Return whether the release is EOL or checks are disabled. When check_eol is disabled, None is returned, True when the release name was found in the distributions eol_list. """ if not self.check_eol: return None if self.host.distribution.name == "FreeBSD": return (self.name in self.host.distribution.eol_list) is True elif self.host.distribution.name == "HardenedBSD": if "STABLE" in self.name: # stable releases are explicitly in the EOL list or supported return (self.name in self.host.distribution.eol_list) is True return (self.version_number in map( lambda x: self._parse_release_version(x), self.host.distribution.eol_list )) is True return False @property def current_snapshot_patchlevel(self) -> int: """Return the currently chosen patchlevel number or the latest.""" current_snapshot_name = self.current_snapshot.snapshot_name return int(current_snapshot_name.lstrip('p')) @property def current_snapshot(self) -> libzfs.ZFSSnapshot: """Return the manually configured or the latest release snapshot.""" if self.patchlevel is not None: for snapshot in self.version_snapshots: if snapshot.snapshot_name == f"p{self.patchlevel}": return snapshot return self.latest_snapshot @property def latest_snapshot(self) -> libzfs.ZFSSnapshot: """ Return or create the latest version snapshot. When no snapshot was taken before `p0` is automatically created. """ version_snapshots = self.version_snapshots if len(version_snapshots) == 0: self.logger.verbose("No release snapshot found - using @p0") return self.snapshot("p0") else: return version_snapshots[0] @property def version_snapshots(self) -> typing.List['libzfs.ZFSSnapshot']: """Return the sorted list of taken version snapshots (e.g. p3).""" versions: typing.List[int] = [] snapshots: typing.Dict[int, 'libzfs.ZFSSnapshot'] = {} pattern = re.compile(r"^p(\d+)$") for snapshot in self.root_dataset.snapshots: match = pattern.match(snapshot.snapshot_name) if match is None: continue patch_level = int(match[1]) versions.append(patch_level) snapshots[patch_level] = snapshot return [snapshots[i] for i in reversed(sorted(versions))] def _require_release_supported(self) -> None: if self.host.distribution.name == "HardenedBSD": version = self.release.version_number if (version == 0) or (version >= 10.4): return raise libioc.errors.UnsupportedRelease( version=version, logger=self.logger ) @property def version_number(self) -> float: """Return the numeric release version number or 0 for CURRENT.""" return self._parse_release_version(self.name) def _parse_release_version(self, release_version_string: str) -> float: _parts = release_version_string.split("-", maxsplit=1) parsed_version = _parts[0] try: version = float(parsed_version) if self.host.distribution.name == "HardenedBSD": if (len(_parts) == 2) and (_parts[1].upper() == "STABLE"): if (version == 10.0): return 10.4 elif (version == 11.0): return 11.1 return version except ValueError: return float(0) @property def mirror_url(self) -> str: """Return the distributions release mirror URL.""" if self._mirror_url is None: return str(self.host.distribution.mirror_url) else: return self._mirror_url @mirror_url.setter def mirror_url(self, value: str) -> None: """Override the default release mirror URL.""" url = urllib.parse.urlparse(value) if url.scheme not in self._supported_url_schemes: raise ValueError(f"Invalid URL scheme '{url.scheme}'") self._mirror_url = url.geturl() @property def remote_url(self) -> str: """Return the releases full mirror URL.""" return f"{self.mirror_url}/{self.real_name}" @property def available(self) -> bool: """Return True if the release is available on the remote mirror.""" try: request = urllib.request.Request(self.remote_url, method="HEAD") resource = urllib.request.urlopen(request) # nosec: trusted URL return (resource.getcode() == 200) is True # noqa: T484 except urllib.error.URLError: pass return False @property def fetched(self) -> bool: """Return True if the release is fetched locally.""" if self.exists is False: return False try: root_dir_index = os.listdir(self.root_dataset.mountpoint) except libzfs.ZFSException: return False for expected_directory in ["dev", "var", "etc"]: if expected_directory not in root_dir_index: return False return True @property def newer_than_host(self) -> bool: """Return True if the release is newer than the host.""" host_release_name = self._pad_release_name(self.host.release_version) release_name = self._pad_release_name(self.name) host_is_current = host_release_name.startswith("CURRENT") release_is_current = release_name.startswith("CURRENT") if release_is_current is True: if host_is_current is False: return True else: return False cropped_release_name = release_name[:len(host_release_name)] return (host_release_name < cropped_release_name) def _pad_release_name(self, release_name: str, digits: int=4) -> str: """Pad releases with 0 until it has 4 characters before the first.""" try: major_version = int(release_name.split("-")[0].split(".")[0]) padding = str("0" * (digits - len(str(major_version)))) return padding + release_name except (KeyError, AttributeError, ValueError): return release_name @property def zfs_pool(self) -> libzfs.ZFSPool: """Return the releases ZFS pool.""" try: root_pool = self.root_dataset.pool # type: libzfs.ZFSPool return root_pool except AttributeError: pool = self.host.datasets.releases.pool # type: libzfs.ZFSPool return pool @property def hashes(self) -> typing.Dict[str, str]: """Return the releases asset hashes.""" if self._hashes is None: if not os.path.isfile(self.__get_hashfile_location()): self.logger.spam("hashes have not yet been downloaded") self._fetch_hashes() self._hashes = self.read_hashes() if isinstance(self._hashes, dict): return self._hashes raise libioc.errors.ReleaseAssetHashesUnavailable( logger=self.logger ) @property def _supported_url_schemes(self) -> typing.List[str]: return ["https", "http", "ftp"] @property def hbds_release_branch(self) -> str: """Translate the release into a HardenedBSD release git branch name.""" if self._hbsd_release_branch is not None: return self._hbsd_release_branch if self.fetched is False: raise libioc.errors.ReleaseNotFetched( name=self.name, logger=self.logger ) root_dataset_mountpoint = self.root_dataset.mountpoint source_file = f"{root_dataset_mountpoint}/etc/hbsd-update.conf" if not os.path.isfile(source_file): raise libioc.errors.ReleaseUpdateBranchLookup( release_name=self.name, reason=f"{source_file} not found", logger=self.logger ) libioc.helpers.require_no_symlink(source_file, logger=self.logger) with open(source_file, "r", encoding="utf-8") as f: import ucl hbsd_update_conf = ucl.load(f.read()) self._hbsd_release_branch = hbsd_update_conf["branch"] return str(self._hbsd_release_branch)
[docs] def fetch( self, update: bool=False, fetch_updates: bool=False, event_scope: typing.Optional['libioc.events.Scope']=None, update_base: typing.Optional[bool]=None ) -> typing.Generator['libioc.events.IocEvent', None, None]: """ Fetch the release from the remote. Args: update (bool): (default=False) When enabled, updates are applied to the release, unless fetching updates was without success. fetch_updates (bool): (default=False) When enabled, updates are fetched. Disabling this option has no impact on applying updates. event_scope (libioc.events.Scope): (optional) Pass on the IocEvent stack for use in higher order functions. update_base (bool): (optional) When unset, the sysrc config `ioc_legacy_support` is taken into account to determine whether ZFS basejail datasets should be updated in case the release was created or modified. Those ZFS basejail datasets are required to start basejails created with iocage_legacy. A boolean value overrides the configuration from /etc/rc.conf. """ release_changed: typing.Optional[bool] = None self._require_release_supported() events = libioc.events fetchReleaseEvent = events.FetchRelease( self, scope=event_scope ) _scope = fetchReleaseEvent.scope releasePrepareStorageEvent = events.ReleasePrepareStorage( self, scope=_scope ) releaseDownloadEvent = events.ReleaseDownload( self, scope=_scope ) releaseExtractionEvent = events.ReleaseExtraction( self, scope=_scope ) releaseConfigurationEvent = events.ReleaseConfiguration( self, scope=_scope ) releaseCopyBaseEvent = events.ReleaseCopyBase( self, scope=_scope ) if self.fetched is False: release_changed = False yield fetchReleaseEvent.begin() yield releasePrepareStorageEvent.begin() # ToDo: allow to reach this for forced re-fetch self.create_resource() self.get_or_create_dataset("root") self._ensure_dataset_mounted() yield releasePrepareStorageEvent.end() yield releaseDownloadEvent.begin() try: yield from self._fetch_assets(releaseDownloadEvent.scope) except Exception: yield releaseDownloadEvent.fail() raise yield releaseDownloadEvent.end() yield releaseExtractionEvent.begin() try: self._extract_assets() except Exception as e: yield releaseExtractionEvent.fail(e) raise yield releaseExtractionEvent.end() release_changed = True yield fetchReleaseEvent.end() else: yield fetchReleaseEvent.skip( message="already downloaded" ) self.logger.verbose( "Release was already downloaded. Skipping download." ) yield releaseConfigurationEvent.begin() try: if any([ self._set_default_periodic_conf(), self._set_default_rc_conf(), self._set_default_sysctl_conf() ]): release_changed = True yield releaseConfigurationEvent.end() else: yield releaseConfigurationEvent.skip() except Exception as e: yield releaseConfigurationEvent.fail(e) raise e self.snapshot("p0") if fetch_updates is True: try: for event in self.updater.fetch(event_scope=_scope): if isinstance(event, libioc.events.ReleaseUpdateDownload): if (event.done and event.skipped) is True: update = False yield event except libioc.errors.IocException: update = False if update is True: for event in self.updater.apply(): if isinstance(event, libioc.events.IocEvent): yield event else: # the only non-IocEvent is our return value release_changed = event yield releaseCopyBaseEvent.begin() if update_base is True: _update_base = True elif update_base is None: # if not specified, lookup global host configuration legacy_support_key = "ioc_legacy_support" rc_conf = libioc.Config.Host.rc_conf if legacy_support_key in rc_conf: _update_base = (rc_conf[legacy_support_key] is True) else: _update_base = False else: _update_base = False if _update_base is True: if release_changed is False: yield releaseCopyBaseEvent.skip(message="release unchanged") self.update_base_release() yield releaseCopyBaseEvent.end() else: yield releaseCopyBaseEvent.skip( message="legacy basejal support disabled" ) self._cleanup()
def _copy_to_base_release(self) -> None: libioc.helpers.exec( [ "rsync", "-a", "--delete", f"{self.root_dataset.mountpoint}/", f"{self.base_dataset.mountpoint}" ], logger=self.logger ) @property def _base_resource(self) -> ReleaseResource: return ReleaseResource( release=self.release, logger=self.logger, host=self.host, zfs=self.zfs ) # ToDo: Memoize ReleaseResource
[docs] def snapshot( self, identifier: str, force: bool=False ) -> libzfs.ZFSSnapshot: """ Create a ZFS snapshot of the release. Args: identifier: This string specifies the snapshots name force: (default=False) Enabling this option forces re-creation of a snapshot in case it already exists for the given identifier Returns: libzfs.ZFSSnapshot: The ZFS snapshot object found or created """ snapshot_name = f"{self.root_dataset.name}@{identifier}" existing_snapshot: typing.Optional[libzfs.ZFSSnapshot] = None try: existing_snapshot = self.zfs.get_snapshot(snapshot_name) if (force is False) and (existing_snapshot is not None): self.logger.verbose( f"Re-using release snapshot {self.name}@{identifier}" ) return existing_snapshot except libzfs.ZFSException: existing_snapshot = None pass if existing_snapshot is not None: self.logger.verbose( f"Deleting release snapshot {self.name}@{identifier}" ) existing_snapshot.delete() existing_snapshot = None self.root_dataset.snapshot(snapshot_name) snapshot: libzfs.ZFSSnapshot = self.zfs.get_snapshot(snapshot_name) return snapshot
def _ensure_dataset_mounted(self) -> None: if not self.dataset.mountpoint: self.dataset.mount() def _fetch_hashes(self) -> None: url = f"{self.remote_url}/{self.host.distribution.hash_file}" path = self.__get_hashfile_location() self.logger.verbose(f"Downloading hashes from {url}") urllib.request.urlretrieve(url, path) # nosec: validated in @setter self.logger.debug(f"Hashes downloaded to {path}") def _fetch_assets( self, event_scope: typing.Optional['libioc.events.Scope']=None, ) -> typing.Generator['libioc.events.IocEvent', None, None]: for asset in self.assets: releaseAssetDownloadEvent = libioc.events.ReleaseAssetDownload( release=self, scope=event_scope ) yield releaseAssetDownloadEvent.begin() url = f"{self.remote_url}/{asset}.txz" path = self._get_asset_location(asset) if os.path.isfile(path): yield releaseAssetDownloadEvent.skip("{path} already exists") else: try: self.logger.debug(f"Starting download of {url}") urllib.request.urlretrieve(url, path) # nosec: validated self.logger.verbose(f"{url} was saved to {path}") yield releaseAssetDownloadEvent.end() except urllib.error.HTTPError as http_error: yield releaseAssetDownloadEvent.fail() raise libioc.errors.DownloadFailed( url=url, code=http_error.code, logger=self.logger )
[docs] def read_hashes(self) -> typing.Dict[str, str]: """Read the release asset hashes.""" # yes, this can read HardenedBSD and FreeBSD hash files path = self.__get_hashfile_location() hashes = {} with open(path, "r", encoding="utf-8") as f: for line in f.read().splitlines(): s = set(line.replace("\t", " ").split(" ")) fingerprint = None asset = None for x in s: x = x.strip("()") if len(x) == 64: fingerprint = x elif x.endswith(".txz"): asset = x[:-4] if asset and fingerprint: hashes[asset] = fingerprint count = len(hashes) self.logger.spam(f"{count} hashes read from {path}") return hashes
def __get_hashfile_location(self) -> str: hash_file = self.host.distribution.hash_file return f"{self.download_directory}/{hash_file}" def _get_asset_location(self, asset_name: str) -> str: return f"{self.download_directory}/{asset_name}.txz" def _extract_assets(self) -> None: for asset in self.assets: if self.check_hashes: self._check_asset_hash(asset) libioc.SecureTarfile.extract( file=self._get_asset_location(asset), compression_format="xz", destination=self.root_dir, logger=self.logger ) def _set_default_rc_conf(self) -> bool: for key, value in self.DEFAULT_RC_CONF.items(): self.rc_conf[key] = value return self.rc_conf.save() is True def _set_default_periodic_conf(self) -> bool: for key, value in self.DEFAULT_PERIODIC_CONF.items(): self.periodic_conf[key] = value return self.periodic_conf.save() is True def _set_default_sysctl_conf(self) -> bool: for key, value in self.DEFAULT_SYSCTL_CONF.items(): self.sysctl_conf[key] = value return self.sysctl_conf.save() is True def _update_name_from_dataset(self) -> None: if self.dataset is not None: self.name = self.dataset.name.split("/")[-2:-1]
[docs] def update_base_release(self) -> None: """Update the ZFS basejail release dataset.""" base_dataset = self.zfs.get_or_create_dataset(self.base_dataset_name) basedirs = libioc.helpers.get_basedir_list( distribution_name=self.host.distribution.name ) for folder in basedirs: self.zfs.get_or_create_dataset(f"{base_dataset.name}/{folder}") self._copy_to_base_release() self.logger.debug(f"Base release '{self.name}' updated")
def _cleanup(self) -> None: for asset in self.assets: asset_location = self._get_asset_location(asset) if os.path.isfile(asset_location): os.remove(asset_location) def _check_asset_hash(self, asset_name: str) -> None: local_file_hash = self._read_asset_hash(asset_name) expected_hash = self.hashes[asset_name] has_valid_hash = local_file_hash == expected_hash if not has_valid_hash: self.logger.warn( f"Asset {asset_name}.txz has an invalid signature" f"(was '{local_file_hash}' but expected '{expected_hash}')" ) raise libioc.errors.InvalidReleaseAssetSignature( release_name=self.name, asset_name=asset_name, logger=self.logger ) self.logger.spam( f"Asset {asset_name}.txz has a valid signature ({expected_hash})" ) def _read_asset_hash(self, asset_name: str) -> str: asset_location = self._get_asset_location(asset_name) sha256 = hashlib.sha256() with open(asset_location, "rb") as f: for block in iter(lambda: f.read(65536), b''): sha256.update(block) return sha256.hexdigest() def __str__(self) -> str: """Return the release name.""" if self.patchlevel is None: return self.name else: return f"{self.name}-p{self.patchlevel}"
[docs] def destroy( self, force: bool=False, event_scope: typing.Optional['libioc.events.Scope']=None ) -> typing.Generator['libioc.events.IocEvent', None, None]: """Delete a release.""" zfsDatasetDestroyEvent = libioc.events.ZFSDatasetDestroy( dataset=self.dataset, scope=event_scope ) yield zfsDatasetDestroyEvent.begin() try: self.zfs.delete_dataset_recursive(self.dataset) except Exception as e: zfsDatasetDestroyEvent.fail(e) raise e yield zfsDatasetDestroyEvent.end()
[docs]class Release(ReleaseGenerator): """Release with synchronous interfaces."""
[docs] def fetch( # noqa: T484 self, update: bool=False, fetch_updates: bool=False, event_scope: typing.Optional['libioc.events.Scope']=None ) -> typing.List['libioc.events.IocEvent']: """Fetch the release from the remote synchronously.""" return list(ReleaseGenerator.fetch( self, update=update, fetch_updates=fetch_updates, event_scope=event_scope ))
[docs] def destroy( # noqa: T484 self, force: bool=False, event_scope: typing.Optional['libioc.events.Scope']=None ) -> typing.List['libioc.events.IocEvent']: """Delete a release.""" return list(ReleaseGenerator.destroy( self, force=force, event_scope=event_scope ))