Add HACS, Themes

This commit is contained in:
2022-05-04 10:50:54 -07:00
parent af527f1e65
commit 9c7c4a5863
183 changed files with 16569 additions and 17 deletions

View File

@@ -0,0 +1 @@
"""Initialize HACS utils."""

View File

@@ -0,0 +1,137 @@
"""Backup."""
from __future__ import annotations
import os
import shutil
import tempfile
from time import sleep
from typing import TYPE_CHECKING
from .path import is_safe
if TYPE_CHECKING:
from ..base import HacsBase
from ..repositories.base import HacsRepository
# Shared default backup location under the system temp dir; repository-bound
# backups use a persistent per-category directory instead (see __init__).
DEFAULT_BACKUP_PATH = f"{tempfile.gettempdir()}/hacs_backup/"


class Backup:
    """Create, restore and clean up a backup of a local file or directory."""

    def __init__(
        self,
        hacs: HacsBase,
        local_path: str | None = None,
        backup_path: str = DEFAULT_BACKUP_PATH,
        repository: HacsRepository | None = None,
    ) -> None:
        """Initialize."""
        self.hacs = hacs
        self.repository = repository
        # NOTE(review): assumes repository is not None whenever local_path is
        # omitted — otherwise this line raises AttributeError; confirm callers.
        self.local_path = local_path or repository.content.path.local
        self.backup_path = backup_path
        if repository:
            # Persistent per-category location so the backup survives
            # independent of the shared hacs_backup dir.
            self.backup_path = (
                tempfile.gettempdir()
                + f"/hacs_persistent_{repository.data.category}/"
                + repository.data.name
            )
        # NOTE(review): DEFAULT_BACKUP_PATH ends with "/" but the repository
        # path above does not — confirm the intended join for that case.
        self.backup_path_full = f"{self.backup_path}{self.local_path.split('/')[-1]}"

    def _init_backup_dir(self) -> bool:
        """Prepare an empty backup dir; return False when backing up is not possible."""
        if not os.path.exists(self.local_path):
            return False
        if not is_safe(self.hacs, self.local_path):
            # Never back up (and later delete) protected base paths.
            return False
        if os.path.exists(self.backup_path):
            shutil.rmtree(self.backup_path)
            # Wait for the folder to be removed
            while os.path.exists(self.backup_path):
                sleep(0.1)
        os.makedirs(self.backup_path, exist_ok=True)
        return True

    def create(self) -> None:
        """Create a backup in /tmp"""
        if not self._init_backup_dir():
            return
        try:
            if os.path.isfile(self.local_path):
                shutil.copyfile(self.local_path, self.backup_path_full)
                os.remove(self.local_path)
            else:
                shutil.copytree(self.local_path, self.backup_path_full)
                shutil.rmtree(self.local_path)
                # Wait for the folder to be removed
                while os.path.exists(self.local_path):
                    sleep(0.1)
            self.hacs.log.debug(
                "Backup for %s, created in %s",
                self.local_path,
                self.backup_path_full,
            )
        except BaseException as exception:  # lgtm [py/catch-base-exception] pylint: disable=broad-except
            self.hacs.log.warning("Could not create backup: %s", exception)

    def restore(self) -> None:
        """Restore from backup."""
        if not os.path.exists(self.backup_path_full):
            return
        if os.path.isfile(self.backup_path_full):
            if os.path.exists(self.local_path):
                os.remove(self.local_path)
            shutil.copyfile(self.backup_path_full, self.local_path)
        else:
            if os.path.exists(self.local_path):
                shutil.rmtree(self.local_path)
                # Wait for the folder to be removed before copying back
                while os.path.exists(self.local_path):
                    sleep(0.1)
            shutil.copytree(self.backup_path_full, self.local_path)
        self.hacs.log.debug("Restored %s, from backup %s", self.local_path, self.backup_path_full)

    def cleanup(self) -> None:
        """Cleanup backup files."""
        if not os.path.exists(self.backup_path):
            return
        shutil.rmtree(self.backup_path)
        # Wait for the folder to be removed
        while os.path.exists(self.backup_path):
            sleep(0.1)
        self.hacs.log.debug("Backup dir %s cleared", self.backup_path)
