veza/ansible/roles/incus/files/init_container.py
2025-12-03 22:56:50 +01:00

411 lines
14 KiB
Python

#!/usr/local/venvs/init_container/bin/python
import argparse
import ipaddress
import logging
import os
import socket
import subprocess
import sys
import time
import pylxd
###############################################################################
# begin logger
###############################################################################
# Logging goes both to the console and to a per-day file stored in
# "~/<script name with '.py' replaced by '_LOG'>/<YYYY-MM-DD>.log".
MYNAME = sys.argv[0]
LOGDIR = "~/" + os.path.basename(MYNAME).replace(".py", "_LOG")  # noqa: PTH119
EXPENDED_LOGDIR = os.path.expanduser(LOGDIR)  # noqa: PTH111
# exist_ok=True replaces the former isdir()/mkdir() pair: two runs starting at
# the same moment can no longer race each other into a FileExistsError.
os.makedirs(EXPENDED_LOGDIR, exist_ok=True)  # noqa: PTH103
LOGFILE = EXPENDED_LOGDIR + "/" + time.strftime("%Y-%m-%d") + ".log"
# from https://docs.python.org/2/howto/logging-cookbook.html
logger = logging.getLogger("jdl")
# the logger itself lets everything through; each handler filters on its own
# level (INFO here, lowered to DEBUG later when --debug is given)
logger.setLevel(logging.DEBUG)
# create file handler
fh = logging.FileHandler(LOGFILE)
fh.setLevel(logging.INFO)
# create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# create formatter and add it to the handlers
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info("Program Starting")
###############################################################################
# end logger - start parser
###############################################################################
# DNS suffixes tried, in this order, when no --ip is given on the command line
domain_list = [
    "talas.com",
    "int.talas.veza",
    "talas.dev",
    "hds.talas.cloud",
    "int.talas.cloud",
    "test.talas.com",
    "hds.reflex-holding.com",
    "talas.sh",
]

# Command line definition. Options are declared in the order they should
# appear in --help; long help texts are split across adjacent literals only
# for readability, the resulting strings are unchanged.
parser = argparse.ArgumentParser(description="script to create a container on Debian or Ubuntu")

# -- identity and placement ---------------------------------------------------
parser.add_argument("--name", type=str, required=True, help="name of the container")
parser.add_argument(
    "--network",
    type=str,
    required=True,
    help=(
        "network profile name for this container, "
        "this script only handle containers with 1 network profile, "
        "the name of the profile is usualy 'net-vlanXXX'"
    ),
)
parser.add_argument(
    "--distrib",
    type=str,
    default="debian/bookworm",
    help=(
        "distribution to use, current default is 'debian/bookworm'. "
        "Example: 'ubuntu/focal' 'debian/bookworm', "
        "you can see a list of available image with 'lxc image list images:'"
    ),
)

# -- resources ------------------------------------------------------------------
parser.add_argument(
    "--cpu",
    type=int,
    default=4,
    help=(
        "number of CPU threads for the container, "
        "will create the relevant profile if it doesn't exist (default=4)"
    ),
)
parser.add_argument(
    "--memory",
    type=int,
    default=4,
    help="memory limit in GiB, " "will create the relevant profile if it doesn't exist (default=4GiB)",
)
parser.add_argument("--quota", type=int, default=10, help="zfs quota in GiB for the root drive (default=10)")
parser.add_argument(
    "--pool", type=str, default="default", help='name of the incus storage pool to use, the default value is "default" '
)

