# functions.py
"""Backup helpers: base-folder setup, backup rotation, config listing and archiving.

All configuration (HOST_BACKUP_FOLDER, JSON_LIST, JSON_CONF, DATETODAY, ...)
is provided by the project-local ``constants`` module.
"""

from pathlib import Path
import logging
import os
import gzip
import tarfile
import shutil
from typing import List, Tuple

# Import constants: (ROOT_DIR, JSON_LIST, JSON_CONF, HOST_BACKUP_FOLDER, DATETODAY, ...)
from constants import *

_LOG = logging.getLogger(__name__)


def default_backup_dir() -> None:
    """Ensure the host backup base folder exists (create parents as needed)."""
    try:
        Path(HOST_BACKUP_FOLDER).mkdir(parents=True, exist_ok=True)
        _LOG.info("Backup base directory ensured: %s", HOST_BACKUP_FOLDER)
    except Exception:
        # Best-effort: log the full traceback but never raise to the caller.
        _LOG.exception("Failed to create HOST_BACKUP_FOLDER: %s", HOST_BACKUP_FOLDER)


## Backup files rotation
def autorotate_backups(dry_run: bool = False) -> Tuple[int, int]:
    """
    Rotate backup files in each immediate subfolder of HOST_BACKUP_FOLDER.

    Behavior:
      - For each immediate subfolder, find files matching *.gz (this includes
        .tar.gz), sort them by modification time (newest first), keep the first
        `keep_backups` and delete the older ones.
      - If dry_run is True, only log what would be deleted.

    Returns:
        (candidates_found, actually_deleted)
    """
    base = Path(HOST_BACKUP_FOLDER)
    if not base.exists():
        _LOG.error("HOST_BACKUP_FOLDER does not exist: %s", base)
        return 0, 0

    try:
        keep = int(JSON_CONF.get("keep_backups", 7))
    except Exception:
        keep = 7
        _LOG.warning("Invalid keep_backups value in config, falling back to %d", keep)
    # BUGFIX: a negative keep_backups would turn backups[keep:] into a negative
    # slice that deletes the NEWEST archives; clamp to a sane minimum of 0.
    keep = max(keep, 0)

    total_candidates = 0
    total_deleted = 0

    # Immediate subdirectories only (one per backup "name").
    targets = sorted([p for p in base.iterdir() if p.is_dir()])
    if not targets:
        _LOG.info("No subfolders found in HOST_BACKUP_FOLDER: %s", base)
        return 0, 0

    for folder in targets:
        try:
            backups = sorted(
                (f for f in folder.glob("*.gz") if f.is_file()),
                key=lambda f: f.stat().st_mtime,
                reverse=True,  # newest first, so [keep:] is the oldest tail
            )
        except Exception:
            _LOG.exception("Failed to list backups in folder: %s", folder)
            continue

        old_backups = backups[keep:]

        _LOG.info("Folder: %s", folder)
        _LOG.info(" Total backups found: %d", len(backups))
        _LOG.info(" Keep: %d", keep)
        _LOG.info(" Old backups to remove: %d", len(old_backups))
        for b in old_backups:
            _LOG.info(" Candidate for removal: %s", b)

        if not dry_run and old_backups:
            for b in old_backups:
                try:
                    b.unlink()
                    total_deleted += 1
                    _LOG.info(" -> deleted: %s", b)
                except Exception:
                    _LOG.exception(" -> failed to delete: %s", b)

        total_candidates += len(old_backups)

    _LOG.info(
        "Rotation summary: candidates_found=%d, actually_deleted=%d (dry_run=%s)",
        total_candidates, total_deleted, dry_run,
    )
    return total_candidates, total_deleted


## Show what is enabled in the file json
def show_enabled() -> None:
    """Log enabled and disabled paths defined in JSON_LIST.

    Each JSON_LIST entry is expected to unpack as (path, flag, name); entries
    that do not unpack, or whose flag is not an integer, are reported once and
    skipped instead of crashing the listing.
    """
    _LOG.info("### ENABLED PATHS ###")
    for entry in JSON_LIST:
        try:
            path, flag, name = entry
            # BUGFIX: int(flag) was previously evaluated outside the try, so a
            # non-numeric flag raised ValueError and aborted the whole listing.
            flag_val = int(flag)
        except Exception:
            _LOG.warning("Malformed entry in dir_backups.json: %s", entry)
            continue
        if flag_val > 0:
            _LOG.info("- %s (name: %s)", path, name)

    # Blank line on stdout between the two sections (kept from the original
    # behavior; the rest of the output goes through logging).
    print("")

    _LOG.info("### DISABLED PATHS ###")
    for entry in JSON_LIST:
        try:
            path, flag, name = entry
            flag_val = int(flag)  # same guard as above (was unguarded before)
        except Exception:
            continue
        if flag_val == 0:
            _LOG.info("- %s (name: %s)", path, name)