class BackupNetDaemon(Backup):
    """Backup for NetDaemon repositories, preserving only YAML configuration files."""

    def create(self) -> None:
        """Copy the repository's YAML files into the backup dir."""
        if not self._init_backup_dir():
            return
        for filename in os.listdir(self.repository.content.path.local):
            # Only configuration (.yaml) files are worth preserving.
            if not filename.endswith(".yaml"):
                continue
            source_file_name = f"{self.repository.content.path.local}/{filename}"
            target_file_name = f"{self.backup_path}/{filename}"
            shutil.copyfile(source_file_name, target_file_name)

    def restore(self) -> None:
        """Copy backed-up YAML files back into the repository's local path."""
        if not os.path.exists(self.backup_path):
            return
        for filename in os.listdir(self.backup_path):
            if not filename.endswith(".yaml"):
                continue
            source_file_name = f"{self.backup_path}/{filename}"
            target_file_name = f"{self.repository.content.path.local}/{filename}"
            shutil.copyfile(source_file_name, target_file_name)

View File

@@ -0,0 +1,74 @@
"""HACS Configuration Schemas."""
# pylint: disable=dangerous-default-value
import voluptuous as vol
from ..const import LOCALE
# Keys used in the HACS config entry / options dictionaries.
# Configuration:
TOKEN = "token"
SIDEPANEL_TITLE = "sidepanel_title"
SIDEPANEL_ICON = "sidepanel_icon"
FRONTEND_REPO = "frontend_repo"
FRONTEND_REPO_URL = "frontend_repo_url"
APPDAEMON = "appdaemon"
NETDAEMON = "netdaemon"
# Options:
COUNTRY = "country"
DEBUG = "debug"
RELEASE_LIMIT = "release_limit"
EXPERIMENTAL = "experimental"
# Config group — name of the vol.Exclusive group that makes
# FRONTEND_REPO and FRONTEND_REPO_URL mutually exclusive.
PATH_OR_URL = "frontend_repo_path_or_url"
def hacs_base_config_schema(config: dict = {}) -> dict:
    """Return a schema configuration dict for HACS."""
    # Fall back to a placeholder token when no configuration is supplied.
    if not config:
        config = {TOKEN: "xxxxxxxxxxxxxxxxxxxxxxxxxxx"}
    return {vol.Required(TOKEN, default=config.get(TOKEN)): str}
def hacs_config_option_schema(options: dict = {}) -> dict:
    """Return a schema for HACS configuration options."""
    defaults = {
        APPDAEMON: False,
        COUNTRY: "ALL",
        DEBUG: False,
        EXPERIMENTAL: False,
        NETDAEMON: False,
        RELEASE_LIMIT: 5,
        SIDEPANEL_ICON: "hacs:hacs",
        SIDEPANEL_TITLE: "HACS",
        FRONTEND_REPO: "",
        FRONTEND_REPO_URL: "",
    }
    current = options or defaults
    return {
        vol.Optional(SIDEPANEL_TITLE, default=current.get(SIDEPANEL_TITLE)): str,
        vol.Optional(SIDEPANEL_ICON, default=current.get(SIDEPANEL_ICON)): str,
        vol.Optional(RELEASE_LIMIT, default=current.get(RELEASE_LIMIT)): int,
        vol.Optional(COUNTRY, default=current.get(COUNTRY)): vol.In(LOCALE),
        vol.Optional(APPDAEMON, default=current.get(APPDAEMON)): bool,
        vol.Optional(NETDAEMON, default=current.get(NETDAEMON)): bool,
        vol.Optional(DEBUG, default=current.get(DEBUG)): bool,
        vol.Optional(EXPERIMENTAL, default=current.get(EXPERIMENTAL)): bool,
        # The frontend source can be given as a path OR a URL, never both.
        vol.Exclusive(FRONTEND_REPO, PATH_OR_URL): str,
        vol.Exclusive(FRONTEND_REPO_URL, PATH_OR_URL): str,
    }