# -- network --------------------------------------------------------------------
parser.add_argument(
    "--ip",
    type=ipaddress.IPv4Interface,
    help=(
        "private IPv4 address, default to the DNS resolution of 'container name' "
        f"with one of the following domains: {domain_list}. "
        "You can specify a mask, if not, a /24 will be used"
    ),
)
parser.add_argument(
    "--public",
    action="store_true",
    help=(
        "Allow the use of public IP address with the --ip parameter, "
        "you will need to set a public DNS server with the --dns option for this to work as intended"
    ),
)
parser.add_argument(
    "--gateway",
    type=ipaddress.ip_address,
    help="IP address of the gateway, if unspecified, will use the last IP of the network",
)
parser.add_argument(
    "--proxy", type=str, default="no", help="address of the proxy server, if any, syntax is 'hostname:port' "
)
parser.add_argument(
    "--dns",
    type=ipaddress.ip_address,
    nargs="+",
    help="DNS servers to use, the default is to copy the /etc/resolv.conf of the current host",
)
parser.add_argument(
    "--networkmethod",
    type=str,
    choices=["networkd", "interfaces"],
    default="networkd",
    help=(
        "which method should be used to configure the network, "
        "supported options are: 'networkd' or 'interfaces', default 'networkd', "
        "'interfaces' should not be used anymore since Debian 11 is now working with networkd"
    ),
)

# -- misc -----------------------------------------------------------------------
parser.add_argument("--debug", action="store_true", help="enable debug output")

args = parser.parse_args()
# --debug lowers both handlers from INFO to DEBUG
if args.debug:
    fh.setLevel(logging.DEBUG)
    ch.setLevel(logging.DEBUG)

# Without --ip, look the container name up under each known domain and keep
# the first answer, with a /24 mask appended.
if args.ip is None:
    for domain in domain_list:
        fqdn = f"{args.name}.{domain}"
        try:
            ip = socket.gethostbyname(fqdn) + "/24"
        except (OSError, UnicodeError):
            # Resolver errors are OSError subclasses; UnicodeError covers names
            # that cannot be IDNA-encoded. Anything else (Ctrl-C included) is
            # no longer swallowed.
            logger.debug(f'could not resolve "{fqdn}"')
            continue
        args.ip = ipaddress.IPv4Interface(ip)
        logger.info(f'resolved "{fqdn}" to "{ip}"')
        break
    else:
        # no domain produced an answer: fatal, reported at error level like the
        # other fatal paths of this script
        logger.error(f'could not resolve "{args.name}" with any of the following domains: {domain_list}, exiting.')
        sys.exit(1)
###############################################################################
# end parser - start functions
###############################################################################
def ping(IP):  # noqa: N803
    """Tell whether ``IP`` answers to ping from this host.

    Runs ``ping -w 1 <IP>`` and looks only at whether the command succeeded.

    Args:
        IP: address to probe, as a string.

    Returns:
        True when the ping command succeeds (an error is logged, since an
        address already in use is a problem for the caller), False when the
        command fails or cannot be started.
    """
    try:
        # Argument list instead of a shell string: nothing is interpreted by
        # a shell, whatever IP contains.
        subprocess.check_output(["ping", "-w", "1", IP])
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: non-zero exit status; OSError: the ping binary
        # could not be executed. KeyboardInterrupt is deliberately not caught.
        logger.debug("Specified IP is not responding, continuing")
        return False
    logger.error("Specified IP is already up, exiting")
    return True
def check_container_exists(name):
    """Tell whether an instance called ``name`` already exists.

    Uses the module-level ``client`` to look the instance up.

    Args:
        name: instance name to look up.

    Returns:
        True when the lookup succeeds (an error is logged), False when the
        daemon answers that the instance is not found.

    Raises:
        Any other pylxd error is propagated: a failing API call must not be
        mistaken for "the name is free".
    """
    try:
        client.instances.get(name)
    except pylxd.exceptions.NotFound:
        logger.debug("the container doesn't exist, proceeding")
        return False
    logger.error("container " + name + " already exists, exiting")
    return True
def wait(duration_in_sec, interval=1):
    """Pause while printing a progress dot after every tick.

    The previous loop iterated over ``range(1, duration_in_sec)`` and so slept
    one tick less than requested (``wait(1)`` did not sleep at all).

    Args:
        duration_in_sec: number of ticks to wait; with the default interval
            this is the pause length in seconds.
        interval: length of one tick in seconds (default 1).
    """
    print("waiting a bit ", end="", flush=True)
    for _ in range(duration_in_sec):
        time.sleep(interval)
        print(".", end="", flush=True)
    print("")
