167 lines
5.1 KiB
Python
167 lines
5.1 KiB
Python
from constants import *
|
|
from pathlib import Path
|
|
import os, gzip, tarfile, shutil
|
|
|
|
## Create the backup default folders
|
|
def default_backup_dir():
|
|
os.makedirs(HOST_BACKUP_FOLDER, exist_ok=True)
|
|
|
|
from pathlib import Path
|
|
from constants import *
|
|
import os
|
|
|
|
def autorotate_backups(dry_run: bool = False):
|
|
"""
|
|
Scansiona tutte le sottocartelle immediate di HOST_BACKUP_FOLDER.
|
|
Per ogni sottocartella prende i file *.gz (inclusi .tar.gz), li ordina
|
|
per mtime (più nuovi prima), mantiene i primi `keep_backups` e rimuove
|
|
gli altri (a meno che dry_run==True).
|
|
Restituisce (candidates_found, actually_deleted).
|
|
"""
|
|
|
|
base = Path(HOST_BACKUP_FOLDER)
|
|
|
|
if not base.exists():
|
|
print("ERROR: HOST_BACKUP_FOLDER does not exist:", base)
|
|
return 0, 0
|
|
|
|
keep = int(JSON_CONF.get("keep_backups", 7))
|
|
|
|
total_candidates = 0
|
|
total_deleted = 0
|
|
|
|
# ottengo tutte le directory immediate dentro HOST_BACKUP_FOLDER
|
|
targets = sorted([p for p in base.iterdir() if p.is_dir()])
|
|
|
|
if not targets:
|
|
print("No subfolders found in HOST_BACKUP_FOLDER:", base)
|
|
return 0, 0
|
|
|
|
for folder in targets:
|
|
# prendi solo file (evita di includere directory per errore)
|
|
backups = sorted(
|
|
(f for f in folder.glob("*.gz") if f.is_file()),
|
|
key=lambda f: f.stat().st_mtime,
|
|
reverse=True
|
|
)
|
|
|
|
old_backups = backups[keep:]
|
|
|
|
print("\nFolder:", folder)
|
|
print("Total backups:", len(backups))
|
|
print("Keep:", keep)
|
|
print("Old to remove:", len(old_backups))
|
|
|
|
for b in old_backups:
|
|
print(" Old backup:", b)
|
|
|
|
# elimina se non dry_run
|
|
if not dry_run and old_backups:
|
|
for b in old_backups:
|
|
try:
|
|
b.unlink()
|
|
total_deleted += 1
|
|
print(" -> deleted")
|
|
except Exception as e:
|
|
print(f" -> failed to delete {b}: {e}")
|
|
|
|
total_candidates += len(old_backups)
|
|
|
|
print("\nSummary:")
|
|
print(f" Candidates found: {total_candidates}")
|
|
print(f" Actually deleted: {total_deleted} (dry_run={dry_run})")
|
|
|
|
return total_candidates, total_deleted
|
|
|
|
|
|
## Show what backups path are enabled or disabled
|
|
def show_enabled():
|
|
print()
|
|
print("### ENABLED PATHS ###")
|
|
for path, flag, name in JSON_LIST:
|
|
if flag > 0:
|
|
print(f"- {path}")
|
|
print ("")
|
|
print("### DISABLED PATHS ###")
|
|
for path, flag, name in JSON_LIST:
|
|
if flag == 0:
|
|
print(f"- {path}")
|
|
|
|
|
|
|
|
## Checking which of the enabled path are available for a backup
|
|
def check_existing_folders(debug="off"):
|
|
checked_paths = []
|
|
correct_folder = []
|
|
correct_file = []
|
|
notexists = []
|
|
empty = []
|
|
|
|
|
|
|
|
for path, flag, namepath in JSON_LIST:
|
|
if flag != 1:
|
|
continue
|
|
pathnow = Path(path)
|
|
if pathnow.exists():
|
|
if pathnow.is_dir() and any(pathnow.iterdir()):
|
|
checked_paths.append([pathnow, namepath, "folder"])
|
|
correct_folder.append(f"- Folder exists: {pathnow}")
|
|
elif pathnow.is_file():
|
|
checked_paths.append([pathnow, namepath, "file"])
|
|
correct_file.append(f"- File exists: {pathnow}")
|
|
else:
|
|
empty.append(f"- Empty folder or special file: {pathnow}")
|
|
else:
|
|
notexists.append(f"- Path does not exist: {pathnow}")
|
|
|
|
if debug=="on":
|
|
print("###### CHECKING EXISTING FOLDERS/FILES ######")
|
|
print()
|
|
print(f"# FOLDERS CHECK OK - [ {len(correct_folder)} ] #")
|
|
for folder in correct_folder:
|
|
print(folder)
|
|
print("")
|
|
|
|
print(f"# FILES CHECK OK - [ {len(correct_file)} ] #")
|
|
for file in correct_file:
|
|
print(file)
|
|
print("")
|
|
|
|
print(f"# FOLDERS EMPTY - [ {len(empty)} ] #")
|
|
for emptyfold in empty:
|
|
print(emptyfold)
|
|
print("")
|
|
|
|
print(f"# FILES / FOLDERS NOT EXISTS - [ {len(notexists)} ] #")
|
|
for not_exists in notexists:
|
|
print(not_exists)
|
|
print("")
|
|
|
|
return checked_paths
|
|
|
|
## Function available for the backup
|
|
def backups_now(debug="off"):
|
|
listnow = check_existing_folders()
|
|
base_backup = Path(HOST_BACKUP_FOLDER)
|
|
base_backup.mkdir(parents=True, exist_ok=True)
|
|
|
|
for path, name, backtype in listnow:
|
|
pathbackup = base_backup / name
|
|
pathbackup.mkdir(parents=True, exist_ok=True)
|
|
|
|
if backtype == "folder":
|
|
tar_path = pathbackup / f"{name}_{DATETODAY}.tar.gz"
|
|
if not tar_path.exists():
|
|
if debug=="on":
|
|
print(f"Backing up folder: {path}")
|
|
with tarfile.open(tar_path, "w:gz") as tar:
|
|
tar.add(path, arcname=path.name)
|
|
|
|
elif backtype == "file":
|
|
gz_path = pathbackup / f"{name}_{DATETODAY}.gz"
|
|
if not gz_path.exists():
|
|
if debug=="on":
|
|
print(f"Backing up file: {path}")
|
|
with open(path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
|
|
shutil.copyfileobj(f_in, f_out) |