def hacs_config_combined() -> dict:
    """Combine the configuration options."""
    combined = dict(hacs_base_config_schema())
    combined.update(hacs_config_option_schema())
    return combined

View File

@@ -0,0 +1,263 @@
"""Data handler for HACS."""
import asyncio
from datetime import datetime
import os
from homeassistant.core import callback
from homeassistant.util import json as json_util
from ..base import HacsBase
from ..enums import HacsGitHubRepo
from ..repositories.base import HacsManifest, HacsRepository
from .logger import get_hacs_logger
from .path import is_safe
from .store import (
async_load_from_store,
async_save_to_store,
async_save_to_store_default_encoder,
get_store_for_key,
)
def update_repository_from_storage(repository, storage_data):
    """Merge in data from storage into the repo data."""
    repository.data.memorize_storage(storage_data)
    repository.data.update_data(storage_data)
    if not repository.data.installed:
        # Stored detail files only exist for installed repositories, so
        # reaching this point with installed=False means the flag is stale.
        repository.logger.debug(
            "%s Should be installed but is not... Fixing that!", repository.string
        )
        repository.data.installed = True
class HacsData:
    """Read/write handler for HACS persistent storage (.storage files)."""

    def __init__(self, hacs: HacsBase):
        """Initialize."""
        self.logger = get_hacs_logger()
        self.hacs = hacs
        # repository id (str) -> serialized repository data; rebuilt on every write.
        self.content = {}

    async def async_write(self, force: bool = False) -> None:
        """Write content to the store files.

        Skipped while HACS is disabled unless force=True.
        """
        if not force and self.hacs.system.disabled:
            return
        self.logger.debug("<HacsData async_write> Saving data")
        # Hacs
        await async_save_to_store(
            self.hacs.hass,
            "hacs",
            {
                "view": self.hacs.configuration.frontend_mode,
                "compact": self.hacs.configuration.frontend_compact,
                "onboarding_done": self.hacs.configuration.onboarding_done,
                "archived_repositories": self.hacs.common.archived_repositories,
                "renamed_repositories": self.hacs.common.renamed_repositories,
                "ignored_repositories": self.hacs.common.ignored_repositories,
            },
        )
        await self._async_store_content_and_repos()
        # Notify the frontend that stored data changed.
        for event in ("hacs/repository", "hacs/config"):
            self.hacs.hass.bus.async_fire(event, {})

    async def _async_store_content_and_repos(self):  # bb: ignore
        """Store the main repos file and each repo that is out of date."""
        # Repositories
        self.content = {}
        # Not run concurrently since this is bound by disk I/O
        for repository in self.hacs.repositories.list_all:
            await self.async_store_repository_data(repository)
        await async_save_to_store(self.hacs.hass, "repositories", self.content)

    async def async_store_repository_data(self, repository: HacsRepository) -> None:
        """Serialize one repository into self.content and export its detail file."""
        repository_manifest = repository.repository_manifest.manifest
        data = {
            "authors": repository.data.authors,
            "category": repository.data.category,
            "description": repository.data.description,
            "domain": repository.data.domain,
            "downloads": repository.data.downloads,
            "etag_repository": repository.data.etag_repository,
            "full_name": repository.data.full_name,
            "first_install": repository.status.first_install,
            "installed_commit": repository.data.installed_commit,
            "installed": repository.data.installed,
            "last_commit": repository.data.last_commit,
            "last_release_tag": repository.data.last_version,
            "last_updated": repository.data.last_updated,
            "name": repository.data.name,
            "new": repository.data.new,
            "repository_manifest": repository_manifest,
            "releases": repository.data.releases,
            "selected_tag": repository.data.selected_tag,
            "show_beta": repository.data.show_beta,
            "stars": repository.data.stargazers_count,
            "topics": repository.data.topics,
            "version_installed": repository.data.installed_version,
        }
        if repository.data.last_fetched:
            data["last_fetched"] = repository.data.last_fetched.timestamp()
        self.content[str(repository.data.id)] = data
        # Only installed repositories get a per-repository detail file.
        if (
            repository.data.installed
            and (repository.data.installed_commit or repository.data.installed_version)
            and (export := repository.data.export_data())
        ):
            # export_data will return `None` if the memorized
            # data is already up to date which allows us to avoid
            # writing data that is already up to date or generating
            # executor jobs to check the data on disk to see
            # if a write is needed.
            await async_save_to_store_default_encoder(
                self.hacs.hass,
                f"hacs/{repository.data.id}.hacs",
                export,
            )
            repository.data.memorize_storage(export)

    async def restore(self):
        """Restore saved data.

        Returns True on success, False when any exception occurred.
        """
        self.hacs.status.new = False
        hacs = await async_load_from_store(self.hacs.hass, "hacs") or {}
        repositories = await async_load_from_store(self.hacs.hass, "repositories") or {}
        if not hacs and not repositories:
            # Assume new install
            self.hacs.status.new = True
            self.logger.info("<HacsData restore> Loading base repository information")
            repositories = await self.hacs.hass.async_add_executor_job(
                json_util.load_json,
                f"{self.hacs.core.config_path}/custom_components/hacs/utils/default.repositories",
            )
        self.logger.info("<HacsData restore> Restore started")
        # Hacs
        self.hacs.configuration.frontend_mode = hacs.get("view", "Grid")
        self.hacs.configuration.frontend_compact = hacs.get("compact", False)
        self.hacs.configuration.onboarding_done = hacs.get("onboarding_done", False)
        self.hacs.common.archived_repositories = []
        self.hacs.common.ignored_repositories = []
        self.hacs.common.renamed_repositories = {}
        # Clear out double (duplicate) renamed values
        renamed = hacs.get("renamed_repositories", {})
        for entry in renamed:
            value = renamed.get(entry)
            # Skip entries whose target name is itself a renamed key.
            if value not in renamed:
                self.hacs.common.renamed_repositories[entry] = value
        # Clear out double (duplicate) archived values
        for entry in hacs.get("archived_repositories", []):
            if entry not in self.hacs.common.archived_repositories:
                self.hacs.common.archived_repositories.append(entry)
        # Clear out double (duplicate) ignored values
        for entry in hacs.get("ignored_repositories", []):
            if entry not in self.hacs.common.ignored_repositories:
                self.hacs.common.ignored_repositories.append(entry)
        hass = self.hacs.hass
        stores = {}
        try:
            await self.register_unknown_repositories(repositories)
            for entry, repo_data in repositories.items():
                if entry == "0":
                    # Ignore repositories with ID 0
                    self.logger.debug(
                        "<HacsData restore> Found repository with ID %s - %s", entry, repo_data
                    )
                    continue
                if self.async_restore_repository(entry, repo_data):
                    stores[entry] = get_store_for_key(hass, f"hacs/{entry}.hacs")

            def _load_from_storage():
                # Runs in the executor: loads every per-repository detail file
                # and merges it into the matching registered repository.
                for entry, store in stores.items():
                    if os.path.exists(store.path) and (data := store.load()):
                        if (full_name := data.get("full_name")) and (
                            renamed := self.hacs.common.renamed_repositories.get(full_name)
                        ) is not None:
                            data["full_name"] = renamed
                        update_repository_from_storage(
                            self.hacs.repositories.get_by_id(entry), data
                        )

            await hass.async_add_executor_job(_load_from_storage)
            self.logger.info("<HacsData restore> Restore done")
        except BaseException as exception:  # lgtm [py/catch-base-exception] pylint: disable=broad-except
            self.logger.critical(
                f"<HacsData restore> [{exception}] Restore Failed!", exc_info=exception
            )
            return False
        return True

    async def register_unknown_repositories(self, repositories):
        """Register any unknown repositories."""
        register_tasks = [
            self.hacs.async_register_repository(
                repository_full_name=repo_data["full_name"],
                category=repo_data["category"],
                check=False,
                repository_id=entry,
            )
            for entry, repo_data in repositories.items()
            if entry != "0" and not self.hacs.repositories.is_registered(repository_id=entry)
        ]
        if register_tasks:
            await asyncio.gather(*register_tasks)

    @callback
    def async_restore_repository(self, entry, repository_data):
        """Restore one repository's attributes from stored data; return True on success."""
        full_name = repository_data["full_name"]
        if not (repository := self.hacs.repositories.get_by_full_name(full_name)):
            self.logger.error(f"<HacsData restore> Did not find {full_name} ({entry})")
            return False
        # Restore repository attributes
        self.hacs.repositories.set_repository_id(repository, entry)
        repository.data.authors = repository_data.get("authors", [])
        repository.data.description = repository_data.get("description")
        repository.releases.last_release_object_downloads = repository_data.get("downloads")
        repository.data.last_updated = repository_data.get("last_updated")
        repository.data.etag_repository = repository_data.get("etag_repository")
        repository.data.topics = repository_data.get("topics", [])
        repository.data.domain = repository_data.get("domain", None)
        repository.data.stargazers_count = repository_data.get("stars", 0)
        repository.releases.last_release = repository_data.get("last_release_tag")
        repository.data.releases = repository_data.get("releases")
        repository.data.hide = repository_data.get("hide", False)
        repository.data.installed = repository_data.get("installed", False)
        repository.data.new = repository_data.get("new", True)
        repository.data.selected_tag = repository_data.get("selected_tag")
        repository.data.show_beta = repository_data.get("show_beta", False)
        repository.data.last_version = repository_data.get("last_release_tag")
        repository.data.last_commit = repository_data.get("last_commit")
        repository.data.installed_version = repository_data.get("version_installed")
        repository.data.installed_commit = repository_data.get("installed_commit")
        if last_fetched := repository_data.get("last_fetched"):
            # NOTE(review): fromtimestamp uses the local timezone — confirm
            # this matches what .timestamp() produced on write.
            repository.data.last_fetched = datetime.fromtimestamp(last_fetched)
        repository.repository_manifest = HacsManifest.from_dict(
            repository_data.get("repository_manifest", {})
        )
        if repository.localpath is not None and is_safe(self.hacs, repository.localpath):
            # Set local path
            repository.content.path.local = repository.localpath
        if repository.data.installed:
            repository.status.first_install = False
        if full_name == HacsGitHubRepo.INTEGRATION:
            # HACS itself is always installed, pinned to the running version.
            repository.data.installed_version = self.hacs.version
            repository.data.installed = True
        return True

