veza/ansible/roles/zfs/files/clean_up_zfs_snap.py
2025-12-03 22:56:50 +01:00

50 lines
2.4 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*- # noqa: UP009
import sys # noqa: I001
import time # noqa: F401
import argparse
import datetime
import socket
from zfscos import zfscos
# begin locksocket
MYNAME = sys.argv[0]
try:
lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
lock_socket.bind('\0' + MYNAME) # noqa: Q000
except: # noqa: E722
print('could not create locksocket, program already running, only one copy of this program may run at a given time, exiting') # noqa: Q000
sys.exit(1)
# begin parser
parser = argparse.ArgumentParser(description = "clean up snapshots for any number of datasets and all their children, there are 2 retentions to configure" )
parser.add_argument("-ds", "--dataset", help="list of dataset to consider, seperated by a space", type=str, nargs='+', required = True) # noqa: Q000
parser.add_argument("-fh", "--full-history", dest="fh", help="keep this many days of all snapshots (including hourly snapshots) default=3", type=int, default=3)
parser.add_argument("-lh", "--long-history", dest="lh", help="keep this many days of daily snapshots (will only keep the snapshot from midnight this long) default=180", type=int, default=180)
parser.add_argument("-n", "--dry-run", dest="dryrun", help="show what would be done, but do nothing", action="store_true")
args = parser.parse_args()
localtime = datetime.datetime.now()
def snap_rotate(ds,fh,lh,dryrun):
if dryrun is False:
msg='' # noqa: Q000
else:
msg='[DRYRUN] ' # noqa: Q000
for snap in zfscos.zlist(ds,'snap'): # noqa: Q000
if snap['creation'] < (localtime - datetime.timedelta(days=lh)): # noqa: Q000
print(msg + 'removing snapshot ' + snap['name'] + ' older than long history ' + str(lh) + ' days') # noqa: Q000
if args.dryrun is False:
zfscos.destroy(snap['name']) # noqa: Q000
elif snap['creation'] < (localtime - datetime.timedelta(days=fh)) and int(snap['creation'].strftime('%H')) != 0: # noqa: Q000
print(msg + 'removing snapshot ' + snap['name'] + ' older than full history ' + str(fh) + ' days') # noqa: Q000
if args.dryrun is False:
zfscos.destroy(snap['name']) # noqa: Q000
for ds in args.dataset:
if zfscos.zexist(ds):
snap_rotate(ds,args.fh,args.lh,args.dryrun)
else:
print( 'dataset ' + ds + ' does not exist, ignoring' ) # noqa: Q000