###############################################################################
# end functions - start program
###############################################################################
# An address parsed without a mask carries the all-ones netmask; in that case
# fall back to a /24, otherwise keep what was given.
if args.ip.netmask != ipaddress.IPv4Address("255.255.255.255"):
    ip = args.ip
else:
    logger.debug("no netmask specified, assuming netmask /24")
    ip = ipaddress.IPv4Interface(f"{args.ip.ip!s}/24")

# refuse non-private addresses unless --public was given
if not (ip.is_private or args.public):
    logger.error(
        f"IP address {ip.exploded!s} is not a private IPv4 address, "
        "use the --public option if you really want to use a public IP"
    )
    sys.exit(1)

# an address that already answers must not be handed to a new container
logger.debug(f"trying to ping {ip.ip} from this host")
address_in_use = ping(str(ip.ip))
if address_in_use:
    sys.exit(1)
# connect ourselves to the incus daemon through its local unix socket
try:
    client = pylxd.Client(endpoint="/var/lib/incus/unix.socket")
except Exception as exc:  # noqa: BLE001
    # Every connection failure ends the run with the same message, but
    # KeyboardInterrupt/SystemExit are no longer swallowed and the real cause
    # is kept for --debug runs.
    logger.error("could not connect to the local incus daemon")  # noqa: TRY400
    logger.debug(f"connection failure detail: {exc!r}")
    sys.exit(1)
# stop right away when the name is already taken
logger.debug(f"check if a container with the name {args.name} already exists...")
name_is_taken = check_container_exists(args.name)
if name_is_taken:
    sys.exit(1)

# the storage pool must already exist
logger.debug(f'checking if the storage pool "{args.pool}" exists')
if not client.storage_pools.exists(args.pool):
    logger.error(f'the storage pool "{args.pool}" does not exists!')
    sys.exit(1)
logger.debug(f'the storage pool "{args.pool}" exists')
# check if the network profile already exists
logger.debug("check if the network profile exists (it cannot be created automatically to avoid mistakes)")
try:
    client.profiles.get(args.network)
except pylxd.exceptions.NotFound:
    # Only a "not found" answer means the profile is missing; any other API
    # failure propagates instead of being reported as a wrong profile name.
    logger.error(  # noqa: TRY400
        f"network profile {args.network} does not exists, are you sure you that you did specify a correct profile name?"
    )
    sys.exit(1)
else:
    logger.debug(f"network profile {args.network} already exists")
# Resource limits are carried by shared profiles named after the limit itself
# ("cpu-<n>-cores", "mem-<n>GiB"); each one is created the first time it is needed.
profile_cpu = "cpu-" + str(args.cpu) + "-cores"
profile_memory = f"mem-{args.memory}GiB"
for wanted_profile, wanted_config in (
    (profile_cpu, {"limits.cpu": str(args.cpu)}),
    (profile_memory, {"limits.memory": f"{args.memory}GiB"}),
):
    try:
        client.profiles.get(wanted_profile)
    except pylxd.exceptions.NotFound:
        # create only on a genuine "not found"; other API errors propagate
        logger.info("creating profile " + wanted_profile)
        client.profiles.create(wanted_profile, config=wanted_config)
    else:
        logger.debug("profile " + wanted_profile + " already exists")