View File

@@ -0,0 +1,7 @@
"""Util to decode content from the github API."""
from base64 import b64decode
def decode_content(content: str) -> str:
    """Decode base64-encoded content from the GitHub API into text."""
    raw = b64decode(content.encode("utf-8"))
    return raw.decode()

View File

@@ -0,0 +1,41 @@
"""HACS Decorators."""
from __future__ import annotations
import asyncio
from functools import wraps
from typing import TYPE_CHECKING, Any, Coroutine
from ..const import DEFAULT_CONCURRENT_BACKOFF_TIME, DEFAULT_CONCURRENT_TASKS
if TYPE_CHECKING:
from ..base import HacsBase
def concurrent(
    concurrenttasks: int = DEFAULT_CONCURRENT_TASKS,
    backoff_time=DEFAULT_CONCURRENT_BACKOFF_TIME,
) -> Coroutine[Any, Any, None]:
    """Return a modified function."""
    # One semaphore per decorated function, shared by every call to it.
    semaphore = asyncio.Semaphore(concurrenttasks)

    def inner_function(function) -> Coroutine[Any, Any, None]:
        @wraps(function)
        async def wrapper(*args, **kwargs) -> None:
            hacs: HacsBase = getattr(args[0], "hacs", None)
            async with semaphore:
                result = await function(*args, **kwargs)
                # Skip the backoff only for the last pending "update" task;
                # everything else sleeps before releasing the semaphore.
                skip_backoff = (
                    hacs is not None
                    and hacs.queue is not None
                    and not hacs.queue.has_pending_tasks
                    and "update" in function.__name__
                )
                if not skip_backoff:
                    await asyncio.sleep(backoff_time)
                return result

        return wrapper

    return inner_function

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,46 @@
"""Filter functions."""
from __future__ import annotations
from typing import Any
def filter_content_return_one_of_type(
    content: list[str | Any],
    namestartswith: str,
    filterfiltype: str,
    attr: str = "name",
) -> list[str]:
    """Only match 1 of the filter.

    Keeps every item whose name starts with *namestartswith*, but at most
    one item with the filtered file type. Items may be plain strings or
    objects whose name is read from *attr*.
    """
    matched = []
    seen_filtered_type = False
    suffix = f".{filterfiltype}"
    for item in content:
        name = item if isinstance(item, str) else getattr(item, attr)
        if not name.startswith(namestartswith):
            continue
        if name.endswith(suffix):
            if not seen_filtered_type:
                matched.append(item)
                seen_filtered_type = True
        else:
            matched.append(item)
    return matched