## Check if the declared folder exists
def check_existing_folders(debug: str = "off") -> List[Tuple[Path, str, str]]:
    """
    Check which enabled paths exist and classify them as 'folder' or 'file'.

    Returns a list of tuples: (Path(path), name, "folder"|"file")
    If a path is a directory, it is considered valid only if it contains
    at least one entry.
    """
    checked_paths: List[Tuple[Path, str, str]] = []
    correct_folder: List[str] = []
    correct_file: List[str] = []
    notexists: List[str] = []
    empty: List[str] = []

    for entry in JSON_LIST:
        try:
            path_str, flag, namepath = entry
        except Exception:
            _LOG.warning("Skipping malformed entry: %s", entry)
            continue
        try:
            # Only flag == 1 marks an entry as enabled for backup.
            if int(flag) != 1:
                continue
        except Exception:
            _LOG.warning("Invalid flag for entry %s, skipping", entry)
            continue

        pathnow = Path(path_str)
        if pathnow.exists():
            try:
                if pathnow.is_dir():
                    try:
                        # Consider the directory only if it is non-empty;
                        # any() stops at the first entry without listing all.
                        if any(pathnow.iterdir()):
                            checked_paths.append((pathnow, namepath, "folder"))
                            correct_folder.append(f"- Folder exists: {pathnow}")
                        else:
                            empty.append(f"- Empty folder: {pathnow}")
                    except PermissionError:
                        _LOG.warning("Permission denied reading directory: %s", pathnow)
                        empty.append(f"- Unreadable/empty folder: {pathnow}")
                elif pathnow.is_file():
                    checked_paths.append((pathnow, namepath, "file"))
                    correct_file.append(f"- File exists: {pathnow}")
                else:
                    # Exists but is neither a regular file nor a directory
                    # (socket, device, broken special entry, ...): skip it.
                    empty.append(f"- Special file / unknown type: {pathnow}")
            except Exception:
                _LOG.exception("Error while checking path: %s", pathnow)
        else:
            notexists.append(f"- Path does not exist: {pathnow}")

    if debug == "on":
        _LOG.debug("###### CHECKING EXISTING FOLDERS/FILES ######")
        _LOG.debug("# FOLDERS CHECK OK - [ %d ]", len(correct_folder))
        for folder in correct_folder:
            _LOG.debug(folder)
        _LOG.debug("# FILES CHECK OK - [ %d ]", len(correct_file))
        for file in correct_file:
            _LOG.debug(file)
        _LOG.debug("# FOLDERS EMPTY - [ %d ]", len(empty))
        for emptyfold in empty:
            _LOG.debug(emptyfold)
        _LOG.debug("# FILES / FOLDERS NOT EXISTS - [ %d ]", len(notexists))
        for not_exists in notexists:
            _LOG.debug(not_exists)

    return checked_paths


## Backups action
def backups_now(debug: str = "off") -> None:
    """
    Perform backups for each valid path discovered by check_existing_folders.

    - Directories are archived as tar.gz
    - Single files are compressed as .gz
    If debug == "on", additional logging is emitted.
    """
    listnow = check_existing_folders(debug=debug)

    base_backup = Path(HOST_BACKUP_FOLDER)
    try:
        base_backup.mkdir(parents=True, exist_ok=True)
    except Exception:
        _LOG.exception("Failed to ensure base backup directory: %s", base_backup)
        return

    date_str = str(DATETODAY)  # DATETODAY is provided by constants.py (date object)

    for path, name, backtype in listnow:
        # One subfolder per configured backup name.
        pathbackup = base_backup / name
        try:
            pathbackup.mkdir(parents=True, exist_ok=True)
        except Exception:
            _LOG.exception("Failed to create backup subfolder: %s", pathbackup)
            continue

        if backtype == "folder":
            tar_filename = f"{name}_{date_str}.tar.gz"
            tar_path = pathbackup / tar_filename
            # One archive per day: skip if today's already exists.
            if tar_path.exists():
                _LOG.info("Folder backup already exists, skipping: %s", tar_path)
                continue
            _LOG.info("Backing up folder: %s -> %s", path, tar_path)
            try:
                # Create a tar.gz archive; arcname preserves only the folder name
                # so the archive does not embed the full absolute path.
                with tarfile.open(tar_path, "w:gz") as tar:
                    tar.add(path, arcname=path.name)
                _LOG.info("Successfully created archive: %s", tar_path)
            except Exception:
                _LOG.exception("Failed to create tar.gz for folder: %s", path)

        elif backtype == "file":
            # NOTE(review): the original file extension is not preserved in the
            # backup name ({name}_{date}.gz) — intentional per existing naming.
            gz_filename = f"{name}_{date_str}.gz"
            gz_path = pathbackup / gz_filename
            if gz_path.exists():
                _LOG.info("File backup already exists, skipping: %s", gz_path)
                continue
            _LOG.info("Backing up file: %s -> %s", path, gz_path)
            try:
                # Open source file and stream-compress it into a gzip file.
                with open(path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                _LOG.info("Successfully created gzip: %s", gz_path)
            except Exception:
                _LOG.exception("Failed to create gzip for file: %s", path)

        else:
            _LOG.warning("Unknown backtype '%s' for path: %s", backtype, path)