profile_list = [args.network, profile_cpu, profile_memory]
# root disk on the requested pool, sized with --quota
device_list = {"root": {"path": "/", "pool": args.pool, "type": "disk", "size": f"{args.quota}GiB"}}
# the placeholder was missing: the literal text "profile_list" used to be logged
logger.debug(f"profile_list = {profile_list}")
logger.debug('IT-8735: adding security.nesting: "true" on every containers')
config = {
    "name": args.name,
    "source": {
        "type": "image",
        "alias": args.distrib,
        "mode": "pull",
        "server": "https://images.linuxcontainers.org",
        "protocol": "simplestreams",
    },
    "profiles": profile_list,
    "devices": device_list,
    "config": {"security.nesting": "true"},
}
logger.info("creating container " + args.name)
# wait=True: return only once the creation operation has finished
container = client.instances.create(config, wait=True)
logger.info("container created!")
# IT-8895 - location of the LXC systemd override file inside the container
lxc_override_file = "/etc/systemd/system/service.d/lxc.conf"
logger.info(f"removing {lxc_override_file} if it exists - see IT-8895")
# best effort: any failure to delete is reported as "did not exist"
try:
    container.files.delete(lxc_override_file)
except Exception:  # noqa: BLE001
    logger.info(f"{lxc_override_file} did not exist")
else:
    logger.info(f"deleted {lxc_override_file}")
# find the interface name for the network profile: it is the name of the
# first device declared in that profile
profile = client.profiles.get(args.network)
if not profile.devices:
    # previously an IndexError traceback
    logger.error(f"network profile {args.network} has no device, cannot find the interface name")
    sys.exit(1)
interface_name = next(iter(profile.devices))

if args.gateway is None:
    # Last usable host of the network. Indexing the network is O(1), whereas
    # list(network.hosts()) built every address first (millions for a /8).
    # Below /31 the last address is the broadcast one, so the gateway is the
    # one before it; a /31 or /32 has no broadcast address to skip.
    gateway_network = ip.network
    if gateway_network.prefixlen < 31:
        gateway = str(gateway_network[-2])
    else:
        gateway = str(gateway_network[-1])
    logger.debug("defining the gateway with the last IP of the network: " + gateway)
else:
    gateway = args.gateway.compressed
    logger.debug("using manually defined gateway " + gateway)
# Build the network configuration file, one list entry per line of the file;
# the format depends on --networkmethod.
if args.networkmethod == "networkd":
    network_config_path = "/etc/systemd/network/10-autogenerated.network"
    network_config_lines = [
        "[Match]",
        "Name=" + interface_name,
        "[Network]",
        "LinkLocalAddressing=no",
        "Address=" + str(ip),
        "Gateway=" + gateway,
    ]
elif args.networkmethod == "interfaces":
    network_config_path = "/etc/network/interfaces"
    network_config_lines = [
        "# loopback",
        "auto lo",
        "iface lo inet loopback",
        "",
        "auto " + interface_name,
        "iface " + interface_name + " inet static",
        "\taddress " + str(ip.ip),
        "\tnetmask " + str(ip.netmask),
        "\tgateway " + gateway,
    ]
# every line, the last one included, ends with a newline
network_config_content = "\n".join(network_config_lines) + "\n"
logger.debug("network config:\n" + network_config_content)
# push the file into the container
container.files.put(network_config_path, network_config_content)
# apt must not install recommended packages by default
no_recommends_path = "/etc/apt/apt.conf.d/no_recommends.conf"
no_recommends_content = 'APT::Install-Recommends "false";\n'
container.files.put(no_recommends_path, no_recommends_content)

# apt proxy settings, only when --proxy was given ("no" is the option default)
if args.proxy != "no":
    apt_proxy_configuration_path = "/etc/apt/apt.conf.d/proxy.conf"
    proxy_url = "http://" + args.proxy
    apt_proxy_configuration_content = "".join(
        [
            "Acquire {\n",
            ' HTTP::proxy "' + proxy_url + '";\n',
            ' HTTPS::proxy "' + proxy_url + '";\n',
            "}\n",
        ]
    )
    container.files.put(apt_proxy_configuration_path, apt_proxy_configuration_content)