def get_first_directory_in_directory(content: list[str | Any], dirname: str) -> str | None:
    """Return the first directory in dirname or None."""
    return next(
        (
            entry.filename
            for entry in content
            if entry.full_path.startswith(dirname)
            and entry.full_path != dirname
            and entry.is_directory
        ),
        None,
    )

View File

@@ -0,0 +1,11 @@
"""Custom logger for HACS."""
import logging
from ..const import PACKAGE_NAME
# Single shared logger instance for the whole HACS package.
_HACSLogger: logging.Logger = logging.getLogger(PACKAGE_NAME)


def get_hacs_logger() -> logging.Logger:
    """Return a Logger instance."""
    return _HACSLogger

View File

@@ -0,0 +1,20 @@
"""Path utils"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..base import HacsBase
def is_safe(hacs: HacsBase, path: str | Path) -> bool:
    """Helper to check if path is safe to remove.

    A path is unsafe when it is one of the HACS-managed base directories.
    """
    configuration = hacs.configuration
    protected_subpaths = (
        configuration.appdaemon_path,
        configuration.netdaemon_path,
        configuration.plugin_path,
        configuration.python_script_path,
        configuration.theme_path,
        "custom_components/",
    )
    candidate = Path(path).as_posix()
    return all(
        candidate != Path(f"{hacs.core.config_path}/{subpath}").as_posix()
        for subpath in protected_subpaths
    )

View File

@@ -0,0 +1,78 @@
"""The QueueManager class."""
from __future__ import annotations
import asyncio
import time
from typing import Coroutine
from homeassistant.core import HomeAssistant
from ..exceptions import HacsExecutionStillInProgress
from .logger import get_hacs_logger
_LOGGER = get_hacs_logger()
class QueueManager:
    """Manages a FIFO queue of coroutines and executes them in batches."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize the QueueManager."""
        self.hass = hass
        self.queue: list[Coroutine] = []
        self.running = False

    @property
    def pending_tasks(self) -> int:
        """Return a count of pending tasks in the queue."""
        return len(self.queue)

    @property
    def has_pending_tasks(self) -> bool:
        """Return True if there are pending tasks in the queue."""
        return self.pending_tasks != 0

    def clear(self) -> None:
        """Clear the queue."""
        self.queue = []

    def add(self, task: Coroutine) -> None:
        """Add a task to the queue."""
        self.queue.append(task)

    async def execute(self, number_of_tasks: int | None = None) -> None:
        """Execute up to number_of_tasks tasks (all if None) from the queue.

        Raises HacsExecutionStillInProgress if an execution is already running.
        """
        if self.running:
            _LOGGER.debug("<QueueManager> Execution is already running")
            raise HacsExecutionStillInProgress
        if len(self.queue) == 0:
            _LOGGER.debug("<QueueManager> The queue is empty")
            return
        self.running = True
        _LOGGER.debug("<QueueManager> Checking out tasks to execute")
        # Take a prefix of the queue and remove it before awaiting, so tasks
        # added while we run are kept for the next execution.
        local_queue = self.queue[:number_of_tasks] if number_of_tasks else self.queue[:]
        del self.queue[: len(local_queue)]
        _LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
        start = time.time()
        try:
            await asyncio.gather(*local_queue)
        finally:
            # Always release the lock, even when a task raised, so the
            # queue is not stuck in "running" forever.
            self.running = False
        end = time.time() - start
        _LOGGER.debug(
            "<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
            len(local_queue),
            end,
        )
        if self.has_pending_tasks:
            _LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))

