# Copyright (c) 2017-2019, Stefan Grönke
# Copyright (c) 2014-2018, iocage
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted providing that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""ioc Distribution module."""
import typing
import os
import platform
import re
import urllib.request
import html.parser
import libioc.errors
import libioc.helpers_object
class EOLParser(html.parser.HTMLParser):
    """
    Parser for EOL releases.

    Feed it an HTML document; the release names it finds in table cells are
    collected in `eol_releases`:

    * the first `<td>` of a row, when its text starts with ``stable/``, is
      recorded as ``<version>-STABLE``
    * the second `<td>` of a row is recorded verbatim unless it is ``n/a``

    Every instance owns its result lists, so parsing with one instance never
    leaks entries into another.
    """

    # results of this parser instance (created in __init__)
    eol_releases: typing.List[str]
    data: typing.List[str]
    # True while the parser is between an opening and a closing <td>
    in_id: bool = False
    # number of <td> tags opened since the last <tr>
    td_counter: int = 0
    current_branch: str = ""

    def __init__(self, *, convert_charrefs: bool = True) -> None:
        """Initialize the parser with empty, instance-local result lists."""
        super().__init__(convert_charrefs=convert_charrefs)
        # Mutable containers must not live on the class: appending to a
        # class-level list would accumulate results across all instances.
        self.eol_releases = []
        self.data = []

    def handle_starttag(
        self,
        tag: str,
        attrs: typing.List[typing.Tuple[str, typing.Optional[str]]]
    ) -> None:
        """Handle opening HTML tags."""
        if tag == "td":
            self.in_id = True
            self.td_counter += 1
        elif tag == "tr":
            # a new table row begins: start counting its cells from zero
            self.td_counter = 0

    def handle_endtag(self, tag: str) -> None:
        """Handle closing HTML tags."""
        if tag == "td":
            self.in_id = False

    def handle_data(self, data: str) -> None:
        """Handle data in HTML tags."""
        if self.in_id is False:
            return  # skip non-td content

        if (self.td_counter == 1) and data.startswith("stable/"):
            # first column: branch name, e.g. "stable/10" -> "10-STABLE"
            stable_version = data[len("stable/"):]
            self.eol_releases.append(f"{stable_version}-STABLE")
        elif (self.td_counter == 2) and (data != "n/a"):
            # second column: release name, recorded as-is
            self.eol_releases.append(data)
class DistributionGenerator:
    """
    Asynchronous representation of the host distribution.

    Detects whether the host runs FreeBSD or HardenedBSD, knows the matching
    release mirror and checksum file name, and lazily fetches (and caches)
    the list of available releases and the list of EOL release names.
    """

    # link targets on the mirror index that are never release names
    release_name_blacklist = [
        "",
        ".",
        "..",
        "ISO-IMAGES"
    ]

    eol_url: str = "https://www.freebsd.org/security/unsupported.html"

    # memoized result of _query_eol_list(); None until first queried
    _eol_list: typing.Optional[typing.List[str]]

    # Captures NAME from every `a href="NAME/"` directory link. The character
    # class spells out A-Z and a-z separately, because the range `A-z` would
    # also match the punctuation that sits between them in ASCII.
    __mirror_link_pattern = r"a href=\"([A-Za-z0-9\-_\.]+)/\""

    # memoized result of fetch_releases(); None until first fetched
    _available_releases: typing.Optional[
        typing.List['libioc.Release.ReleaseGenerator']
    ]

    host: 'libioc.Host.HostGenerator'
    zfs: 'libioc.ZFS.ZFS'
    logger: 'libioc.Logger.Logger'

    def __init__(
        self,
        host: typing.Optional['libioc.Host.Host']=None,
        zfs: typing.Optional['libioc.ZFS.ZFS']=None,
        logger: typing.Optional['libioc.Logger.Logger']=None
    ) -> None:
        """Resolve the logger, zfs and host helpers and reset the caches."""
        self.logger = libioc.helpers_object.init_logger(self, logger)
        self.zfs = libioc.helpers_object.init_zfs(self, zfs)
        self.host = libioc.helpers_object.init_host(self, host)
        self._eol_list = None
        self._available_releases = None

    @property
    def _class_release(self) -> typing.Union[
        'libioc.Release.ReleaseGenerator',
        'libioc.Release.Release'
    ]:
        """Return the class used to instantiate the available releases."""
        return libioc.Release.ReleaseGenerator

    @property
    def name(self) -> str:
        """
        Return the name of the host distribution.

        Often used to differentiate between operations for HardenedBSD or
        standard FreeBSD.
        """
        # the hbsd-update tool is taken as the marker of a HardenedBSD host
        if os.path.exists("/usr/sbin/hbsd-update"):
            return "HardenedBSD"
        return platform.system()

    @property
    def mirror_url(self) -> str:
        """
        Return the mirror URL of the distribution.

        URL that points to the top level directory of a distributions release
        asset HTTP server.

        Raises libioc.errors.DistributionUnknown when the host distribution
        is neither FreeBSD nor HardenedBSD.
        """
        distribution = self.name
        processor = self.host.processor

        if distribution == "FreeBSD":
            release_path = f"/ftp/releases/{processor}/{processor}"
            return f"https://download.freebsd.org{release_path}"
        elif distribution == "HardenedBSD":
            return "https://jenkins.hardenedbsd.org/builds"
        else:
            raise libioc.errors.DistributionUnknown(distribution)

    @property
    def hash_file(self) -> str:
        """
        Return the name of the checksum file found on the mirror.

        Raises libioc.errors.DistributionUnknown when the host distribution
        is neither FreeBSD nor HardenedBSD.
        """
        distribution = self.name  # resolve once, it touches the filesystem
        if distribution == "FreeBSD":
            return "MANIFEST"
        elif distribution == "HardenedBSD":
            return "CHECKSUMS.SHA256"
        raise libioc.errors.DistributionUnknown(
            distribution_name=distribution
        )

    def fetch_releases(self) -> None:
        """
        Fetch and cache the available releases.

        Downloads the mirror index page, extracts the release names from its
        links, sorts them by their leading version number and stores one
        release object per non-empty name in `_available_releases`.
        """
        mirror_url = self.mirror_url
        self.logger.spam(f"Fetching release list from '{mirror_url}'")

        # scheme and host of the URL are hard-coded by the mirror_url
        # property, hence the suppressed security warning. The context
        # manager closes the connection once the page was read.
        with urllib.request.urlopen(mirror_url) as resource:  # nosec
            charset = resource.headers.get_content_charset()  # noqa: T484
            encoding = charset if charset else "UTF-8"
            response = resource.read().decode(encoding)

        found_releases = self._parse_links(response)

        def parse_release_version(release_name: str) -> float:
            release_fragments = release_name.split("-", maxsplit=1)
            try:
                return float(release_fragments[0])
            except ValueError:
                # non-float values indicate a high index
                return float(1024)

        available_releases = sorted(
            (
                # map long HardenedBSD release names
                self._map_available_release(release_name)
                for release_name in found_releases
                # filter out other CPU architectures on HardenedBSD
                if self._filter_available_releases(release_name)
            ),
            key=parse_release_version  # sort numerically
        )

        self._available_releases = [
            self._class_release(  # noqa: T484
                name=release_name,
                host=self.host,
                zfs=self.zfs,
                logger=self.logger
            )
            for release_name in available_releases
            if len(release_name) > 0  # mapping may yield empty names
        ]

    def _map_available_release(self, release_name: str) -> str:
        """Shorten HardenedBSD build names; pass other names through."""
        if self.name == "HardenedBSD":
            # e.g. HardenedBSD-11-STABLE-libressl-amd64-LATEST
            # drops the first and the last two dash-separated fragments
            return "-".join(release_name.split("-")[1:-2])
        return release_name

    def _filter_available_releases(self, release_name: str) -> bool:
        """Return True for releases matching the host CPU architecture."""
        if self.name != "HardenedBSD":
            return True
        # second to last fragment (or the only one when there is no dash)
        arch = release_name.split("-")[-2:][0]
        # `is True` forces a strict bool result
        return (self.host.processor == arch) is True

    @property
    def eol_list(self) -> typing.List[str]:
        """Return the (memoized) list of release names listed as EOL."""
        if self._eol_list is None:
            self._eol_list = self._query_eol_list()
        return self._eol_list

    def _query_eol_list(self) -> typing.List[str]:
        """
        Scrape the FreeBSD website and return a list of EOL RELEASES.

        Returns an empty list when the response status is not 200.
        """
        request = urllib.request.Request(
            self.eol_url,
            headers={
                "Accept-Charset": "utf-8"
            }
        )
        self.logger.verbose(f"Downloading EOL info from {self.eol_url}")
        with urllib.request.urlopen(request) as response:  # nosec: B310

            response_code = response.getcode()
            if response_code != 200:  # noqa: T484
                # NOTE(review): the error is instantiated but not raised;
                # presumably constructing it with a logger and
                # level="warning" emits the warning - confirm against
                # libioc.errors. EOL info is best-effort, so carry on.
                libioc.errors.DownloadFailed(
                    topic="EOL Warnings",
                    code=response_code,
                    logger=self.logger,
                    level="warning"
                )
                return []

            parser = EOLParser()
            data = response.read().decode("utf-8", "ignore")
            parser.feed(data)
            parser.close()

            return parser.eol_releases

    @property
    def releases(self) -> typing.List['libioc.Release.ReleaseGenerator']:
        """
        List of available releases.

        Raises an error when the releases cannot be fetched at the current time
        """
        if self._available_releases is None:
            self.fetch_releases()

        if self._available_releases is None:
            raise libioc.errors.ReleaseListUnavailable()
        return self._available_releases

    def _parse_links(self, text: str) -> typing.List[str]:
        """
        Return the directory names linked in an HTML index page.

        Every `a href="NAME/"` occurrence in `text` contributes NAME, except
        for the names listed in `release_name_blacklist`. The order of
        appearance is preserved.
        """
        blacklisted_releases = self.release_name_blacklist
        link_targets = re.findall(
            self.__mirror_link_pattern,
            text,
            re.MULTILINE
        )
        # stripping quotes and slashes is purely defensive
        stripped_targets = (target.strip("\"/") for target in link_targets)
        return [
            target
            for target in stripped_targets
            if target not in blacklisted_releases
        ]
class Distribution(DistributionGenerator):
    """
    Synchronous wrapper of the host distribution.

    Behaves like its parent class, except that the release class it hands
    out is `libioc.Release.Release`.
    """

    @property
    def _class_release(self) -> typing.Union[
        'libioc.Release.ReleaseGenerator',
        'libioc.Release.Release'
    ]:
        """Return the class used to instantiate the available releases."""
        release_class = libioc.Release.Release
        return release_class