Added Logger + Fixed Code + README and README_IT
This commit is contained in:
304
functions.py
304
functions.py
@@ -1,167 +1,257 @@
|
||||
# functions.py
#
# Backup helper functions: folder creation, rotation, path checks and
# the actual tar.gz / gzip backup routines.

# Import constants: (ROOT_DIR, JSON_LIST, JSON_CONF, HOST_BACKUP_FOLDER, DATETODAY, ...)
from constants import *

import gzip
import logging
import os
import shutil
import tarfile
from pathlib import Path
from typing import List, Tuple

# Module-level logger; handlers/level are expected to be configured by the
# application entry point (standard `logging.getLogger(__name__)` pattern).
_LOG = logging.getLogger(__name__)
||||
def default_backup_dir() -> None:
    """
    Ensure the host backup folder exists.

    Creates HOST_BACKUP_FOLDER (including parents) if missing; any failure
    is logged with a traceback instead of being raised to the caller.
    """
    target = Path(HOST_BACKUP_FOLDER)
    try:
        target.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Best-effort: log and continue, callers re-check the folder later.
        _LOG.exception("Failed to create HOST_BACKUP_FOLDER: %s", HOST_BACKUP_FOLDER)
    else:
        _LOG.info("Backup base directory ensured: %s", HOST_BACKUP_FOLDER)
||||
## Backup files rotation
def autorotate_backups(dry_run: bool = False) -> Tuple[int, int]:
    """
    Rotate backup files in each immediate subfolder of HOST_BACKUP_FOLDER.

    Behavior:
      - For each immediate subfolder, find files matching *.gz (this includes
        .tar.gz), sort them by modification time (newest first), keep the
        first `keep_backups` (from JSON_CONF, default 7) and delete the rest.
      - If dry_run is True, only log what would be deleted.

    Returns:
        (candidates_found, actually_deleted)
    """
    base = Path(HOST_BACKUP_FOLDER)

    if not base.exists():
        _LOG.error("HOST_BACKUP_FOLDER does not exist: %s", base)
        return 0, 0

    # Read the retention count defensively: a malformed config value must not
    # abort rotation, so fall back to 7 with a warning.
    try:
        keep = int(JSON_CONF.get("keep_backups", 7))
    except Exception:
        keep = 7
        _LOG.warning("Invalid keep_backups value in config, falling back to %d", keep)

    total_candidates = 0
    total_deleted = 0

    # Immediate subdirectories of the backup root (one per backup target).
    targets = sorted([p for p in base.iterdir() if p.is_dir()])

    if not targets:
        _LOG.info("No subfolders found in HOST_BACKUP_FOLDER: %s", base)
        return 0, 0

    for folder in targets:
        try:
            # Only regular files: directories could match *.gz by name.
            backups = sorted(
                (f for f in folder.glob("*.gz") if f.is_file()),
                key=lambda f: f.stat().st_mtime,
                reverse=True
            )
        except Exception:
            _LOG.exception("Failed to list backups in folder: %s", folder)
            continue

        # Newest `keep` entries survive; everything after them is a candidate.
        old_backups = backups[keep:]

        _LOG.info("Folder: %s", folder)
        _LOG.info("  Total backups found: %d", len(backups))
        _LOG.info("  Keep: %d", keep)
        _LOG.info("  Old backups to remove: %d", len(old_backups))

        for b in old_backups:
            _LOG.info("  Candidate for removal: %s", b)

        # Delete only outside dry-run mode; one failed unlink must not stop
        # the remaining deletions.
        if not dry_run and old_backups:
            for b in old_backups:
                try:
                    b.unlink()
                    total_deleted += 1
                    _LOG.info("  -> deleted: %s", b)
                except Exception:
                    _LOG.exception("  -> failed to delete: %s", b)

        total_candidates += len(old_backups)

    _LOG.info("Rotation summary: candidates_found=%d, actually_deleted=%d (dry_run=%s)",
              total_candidates, total_deleted, dry_run)

    return total_candidates, total_deleted
||||
## Show which backup paths are enabled or disabled
def show_enabled() -> None:
    """
    Log the enabled and disabled paths defined in JSON_LIST.

    Each JSON_LIST entry is expected to be a (path, flag, name) triple
    (loaded from dir_backups.json); malformed entries — wrong arity or a
    non-numeric flag — are reported once and skipped.
    """
    _LOG.info("### ENABLED PATHS ###")
    for entry in JSON_LIST:
        try:
            path, flag, name = entry
            # int() guarded here too: a non-numeric flag must not abort the listing.
            enabled = int(flag) > 0
        except Exception:
            _LOG.warning("Malformed entry in dir_backups.json: %s", entry)
            continue
        if enabled:
            _LOG.info("- %s (name: %s)", path, name)

    _LOG.info("### DISABLED PATHS ###")
    for entry in JSON_LIST:
        try:
            path, flag, name = entry
            disabled = int(flag) == 0
        except Exception:
            # Already warned about in the first pass; skip silently here.
            continue
        if disabled:
            _LOG.info("- %s (name: %s)", path, name)