View File

@@ -0,0 +1,16 @@
"""Regex utils"""
from __future__ import annotations
import re
# Captures the "owner/repo" part of a GitHub URL or a bare owner/repo string,
# tolerating a ".git" suffix and trailing path segments.
RE_REPOSITORY = re.compile(
    r"(?:(?:.*github.com.)|^)([A-Za-z0-9-]+\/[\w.-]+?)(?:(?:\.git)?|(?:[^\w.-].*)?)$"
)


def extract_repository_from_url(url: str) -> str | None:
    """Extract the lower-cased owner/repo part from a URL."""
    matched = RE_REPOSITORY.match(url)
    if matched is None:
        return None
    return matched.group(1).lower()

View File

@@ -0,0 +1,86 @@
"""Storage handers."""
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.storage import Store
from homeassistant.util import json as json_util
from ..const import VERSION_STORAGE
from ..exceptions import HacsException
from .logger import get_hacs_logger
_LOGGER = get_hacs_logger()
class HACSStore(Store):
    """A subclass of Store that allows multiple loads in the executor."""

    def load(self):
        """Load the data from disk if version matches.

        Returns the stored "data" payload, or None when the file held no
        data or was written with a different schema version.
        Raises HacsException when the file exists but cannot be parsed.
        """
        try:
            data = json_util.load_json(self.path)
        except BaseException as exception:  # lgtm [py/catch-base-exception] pylint: disable=broad-except
            _LOGGER.critical(
                "Could not load '%s', restore it from a backup or delete the file: %s",
                self.path,
                exception,
            )
            raise HacsException(exception) from exception
        # An empty dict means no stored content; a version mismatch means the
        # stored schema is stale — treat both as "no data".
        if data == {} or data["version"] != self.version:
            return None
        return data["data"]