logger.info("starting the container...")
# wait=True blocks until the start operation has completed, so a container
# that fails to start is reported here rather than by the first command run
# inside it. The pause that follows is kept to let the guest finish booting.
container.start(wait=True)
wait(5)
logger.info("container started")
# the files pushed before the start get their final permissions now
container.execute(["chmod", "644", network_config_path])
container.execute(["chmod", "644", no_recommends_path])
if args.proxy != "no":
    # this path is only defined when a proxy was configured
    container.execute(["chmod", "644", apt_proxy_configuration_path])
# Purge netplan.io, then stop and disable systemd-resolved. Each step is
# (message to log, command to run in the container, pause afterwards).
for step_message, step_command, step_pause in (
    ("apt purge -y netplan.io", ["apt-get", "purge", "-y", "netplan.io"], 1),
    ("systemctl stop systemd-resolved.service", ["systemctl", "stop", "systemd-resolved.service"], 2),
    ("systemctl disable systemd-resolved.service", ["systemctl", "disable", "systemd-resolved.service"], 2),
):
    logger.info(step_message)
    RESULT = container.execute(step_command)
    # the command result is only visible in debug mode
    logger.debug(str(RESULT))
    wait(step_pause)
# handle resolv.conf: copy the host's file unless --dns was given, in which
# case write one "nameserver" line per server
resolvconf_path = "/etc/resolv.conf"
if args.dns is None:
    # context manager: the host file is closed as soon as it has been read
    with open(resolvconf_path, "r") as resolvconf_file:  # noqa: PTH123, UP015
        resolvconf_content = resolvconf_file.read()
else:
    resolvconf_content = "# initial configuration\n" + "".join(
        "nameserver " + str(dns_server) + "\n" for dns_server in args.dns
    )
# drop whatever currently sits at that path in the container before sending
# the new file
logger.info(f"unlink {resolvconf_path}")
container.execute(["unlink", resolvconf_path])
logger.debug("resolvconf_content:\n" + resolvconf_content)
logger.info(f"sending {resolvconf_path}")
container.files.put(resolvconf_path, resolvconf_content)
container.execute(["chmod", "644", resolvconf_path])
logger.info("waiting 3 seconds to allow the system to take into account the DNS change...")
wait(3)
# NOTE(review): the source lost its indentation; every statement below,
# including the final pause, is assumed to belong to the networkd branch.
if args.networkmethod == "networkd":
    # enable systemd-networkd
    enable_networkd_cmd = ["systemctl", "enable", "systemd-networkd"]
    logger.info(" ".join(enable_networkd_cmd))
    RESULT = container.execute(enable_networkd_cmd)
    logger.debug(str(RESULT))

    # remove the default eth0 configuration file
    default_network_file = "/etc/systemd/network/eth0.network"
    logger.info("removing default config file " + default_network_file)
    container.execute(["unlink", default_network_file])
    wait(1)

    # restart systemd-networkd
    restart_networkd_cmd = ["systemctl", "restart", "systemd-networkd"]
    logger.info(" ".join(restart_networkd_cmd))
    RESULT = container.execute(restart_networkd_cmd)
    logger.debug(str(RESULT))
    wait(2)
# handle authorized_keys: the file found at this path on the current host is
# copied to the same path inside the container
logger.info("mkdir /root/.ssh")
container.execute(["mkdir", "/root/.ssh"])
authorized_keys_path = "/root/.ssh/authorized_keys"
# context manager: the host file is closed as soon as it has been read
with open(authorized_keys_path, "r") as authorized_keys_file:  # noqa: PTH123, UP015
    authorized_keys_content = authorized_keys_file.read()
logger.info("sending " + authorized_keys_path)
container.files.put(authorized_keys_path, authorized_keys_content)
# refresh the package index, then install the ssh server and python3
apt_update_cmd = ["apt", "update"]
logger.info(" ".join(apt_update_cmd))
RESULT = container.execute(apt_update_cmd)
logger.debug(str(RESULT))
wait(3)

apt_install_cmd = ["apt", "install", "-y", "openssh-server", "python3"]
logger.info(" ".join(apt_install_cmd))
RESULT = container.execute(apt_install_cmd)
logger.debug(str(RESULT))
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4