||||
## Check if the declared folders/files exist
def check_existing_folders(debug: str = "off") -> List[Tuple[Path, str, str]]:
    """
    Check which enabled paths exist and classify them as 'folder' or 'file'.

    Only entries of JSON_LIST whose flag equals 1 are considered. A directory
    is valid only if it contains at least one entry; an empty or unreadable
    directory is reported but excluded from the result.

    Args:
        debug: when "on", emit a detailed classification report at DEBUG level.

    Returns:
        A list of tuples: (Path(path), name, "folder" | "file")
    """
    checked_paths: List[Tuple[Path, str, str]] = []
    correct_folder: List[str] = []
    correct_file: List[str] = []
    notexists: List[str] = []
    empty: List[str] = []

    for entry in JSON_LIST:
        try:
            path_str, flag, namepath = entry
        except Exception:
            _LOG.warning("Skipping malformed entry: %s", entry)
            continue

        # Flag parsed defensively: anything that is not exactly 1 is skipped.
        try:
            if int(flag) != 1:
                continue
        except Exception:
            _LOG.warning("Invalid flag for entry %s, skipping", entry)
            continue

        pathnow = Path(path_str)

        if pathnow.exists():
            try:
                if pathnow.is_dir():
                    try:
                        # Consider non-empty directories only.
                        if any(pathnow.iterdir()):
                            checked_paths.append((pathnow, namepath, "folder"))
                            correct_folder.append(f"- Folder exists: {pathnow}")
                        else:
                            empty.append(f"- Empty folder: {pathnow}")
                    except PermissionError:
                        _LOG.warning("Permission denied reading directory: %s", pathnow)
                        empty.append(f"- Unreadable/empty folder: {pathnow}")
                elif pathnow.is_file():
                    checked_paths.append((pathnow, namepath, "file"))
                    correct_file.append(f"- File exists: {pathnow}")
                else:
                    # Exists but neither regular file nor directory (socket, fifo, ...).
                    empty.append(f"- Special file / unknown type: {pathnow}")
            except Exception:
                _LOG.exception("Error while checking path: %s", pathnow)
        else:
            notexists.append(f"- Path does not exist: {pathnow}")

    if debug == "on":
        _LOG.debug("###### CHECKING EXISTING FOLDERS/FILES ######")
        _LOG.debug("# FOLDERS CHECK OK - [ %d ]", len(correct_folder))
        for folder in correct_folder:
            _LOG.debug(folder)
        _LOG.debug("# FILES CHECK OK - [ %d ]", len(correct_file))
        for file in correct_file:
            _LOG.debug(file)
        _LOG.debug("# FOLDERS EMPTY - [ %d ]", len(empty))
        for emptyfold in empty:
            _LOG.debug(emptyfold)
        _LOG.debug("# FILES / FOLDERS NOT EXISTS - [ %d ]", len(notexists))
        for not_exists in notexists:
            _LOG.debug(not_exists)

    return checked_paths
||||
|
||||
## Backups action
def backups_now(debug: str = "off") -> None:
    """
    Perform backups for each valid path discovered by check_existing_folders.

    - Directories are archived as <name>_<date>.tar.gz
    - Single files are compressed as <name>_<date>.gz

    A backup that already exists for today's date is skipped. Failures on one
    target are logged and do not stop the remaining targets.

    Args:
        debug: forwarded to check_existing_folders; when "on" extra DEBUG
               output is emitted there.
    """
    listnow = check_existing_folders(debug=debug)

    base_backup = Path(HOST_BACKUP_FOLDER)
    try:
        base_backup.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Without the base directory nothing below can succeed.
        _LOG.exception("Failed to ensure base backup directory: %s", base_backup)
        return

    # DATETODAY is provided by constants.py (a date object); stringify once.
    date_str = str(DATETODAY)

    for path, name, backtype in listnow:
        pathbackup = base_backup / name
        try:
            pathbackup.mkdir(parents=True, exist_ok=True)
        except Exception:
            _LOG.exception("Failed to create backup subfolder: %s", pathbackup)
            continue

        if backtype == "folder":
            tar_filename = f"{name}_{date_str}.tar.gz"
            tar_path = pathbackup / tar_filename
            if tar_path.exists():
                _LOG.info("Folder backup already exists, skipping: %s", tar_path)
                continue

            _LOG.info("Backing up folder: %s -> %s", path, tar_path)
            try:
                # Create a tar.gz archive; arcname keeps only the folder name
                # so the archive does not embed the absolute path.
                with tarfile.open(tar_path, "w:gz") as tar:
                    tar.add(path, arcname=path.name)
                _LOG.info("Successfully created archive: %s", tar_path)
            except Exception:
                _LOG.exception("Failed to create tar.gz for folder: %s", path)

        elif backtype == "file":
            gz_filename = f"{name}_{date_str}.gz"
            gz_path = pathbackup / gz_filename
            if gz_path.exists():
                _LOG.info("File backup already exists, skipping: %s", gz_path)
                continue

            _LOG.info("Backing up file: %s -> %s", path, gz_path)
            try:
                # Stream the source file into a gzip file (single copy pass).
                with open(path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                _LOG.info("Successfully created gzip: %s", gz_path)
            except Exception:
                _LOG.exception("Failed to create gzip for file: %s", path)
        else:
            _LOG.warning("Unknown backtype '%s' for path: %s", backtype, path)
|
||||
Reference in New Issue
Block a user