ports/devel/py-proxmoxer/files/extra-patch-tests
Nuno Teixeira 20d2e961ee devel/py-proxmoxer: Fix tests
Sync PYPI tests with GH released version until fixed upstream

See also:	https://github.com/proxmoxer/proxmoxer/issues/195
PR:		283360
2024-12-24 08:47:09 +00:00

1117 lines
40 KiB
Text

Sync PyPI tests with the GitHub release version
diff -ruN tests/__init__.py proxmoxer-2.2.0/tests/__init__.py
--- tests/__init__.py 1970-01-01 01:00:00.000000000 +0100
+++ proxmoxer-2.2.0/tests/__init__.py 2024-12-15 02:12:42.000000000 +0000
@@ -0,0 +1,3 @@
+__author__ = "John Hollowell"
+__copyright__ = "(c) John Hollowell 2022"
+__license__ = "MIT"
diff -ruN tests/api_mock.py proxmoxer-2.2.0/tests/api_mock.py
--- tests/api_mock.py 1970-01-01 01:00:00.000000000 +0100
+++ proxmoxer-2.2.0/tests/api_mock.py 2024-12-15 02:12:42.000000000 +0000
@@ -0,0 +1,360 @@
+__author__ = "John Hollowell"
+__copyright__ = "(c) John Hollowell 2022"
+__license__ = "MIT"
+
+import json
+import re
+from urllib.parse import parse_qsl, urlparse
+
+import pytest
+import responses
+from requests_toolbelt import MultipartEncoder
+
+
+@pytest.fixture()
+def mock_pve():
+ with responses.RequestsMock(registry=PVERegistry, assert_all_requests_are_fired=False) as rsps:
+ yield rsps
+
+
+class PVERegistry(responses.registries.FirstMatchRegistry):
+ base_url = "https://1.2.3.4:1234/api2/json"
+
+ common_headers = {
+ "Cache-Control": "max-age=0",
+ "Connection": "close, Keep-Alive",
+ "Pragma": "no-cache",
+ "Server": "pve-api-daemon/3.0",
+ "Content-Type": "application/json;charset=UTF-8",
+ }
+
+ def __init__(self):
+ super().__init__()
+ for resp in self._generate_static_responses():
+ self.add(resp)
+
+ for resp in self._generate_dynamic_responses():
+ self.add(resp)
+
+ def _generate_static_responses(self):
+ resps = []
+
+ # Basic GET requests
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=self.base_url + "/version",
+ json={"data": {"version": "7.2-3", "release": "7.2", "repoid": "c743d6c1"}},
+ )
+ )
+
+ resps.append(
+ responses.Response(
+ method="POST",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/storage/[^/]+/download-url"),
+ # "done" added to UPID so polling will terminate (status checking is tested elsewhere)
+ json={
+ "data": "UPID:node:003094EA:095F1EFE:63E88772:download:file.iso:root@pam:done",
+ "success": 1,
+ },
+ )
+ )
+
+ resps.append(
+ responses.Response(
+ method="POST",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/storage/storage1/upload"),
+ # "done" added to UPID so polling will terminate (status checking is tested elsewhere)
+ json={"data": "UPID:node:0017C594:0ADB2769:63EC5455:imgcopy::root@pam:done"},
+ )
+ )
+ resps.append(
+ responses.Response(
+ method="POST",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/storage/missing/upload"),
+ status=500,
+ body="storage 'missing' does not exist",
+ )
+ )
+
+ return resps
+
+ def _generate_dynamic_responses(self):
+ resps = []
+
+ # Authentication
+ resps.append(
+ responses.CallbackResponse(
+ method="POST",
+ url=self.base_url + "/access/ticket",
+ callback=self._cb_password_auth,
+ )
+ )
+
+ # Session testing
+ resps.append(
+ responses.CallbackResponse(
+ method="GET",
+ url=self.base_url + "/fake/echo",
+ callback=self._cb_echo,
+ )
+ )
+
+ resps.append(
+ responses.CallbackResponse(
+ method="GET",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/agent/exec"),
+ callback=self._cb_echo,
+ )
+ )
+
+ resps.append(
+ responses.CallbackResponse(
+ method="GET",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/qemu/[^/]+/monitor"),
+ callback=self._cb_qemu_monitor,
+ )
+ )
+
+ resps.append(
+ responses.CallbackResponse(
+ method="GET",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/tasks/[^/]+/status"),
+ callback=self._cb_task_status,
+ )
+ )
+
+ resps.append(
+ responses.CallbackResponse(
+ method="GET",
+ url=re.compile(self.base_url + r"/nodes/[^/]+/query-url-metadata.*"),
+ callback=self._cb_url_metadata,
+ )
+ )
+
+ return resps
+
+ ###################################
+ # Callbacks for Dynamic Responses #
+ ###################################
+
+ def _cb_echo(self, request):
+ body = request.body
+ if body is not None:
+ if isinstance(body, MultipartEncoder):
+ body = body.to_string() # really, to byte string
+ body = body if isinstance(body, str) else str(body, "utf-8")
+
+ resp = {
+ "method": request.method,
+ "url": request.url,
+ "headers": dict(request.headers),
+ "cookies": request._cookies.get_dict(),
+ "body": body,
+ # "body_json": dict(parse_qsl(request.body)),
+ }
+ return (200, self.common_headers, json.dumps(resp))
+
+ def _cb_password_auth(self, request):
+ form_data_dict = dict(parse_qsl(request.body))
+
+ # if this user should not be authenticated
+ if form_data_dict.get("username") == "bad_auth":
+ return (
+ 401,
+ self.common_headers,
+ json.dumps({"data": None}),
+ )
+ # if this user requires OTP and it is not included
+ if form_data_dict.get("username") == "otp" and form_data_dict.get("otp") is None:
+ return (
+ 200,
+ self.common_headers,
+ json.dumps(
+ {
+ "data": {
+ "ticket": "otp_ticket",
+ "CSRFPreventionToken": "CSRFPreventionToken",
+ "NeedTFA": 1,
+ }
+ }
+ ),
+ )
+
+ # if this is the first ticket
+ if form_data_dict.get("password") != "ticket":
+ return (
+ 200,
+ self.common_headers,
+ json.dumps(
+ {"data": {"ticket": "ticket", "CSRFPreventionToken": "CSRFPreventionToken"}}
+ ),
+ )
+ # if this is refreshing the ticket, return new ticket
+ else:
+ return (
+ 200,
+ self.common_headers,
+ json.dumps(
+ {
+ "data": {
+ "ticket": "new_ticket",
+ "CSRFPreventionToken": "CSRFPreventionToken_2",
+ }
+ }
+ ),
+ )
+
+ def _cb_task_status(self, request):
+ resp = {}
+ if "keep-running" in request.url:
+ resp = {
+ "data": {
+ "id": "110",
+ "pid": 1044989,
+ "node": "node1",
+ "pstart": 284768076,
+ "status": "running",
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ }
+ }
+
+ elif "stopped" in request.url:
+ resp = {
+ "data": {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "interrupted by signal",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ }
+
+ elif "done" in request.url:
+ resp = {
+ "data": {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "OK",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ }
+
+ elif "comment" in request.url:
+ resp = {
+ "data": {
+ "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
+ "node": "node",
+ "pid": 0,
+ "pstart": 0,
+ "starttime": 0,
+ "type": "task",
+ "id": "id",
+ "user": "root@pam",
+ "status": "stopped",
+ "exitstatus": "OK",
+ }
+ }
+
+ return (200, self.common_headers, json.dumps(resp))
+
+ def _cb_url_metadata(self, request):
+ form_data_dict = dict(parse_qsl((urlparse(request.url)).query))
+
+ if "file.iso" in form_data_dict.get("url", ""):
+ return (
+ 200,
+ self.common_headers,
+ json.dumps(
+ {
+ "data": {
+ "size": 123456,
+ "filename": "file.iso",
+ "mimetype": "application/x-iso9660-image",
+ # "mimetype": "application/octet-stream",
+ },
+ "success": 1,
+ }
+ ),
+ )
+ elif "invalid.iso" in form_data_dict.get("url", ""):
+ return (
+ 500,
+ self.common_headers,
+ json.dumps(
+ {
+ "status": 500,
+ "message": "invalid server response: '500 Can't connect to sub.domain.tld:443 (certificate verify failed)'\n",
+ "success": 0,
+ "data": None,
+ }
+ ),
+ )
+ elif "missing.iso" in form_data_dict.get("url", ""):
+ return (
+ 500,
+ self.common_headers,
+ json.dumps(
+ {
+ "status": 500,
+ "success": 0,
+ "message": "invalid server response: '404 Not Found'\n",
+ "data": None,
+ }
+ ),
+ )
+
+ elif "index.html" in form_data_dict.get("url", ""):
+ return (
+ 200,
+ self.common_headers,
+ json.dumps(
+ {
+ "success": 1,
+ "data": {"filename": "index.html", "mimetype": "text/html", "size": 17664},
+ }
+ ),
+ )
+
+ def _cb_qemu_monitor(self, request):
+ body = request.body
+ if body is not None:
+ body = body if isinstance(body, str) else str(body, "utf-8")
+
+ # if the command is an array, return the type-check error response PVE would return
+ if "&" in body:
+ return (
+ 400,
+ self.common_headers,
+ json.dumps(
+ {
+ "data": None,
+ "errors": {"command": "type check ('string') failed - got ARRAY"},
+ }
+ ),
+ )
+ else:
+ resp = {
+ "method": request.method,
+ "url": request.url,
+ "headers": dict(request.headers),
+ "cookies": request._cookies.get_dict(),
+ "body": body,
+ # "body_json": dict(parse_qsl(request.body)),
+ }
+ print(resp)
+ return (200, self.common_headers, json.dumps(resp))
diff -ruN tests/files_mock.py proxmoxer-2.2.0/tests/files_mock.py
--- tests/files_mock.py 1970-01-01 01:00:00.000000000 +0100
+++ proxmoxer-2.2.0/tests/files_mock.py 2024-12-15 02:12:42.000000000 +0000
@@ -0,0 +1,127 @@
+__author__ = "John Hollowell"
+__copyright__ = "(c) John Hollowell 2022"
+__license__ = "MIT"
+
+import re
+
+import pytest
+import responses
+from requests import exceptions
+
+from .api_mock import PVERegistry
+
+
+@pytest.fixture()
+def mock_files():
+ with responses.RequestsMock(
+ registry=FilesRegistry, assert_all_requests_are_fired=False
+ ) as rsps:
+ yield rsps
+
+
+class FilesRegistry(responses.registries.FirstMatchRegistry):
+ base_url = "https://sub.domain.tld"
+
+ common_headers = {
+ "Cache-Control": "max-age=0",
+ "Connection": "close, Keep-Alive",
+ "Pragma": "no-cache",
+ "Server": "pve-api-daemon/3.0",
+ "Content-Type": "application/json;charset=UTF-8",
+ }
+
+ def __init__(self):
+ super().__init__()
+ for resp in self._generate_static_responses():
+ self.add(resp)
+
+ def _generate_static_responses(self):
+ resps = []
+
+ # Basic GET requests
+ resps.append(responses.Response(method="GET", url=self.base_url, body="hello world"))
+ resps.append(
+ responses.Response(method="GET", url=self.base_url + "/file.iso", body="CONTENTS")
+ )
+
+ # sibling
+ resps.append(
+ responses.Response(
+ method="GET", url=self.base_url + "/sibling/file.iso", body="CONTENTS\n"
+ )
+ )
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=self.base_url + "/sibling/TESTINGSUMS",
+ body="this_is_the_hash file.iso",
+ )
+ )
+
+ # extension
+ resps.append(
+ responses.Response(
+ method="GET", url=self.base_url + "/extension/file.iso", body="CONTENTS\n"
+ )
+ )
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=self.base_url + "/extension/file.iso.testing",
+ body="this_is_the_hash file.iso",
+ )
+ )
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=self.base_url + "/extension/connectionerror.iso.testing",
+ body=exceptions.ConnectionError(),
+ )
+ )
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=self.base_url + "/extension/readtimeout.iso.testing",
+ body=exceptions.ReadTimeout(),
+ )
+ )
+
+ # extension upper
+ resps.append(
+ responses.Response(
+ method="GET", url=self.base_url + "/upper/file.iso", body="CONTENTS\n"
+ )
+ )
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=self.base_url + "/upper/file.iso.TESTING",
+ body="this_is_the_hash file.iso",
+ )
+ )
+
+ resps.append(
+ responses.Response(
+ method="GET",
+ url=re.compile(self.base_url + r"/checksums/file.iso.\w+"),
+ body="1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 file.iso",
+ )
+ )
+
+ return resps
+
+
+@pytest.fixture()
+def mock_files_and_pve():
+ with responses.RequestsMock(registry=BothRegistry, assert_all_requests_are_fired=False) as rsps:
+ yield rsps
+
+
+class BothRegistry(responses.registries.FirstMatchRegistry):
+ def __init__(self):
+ super().__init__()
+ registries = [FilesRegistry(), PVERegistry()]
+
+ for reg in registries:
+ for resp in reg.registered:
+ self.add(resp)
diff -ruN tests/tools/__init__.py proxmoxer-2.2.0/tests/tools/__init__.py
--- tests/tools/__init__.py 1970-01-01 01:00:00.000000000 +0100
+++ proxmoxer-2.2.0/tests/tools/__init__.py 2024-12-15 02:12:42.000000000 +0000
@@ -0,0 +1,3 @@
+__author__ = "John Hollowell"
+__copyright__ = "(c) John Hollowell 2022"
+__license__ = "MIT"
diff -ruN tests/tools/test_files.py proxmoxer-2.2.0/tests/tools/test_files.py
--- tests/tools/test_files.py 1970-01-01 01:00:00.000000000 +0100
+++ proxmoxer-2.2.0/tests/tools/test_files.py 2024-12-15 02:12:42.000000000 +0000
@@ -0,0 +1,375 @@
+__author__ = "John Hollowell"
+__copyright__ = "(c) John Hollowell 2023"
+__license__ = "MIT"
+
+import logging
+import tempfile
+from unittest import mock
+
+import pytest
+
+from proxmoxer import ProxmoxAPI, core
+from proxmoxer.tools import ChecksumInfo, Files, SupportedChecksums
+
+from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401
+from ..files_mock import ( # pylint: disable=unused-import # noqa: F401
+ mock_files,
+ mock_files_and_pve,
+)
+
+MODULE_LOGGER_NAME = "proxmoxer.tools.files"
+
+
+class TestChecksumInfo:
+ def test_basic(self):
+ info = ChecksumInfo("name", 123)
+
+ assert info.name == "name"
+ assert info.hex_size == 123
+
+ def test_str(self):
+ info = ChecksumInfo("name", 123)
+
+ assert str(info) == "name"
+
+ def test_repr(self):
+ info = ChecksumInfo("name", 123)
+
+ assert repr(info) == "name (123 digits)"
+
+
+class TestGetChecksum:
+ def test_get_checksum_from_sibling_file_success(self, mock_files):
+ url = "https://sub.domain.tld/sibling/file.iso"
+ exp_hash = "this_is_the_hash"
+ info = ChecksumInfo("testing", 16)
+ res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
+ res2 = Files._get_checksum_from_sibling_file(url, checksum_info=info, filename="file.iso")
+
+ assert res1 == exp_hash
+ assert res2 == exp_hash
+
+ def test_get_checksum_from_sibling_file_fail(self, mock_files):
+ url = "https://sub.domain.tld/sibling/missing.iso"
+ info = ChecksumInfo("testing", 16)
+ res1 = Files._get_checksum_from_sibling_file(url, checksum_info=info)
+ res2 = Files._get_checksum_from_sibling_file(
+ url, checksum_info=info, filename="missing.iso"
+ )
+
+ assert res1 is None
+ assert res2 is None
+
+ def test_get_checksum_from_extension_success(self, mock_files):
+ url = "https://sub.domain.tld/extension/file.iso"
+ exp_hash = "this_is_the_hash"
+ info = ChecksumInfo("testing", 16)
+ res1 = Files._get_checksum_from_extension(url, checksum_info=info)
+ res2 = Files._get_checksum_from_extension(url, checksum_info=info, filename="file.iso")
+
+ assert res1 == exp_hash
+ assert res2 == exp_hash
+
+ def test_get_checksum_from_extension_fail(self, mock_files):
+ url = "https://sub.domain.tld/extension/missing.iso"
+
+ info = ChecksumInfo("testing", 16)
+ res1 = Files._get_checksum_from_extension(url, checksum_info=info)
+ res2 = Files._get_checksum_from_extension(
+ url, checksum_info=info, filename="connectionerror.iso"
+ )
+ res3 = Files._get_checksum_from_extension(
+ url, checksum_info=info, filename="readtimeout.iso"
+ )
+
+ assert res1 is None
+ assert res2 is None
+ assert res3 is None
+
+ def test_get_checksum_from_extension_upper_success(self, mock_files):
+ url = "https://sub.domain.tld/upper/file.iso"
+ exp_hash = "this_is_the_hash"
+ info = ChecksumInfo("testing", 16)
+ res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
+ res2 = Files._get_checksum_from_extension_upper(
+ url, checksum_info=info, filename="file.iso"
+ )
+
+ assert res1 == exp_hash
+ assert res2 == exp_hash
+
+ def test_get_checksum_from_extension_upper_fail(self, mock_files):
+ url = "https://sub.domain.tld/upper/missing.iso"
+ info = ChecksumInfo("testing", 16)
+ res1 = Files._get_checksum_from_extension_upper(url, checksum_info=info)
+ res2 = Files._get_checksum_from_extension_upper(
+ url, checksum_info=info, filename="missing.iso"
+ )
+
+ assert res1 is None
+ assert res2 is None
+
+ def test_get_checksums_from_file_url_all_checksums(self, mock_files):
+ base_url = "https://sub.domain.tld/checksums/file.iso"
+ full_checksum_string = "1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+ for types_enum in SupportedChecksums:
+ checksum_info = types_enum.value
+
+ data = Files.get_checksums_from_file_url(base_url, preferred_type=checksum_info)
+
+ assert data[0] == full_checksum_string[0 : checksum_info.hex_size]
+ assert data[1] == checksum_info
+
+ def test_get_checksums_from_file_url_missing(self, mock_files):
+ url = "https://sub.domain.tld/missing.iso"
+
+ data = Files.get_checksums_from_file_url(url)
+
+ assert data[0] is None
+ assert data[1] is None
+
+
+class TestFiles:
+ prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
+
+ def test_init_basic(self):
+ f = Files(self.prox, "node1", "storage1")
+
+ assert f._prox == self.prox
+ assert f._node == "node1"
+ assert f._storage == "storage1"
+
+ def test_repr(self):
+ f = Files(self.prox, "node1", "storage1")
+ assert (
+ repr(f)
+ == "Files (node1/storage1 at ProxmoxAPI (https backend for https://1.2.3.4:1234/api2/json))"
+ )
+
+ def test_get_file_info_pass(self, mock_pve):
+ f = Files(self.prox, "node1", "storage1")
+ info = f.get_file_info("https://sub.domain.tld/file.iso")
+
+ assert info["filename"] == "file.iso"
+ assert info["mimetype"] == "application/x-iso9660-image"
+ assert info["size"] == 123456
+
+ def test_get_file_info_fail(self, mock_pve):
+ f = Files(self.prox, "node1", "storage1")
+ info = f.get_file_info("https://sub.domain.tld/invalid.iso")
+
+ assert info is None
+
+
+class TestFilesDownload:
+ prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
+ f = Files(prox, "node1", "storage1")
+
+ def test_download_discover_checksum(self, mock_files_and_pve, caplog):
+ status = self.f.download_file_to_storage("https://sub.domain.tld/checksums/file.iso")
+
+ # this is the default "done" task mock information
+ assert status == {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "OK",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ assert caplog.record_tuples == []
+
+ def test_download_no_blocking(self, mock_files_and_pve, caplog):
+ status = self.f.download_file_to_storage(
+ "https://sub.domain.tld/checksums/file.iso", blocking_status=False
+ )
+
+ # this is the default "done" task mock information
+ assert status == {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "OK",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ assert caplog.record_tuples == []
+
+ def test_download_no_discover_checksum(self, mock_files_and_pve, caplog):
+ caplog.set_level(logging.WARNING, logger=MODULE_LOGGER_NAME)
+
+ status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso")
+
+ # this is the default "done" task mock information
+ assert status == {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "OK",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ assert caplog.record_tuples == [
+ (
+ MODULE_LOGGER_NAME,
+ logging.WARNING,
+ "Unable to discover checksum. Will not do checksum validation",
+ ),
+ ]
+
+ def test_uneven_checksum(self, caplog, mock_files_and_pve):
+ caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
+ status = self.f.download_file_to_storage("https://sub.domain.tld/file.iso", checksum="asdf")
+
+ assert status is None
+
+ assert caplog.record_tuples == [
+ (
+ MODULE_LOGGER_NAME,
+ logging.ERROR,
+ "Must pass both checksum and checksum_type or leave both None for auto-discovery",
+ ),
+ ]
+
+ def test_uneven_checksum_type(self, caplog, mock_files_and_pve):
+ caplog.set_level(logging.DEBUG, logger=MODULE_LOGGER_NAME)
+ status = self.f.download_file_to_storage(
+ "https://sub.domain.tld/file.iso", checksum_type="asdf"
+ )
+
+ assert status is None
+
+ assert caplog.record_tuples == [
+ (
+ MODULE_LOGGER_NAME,
+ logging.ERROR,
+ "Must pass both checksum and checksum_type or leave both None for auto-discovery",
+ ),
+ ]
+
+ def test_get_file_info_missing(self, mock_pve):
+ f = Files(self.prox, "node1", "storage1")
+ info = f.get_file_info("https://sub.domain.tld/missing.iso")
+
+ assert info is None
+
+ def test_get_file_info_non_iso(self, mock_pve):
+ f = Files(self.prox, "node1", "storage1")
+ info = f.get_file_info("https://sub.domain.tld/index.html")
+
+ assert info["filename"] == "index.html"
+ assert info["mimetype"] == "text/html"
+
+
+class TestFilesUpload:
+ prox = ProxmoxAPI("1.2.3.4:1234", token_name="name", token_value="value")
+ f = Files(prox, "node1", "storage1")
+
+ def test_upload_no_file(self, mock_files_and_pve, caplog):
+ status = self.f.upload_local_file_to_storage("/does-not-exist.iso")
+
+ assert status is None
+ assert caplog.record_tuples == [
+ (
+ MODULE_LOGGER_NAME,
+ logging.ERROR,
+ '"/does-not-exist.iso" does not exist or is not a file',
+ ),
+ ]
+
+ def test_upload_dir(self, mock_files_and_pve, caplog):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ status = self.f.upload_local_file_to_storage(tmp_dir)
+
+ assert status is None
+ assert caplog.record_tuples == [
+ (
+ MODULE_LOGGER_NAME,
+ logging.ERROR,
+ f'"{tmp_dir}" does not exist or is not a file',
+ ),
+ ]
+
+ def test_upload_empty_file(self, mock_files_and_pve, caplog):
+ with tempfile.NamedTemporaryFile("rb") as f_obj:
+ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
+
+ assert status is not None
+ assert caplog.record_tuples == []
+
+ def test_upload_non_empty_file(self, mock_files_and_pve, caplog):
+ with tempfile.NamedTemporaryFile("w+b") as f_obj:
+ f_obj.write(b"a" * 100)
+ f_obj.seek(0)
+ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
+
+ assert status is not None
+ assert caplog.record_tuples == []
+
+ def test_upload_no_checksum(self, mock_files_and_pve, caplog):
+ with tempfile.NamedTemporaryFile("rb") as f_obj:
+ status = self.f.upload_local_file_to_storage(
+ filename=f_obj.name, do_checksum_check=False
+ )
+
+ assert status is not None
+ assert caplog.record_tuples == []
+
+ def test_upload_checksum_unavailable(self, mock_files_and_pve, caplog, apply_no_checksums):
+ with tempfile.NamedTemporaryFile("rb") as f_obj:
+ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
+
+ assert status is not None
+ assert caplog.record_tuples == [
+ (
+ MODULE_LOGGER_NAME,
+ logging.WARNING,
+ "There are no Proxmox supported checksums which are supported by hashlib. Skipping checksum validation",
+ )
+ ]
+
+ def test_upload_non_blocking(self, mock_files_and_pve, caplog):
+ with tempfile.NamedTemporaryFile("rb") as f_obj:
+ status = self.f.upload_local_file_to_storage(filename=f_obj.name, blocking_status=False)
+
+ assert status is not None
+ assert caplog.record_tuples == []
+
+ def test_upload_proxmox_error(self, mock_files_and_pve, caplog):
+ with tempfile.NamedTemporaryFile("rb") as f_obj:
+ f_copy = Files(self.f._prox, self.f._node, "missing")
+
+ with pytest.raises(core.ResourceException) as exc_info:
+ f_copy.upload_local_file_to_storage(filename=f_obj.name)
+
+ assert exc_info.value.status_code == 500
+ assert exc_info.value.status_message == "Internal Server Error"
+ # assert exc_info.value.content == "storage 'missing' does not exist"
+
+ def test_upload_io_error(self, mock_files_and_pve, caplog):
+ with tempfile.NamedTemporaryFile("rb") as f_obj:
+ mo = mock.mock_open()
+ mo.side_effect = IOError("ERROR MESSAGE")
+ with mock.patch("builtins.open", mo):
+ status = self.f.upload_local_file_to_storage(filename=f_obj.name)
+
+ assert status is None
+ assert caplog.record_tuples == [(MODULE_LOGGER_NAME, logging.ERROR, "ERROR MESSAGE")]
+
+
+@pytest.fixture
+def apply_no_checksums():
+ with mock.patch("hashlib.algorithms_available", set()):
+ yield
diff -ruN tests/tools/test_tasks.py proxmoxer-2.2.0/tests/tools/test_tasks.py
--- tests/tools/test_tasks.py 1970-01-01 01:00:00.000000000 +0100
+++ proxmoxer-2.2.0/tests/tools/test_tasks.py 2024-12-15 02:12:42.000000000 +0000
@@ -0,0 +1,223 @@
+__author__ = "John Hollowell"
+__copyright__ = "(c) John Hollowell 2022"
+__license__ = "MIT"
+
+import logging
+
+import pytest
+
+from proxmoxer import ProxmoxAPI
+from proxmoxer.tools import Tasks
+
+from ..api_mock import mock_pve # pylint: disable=unused-import # noqa: F401
+
+
+class TestBlockingStatus:
+ def test_basic(self, mocked_prox, caplog):
+ caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
+
+ status = Tasks.blocking_status(
+ mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done"
+ )
+
+ assert status == {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "OK",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ assert caplog.record_tuples == [
+ (
+ "proxmoxer.core",
+ 20,
+ "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done/status",
+ ),
+ (
+ "proxmoxer.core",
+ 10,
+ 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:done", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "OK", "pid": 1044989, "id": "110", "node": "node1"}}\'',
+ ),
+ ]
+
+ def test_zeroed(self, mocked_prox, caplog):
+ caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
+
+ status = Tasks.blocking_status(
+ mocked_prox, "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment"
+ )
+
+ assert status == {
+ "upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment",
+ "node": "node",
+ "pid": 0,
+ "pstart": 0,
+ "starttime": 0,
+ "type": "task",
+ "id": "id",
+ "user": "root@pam",
+ "status": "stopped",
+ "exitstatus": "OK",
+ }
+ assert caplog.record_tuples == [
+ (
+ "proxmoxer.core",
+ 20,
+ "GET https://1.2.3.4:1234/api2/json/nodes/node/tasks/UPID:node:00000000:00000000:00000000:task:id:root@pam:comment/status",
+ ),
+ (
+ "proxmoxer.core",
+ 10,
+ 'Status code: 200, output: b\'{"data": {"upid": "UPID:node:00000000:00000000:00000000:task:id:root@pam:comment", "node": "node", "pid": 0, "pstart": 0, "starttime": 0, "type": "task", "id": "id", "user": "root@pam", "status": "stopped", "exitstatus": "OK"}}\'',
+ ),
+ ]
+
+ def test_killed(self, mocked_prox, caplog):
+ caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
+
+ status = Tasks.blocking_status(
+ mocked_prox, "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped"
+ )
+
+ assert status == {
+ "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped",
+ "starttime": 1661825068,
+ "user": "root@pam",
+ "type": "vzdump",
+ "pstart": 284768076,
+ "status": "stopped",
+ "exitstatus": "interrupted by signal",
+ "pid": 1044989,
+ "id": "110",
+ "node": "node1",
+ }
+ assert caplog.record_tuples == [
+ (
+ "proxmoxer.core",
+ 20,
+ "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped/status",
+ ),
+ (
+ "proxmoxer.core",
+ 10,
+ 'Status code: 200, output: b\'{"data": {"upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:stopped", "starttime": 1661825068, "user": "root@pam", "type": "vzdump", "pstart": 284768076, "status": "stopped", "exitstatus": "interrupted by signal", "pid": 1044989, "id": "110", "node": "node1"}}\'',
+ ),
+ ]
+
+ def test_timeout(self, mocked_prox, caplog):
+ caplog.set_level(logging.DEBUG, logger="proxmoxer.core")
+
+ status = Tasks.blocking_status(
+ mocked_prox,
+ "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running",
+ timeout=0.021,
+ polling_interval=0.01,
+ )
+
+ assert status is None
+ assert caplog.record_tuples == [
+ (
+ "proxmoxer.core",
+ 20,
+ "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
+ ),
+ (
+ "proxmoxer.core",
+ 10,
+ 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
+ ),
+ (
+ "proxmoxer.core",
+ 20,
+ "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
+ ),
+ (
+ "proxmoxer.core",
+ 10,
+ 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
+ ),
+ (
+ "proxmoxer.core",
+ 20,
+ "GET https://1.2.3.4:1234/api2/json/nodes/node1/tasks/UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running/status",
+ ),
+ (
+ "proxmoxer.core",
+ 10,
+ 'Status code: 200, output: b\'{"data": {"id": "110", "pid": 1044989, "node": "node1", "pstart": 284768076, "status": "running", "upid": "UPID:node1:000FF1FD:10F9374C:630D702C:vzdump:110:root@pam:keep-running", "starttime": 1661825068, "user": "root@pam", "type": "vzdump"}}\'',
+ ),
+ ]
+
+
+class TestDecodeUpid:
+ def test_basic(self):
+ upid = "UPID:node:000CFC5C:03E8D0C3:6194806C:aptupdate::root@pam:"
+ decoded = Tasks.decode_upid(upid)
+
+ assert decoded["upid"] == upid
+ assert decoded["node"] == "node"
+ assert decoded["pid"] == 851036
+ assert decoded["pstart"] == 65589443
+ assert decoded["starttime"] == 1637122156
+ assert decoded["type"] == "aptupdate"
+ assert decoded["id"] == ""
+ assert decoded["user"] == "root@pam"
+ assert decoded["comment"] == ""
+
+ def test_all_values(self):
+ upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:local"
+ decoded = Tasks.decode_upid(upid)
+
+ assert decoded["upid"] == upid
+ assert decoded["node"] == "node1"
+ assert decoded["pid"] == 851962
+ assert decoded["pstart"] == 65597267
+ assert decoded["starttime"] == 1637122234
+ assert decoded["type"] == "vzdump"
+ assert decoded["id"] == "103"
+ assert decoded["user"] == "root@pam"
+ assert decoded["comment"] == "local"
+
+ def test_invalid_length(self):
+ upid = "UPID:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam"
+ with pytest.raises(AssertionError) as exc_info:
+ Tasks.decode_upid(upid)
+
+ assert str(exc_info.value) == "UPID is not in the correct format"
+
+ def test_invalid_start(self):
+ upid = "ASDF:node1:000CFFFA:03E8EF53:619480BA:vzdump:103:root@pam:"
+ with pytest.raises(AssertionError) as exc_info:
+ Tasks.decode_upid(upid)
+
+ assert str(exc_info.value) == "UPID is not in the correct format"
+
+
+class TestDecodeLog:
+ def test_basic(self):
+ log_list = [{"n": 1, "t": "client connection: 127.0.0.1:49608"}, {"t": "TASK OK", "n": 2}]
+ log_str = Tasks.decode_log(log_list)
+
+ assert log_str == "client connection: 127.0.0.1:49608\nTASK OK"
+
+ def test_empty(self):
+ log_list = []
+ log_str = Tasks.decode_log(log_list)
+
+ assert log_str == ""
+
+ def test_unordered(self):
+ log_list = [{"n": 3, "t": "third"}, {"t": "first", "n": 1}, {"t": "second", "n": 2}]
+ log_str = Tasks.decode_log(log_list)
+
+ assert log_str == "first\nsecond\nthird"
+
+
+@pytest.fixture
+def mocked_prox(mock_pve):
+ return ProxmoxAPI("1.2.3.4:1234", user="user", password="password")