def get_store_key(key):
    """Return the key to use with homeassistant.helpers.storage.Storage."""
    # Keys containing "/" are already fully qualified (e.g. "hacs/<id>.hacs").
    if "/" in key:
        return key
    return f"hacs.{key}"
def _get_store_for_key(hass, key, encoder):
    """Create a Store object for the key.

    encoder=None means the data must already be JSON-serializable.
    """
    return HACSStore(hass, VERSION_STORAGE, get_store_key(key), encoder=encoder)
def get_store_for_key(hass, key):
    """Create a Store object for the key, using Home Assistant's JSONEncoder."""
    return _get_store_for_key(hass, key, JSONEncoder)
async def async_load_from_store(hass, key):
    """Load the retained data from store and return de-serialized data."""
    store = get_store_for_key(hass, key)
    restored = await store.async_load()
    # Normalize "nothing stored" to an empty dict for callers.
    return restored or {}
async def async_save_to_store_default_encoder(hass, key, data):
    """Generate store json safe data to the filesystem.

    The data is expected to be encodable with the default
    python json encoder. It should have already been passed through
    JSONEncoder if needed.
    """
    # encoder=None: skip Home Assistant's JSONEncoder, write data as-is.
    await _get_store_for_key(hass, key, None).async_save(data)
async def async_save_to_store(hass, key, data):
    """Generate dynamic data to store and save it to the filesystem.

    The data is only written if the content on the disk has changed
    by reading the existing content and comparing it.

    If the data has changed this will generate two executor jobs

    If the data has not changed this will generate one executor job
    """
    existing = await async_load_from_store(hass, key)
    if existing is not None and existing == data:
        _LOGGER.debug(
            "<HACSStore async_save_to_store> Did not store data for '%s'. Content did not change",
            get_store_key(key),
        )
        return
    await get_store_for_key(hass, key).async_save(data)
async def async_remove_store(hass, key):
    """Remove a store element that should no longer be used."""
    # Only fully-qualified per-repository keys ("hacs/<id>.hacs") are removable.
    if "/" not in key:
        return
    await get_store_for_key(hass, key).async_remove()

View File

@@ -0,0 +1,32 @@
"""Custom template support."""
from __future__ import annotations
from typing import TYPE_CHECKING
from jinja2 import Template
if TYPE_CHECKING:
from ..repositories.base import HacsRepository
def render_template(content: str, context: HacsRepository) -> str:
    """Render templates in content.

    Falls back to the raw content when rendering fails for any reason.
    """
    # Fix None issues
    last_release = context.releases.last_release_object
    prerelease = last_release.prerelease if last_release is not None else False
    # NOTE(review): content comes from repositories and is rendered with a
    # plain jinja2 Template — consider jinja2.sandbox; confirm threat model.
    try:
        return Template(content).render(
            installed=context.data.installed,
            pending_update=context.pending_update,
            prerelease=prerelease,
            selected_tag=context.data.selected_tag,
            version_available=context.releases.last_release,
            version_installed=context.display_installed_version,
        )
    except BaseException as exception:  # lgtm [py/catch-base-exception] pylint: disable=broad-except
        context.logger.debug(exception)
        return content

View File

@@ -0,0 +1,15 @@
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class Validate:
    """Collects validation error messages."""

    errors: list[str] = field(default_factory=list)

    @property
    def success(self) -> bool:
        """Return True when no validation errors were recorded."""
        return not self.errors

View File

@@ -0,0 +1,58 @@
"""Version utils."""
from __future__ import annotations
from functools import lru_cache
from typing import TYPE_CHECKING
from awesomeversion import (
AwesomeVersion,
AwesomeVersionException,
AwesomeVersionStrategy,
)
if TYPE_CHECKING:
from ..repositories.base import HacsRepository
@lru_cache(maxsize=1024)
def version_left_higher_then_right(left: str, right: str) -> bool | None:
    """Return True if left is strictly higher than right.

    Returns None when either version cannot be parsed/compared
    (unknown strategy or comparison failure).
    """
    try:
        left_version = AwesomeVersion(left)
        right_version = AwesomeVersion(right)
        # Only compare when both versions were recognized by awesomeversion.
        if (
            left_version.strategy != AwesomeVersionStrategy.UNKNOWN
            and right_version.strategy != AwesomeVersionStrategy.UNKNOWN
        ):
            return left_version > right_version
    except (AwesomeVersionException, AttributeError):
        pass
    return None
def version_left_higher_or_equal_then_right(left: str, right: str) -> bool | None:
    """Return True if left is higher than or identical to right.

    Returns None when the versions differ but cannot be compared.
    """
    if left == right:
        return True
    return version_left_higher_then_right(left, right)
def version_to_download(repository: HacsRepository) -> str:
    """Determine which version to download."""
    data = repository.data
    if data.last_version is not None:
        if data.selected_tag is None:
            return data.last_version
        if data.selected_tag == data.last_version:
            # Selecting the latest release is the same as tracking releases.
            data.selected_tag = None
            return data.last_version
        return data.selected_tag
    if data.selected_tag is not None:
        if data.selected_tag == data.default_branch:
            return data.default_branch
        if data.selected_tag in data.published_tags:
            return data.selected_tag
    return data.default_branch or "main"

View File

@@ -0,0 +1,7 @@
"""Workarounds for issues that should not be fixed."""
# Repository full_name -> override value used as a workaround for the
# linked upstream issue; presumably the integration domain — TODO confirm
# against the consumer of this mapping.
DOMAIN_OVERRIDES = {
    # https://github.com/hacs/integration/issues/2465
    "custom-components/sensor.custom_aftership": "custom_aftership"
}