Knowledge base of ~80+ markdown files across 14 domains (00-13), Logseq graph, hardware design files (KiCAD), infrastructure configs, and talas-wiki static site. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
260 lines
11 KiB
Python
Executable file
260 lines
11 KiB
Python
Executable file
#!/usr/local/venvs/openvpn/bin/python
|
|
|
|
import configparser
|
|
import ipaddress
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import geoip2.database
|
|
import ldap
|
|
from pgcos import PgUtil
|
|
|
|
# Module-level logger. Named after the module (the original passed the literal
# string "__name__", which created a logger literally called "__name__").
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def configure_logger(debug: bool = False, prefix: str = "") -> None:
    """Configure root logging for this script.

    Any handlers already attached to the root logger are replaced
    (``force=True``) by a single stream handler whose format embeds
    ``prefix``.

    Args:
        debug: When true, each record also carries the line number and
            function name, and the module logger's level is lowered to DEBUG.
        prefix: Free text inserted into every log line. It is treated as
            literal text.
    """
    # The prefix ends up inside a %-style format string. A literal "%" in it
    # would be parsed as a conversion specifier and make every log call fail,
    # so it is doubled to keep it literal.
    safe_prefix = prefix.replace("%", "%%")
    if debug:
        log_format = (
            f"%(asctime)s client-connect {safe_prefix} - l.%(lineno)s:%(funcName)s() - %(levelname)s - %(message)s"
        )
    else:
        log_format = f"%(asctime)s client-connect {safe_prefix} %(levelname)s - %(message)s"
    logging.basicConfig(
        force=True,
        format=log_format,
        datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.INFO,
        handlers=[logging.StreamHandler()],
    )
    if debug:
        # Only the module logger is lowered; the root logger stays at INFO.
        logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
def parse_configuration(data_model: dict, config_path: str) -> dict:
    """Read the options named in ``data_model`` from an INI file.

    Args:
        data_model: Maps each required section name to the option names that
            must be read from it.
        config_path: Path of the INI file.

    Returns:
        A dict with the same sections and options as ``data_model``, each
        option mapped to its string value from the file.

    Raises:
        Exception: If the file does not exist, or a requested section or
            option is missing from it.
    """
    source = Path(config_path)
    if not source.is_file():
        raise Exception(f"configuration file {source} not found")  # noqa: TRY002, TRY003

    parser = configparser.ConfigParser()
    parser.read(source)

    # Start from a skeleton with every requested option set to None, then
    # fill it in from the parsed file.
    parsed = {}
    for section_name, option_names in data_model.items():
        parsed[section_name] = dict.fromkeys(option_names)

    try:
        for section_name, options in parsed.items():
            for option_name in options:
                options[option_name] = parser.get(section_name, option_name)
    except (configparser.NoSectionError, configparser.NoOptionError) as err:
        raise Exception(err)  # noqa: B904, TRY002
    return parsed
|
|
|
|
|
|
def get_ip_address_from_env():
    """Return the client's address, in compressed textual form.

    The address is read from the ``trusted_ip`` environment variable, falling
    back to ``trusted_ip6`` when ``trusted_ip`` is unset. If the value cannot
    be parsed as an IP address, an error is logged and the process exits with
    status 1.
    """
    candidate = os.getenv("trusted_ip")  # noqa: SIM112
    if candidate is None:
        candidate = os.getenv("trusted_ip6")  # noqa: SIM112
    try:
        return ipaddress.ip_address(candidate).compressed
    except Exception:
        logger.error(f"could not convert '{candidate}' to an IP address, exiting")  # noqa: TRY400
        sys.exit(1)
|
|
|
|
|
|
def get_as(asn_file_path, country_file_path, IP) -> dict:  # noqa: N803
    """Look up autonomous-system and country data for ``IP``.

    Both databases are optional: when a file is missing, or the lookup in it
    fails, the corresponding fields stay ``None`` and (for a failed lookup) a
    warning is logged.

    Args:
        asn_file_path: Path of the GeoIP ASN database file.
        country_file_path: Path of the GeoIP country database file.
        IP: Address to look up, as text.

    Returns:
        A dict with the keys ``number``, ``organisation`` and ``country``.
    """
    AS = {"number": None, "organisation": None, "country": None}  # noqa: N806

    if Path(asn_file_path).is_file():
        # The context manager closes the database handle once the lookup is
        # done; the original left every reader open.
        with geoip2.database.Reader(asn_file_path) as reader:
            try:
                result = reader.asn(IP)
                AS["number"] = result.autonomous_system_number
                AS["organisation"] = result.autonomous_system_organization
            except Exception:
                # Best effort: a failed lookup must not abort the caller.
                AS["number"] = None
                AS["organisation"] = None
                logger.warning(f"The IP '{IP}' not found in GeoIP database, no AS informations added to the database ")

    if Path(country_file_path).is_file():
        with geoip2.database.Reader(country_file_path) as reader:
            try:
                result = reader.country(IP)
                AS["country"] = result.country.iso_code
            except Exception:
                AS["country"] = None
                logger.warning(f"The IP '{IP}' not found in GeoIP database, no country information added to the database")

    return AS
|
|
|
|
|
|
def log_to_pg(
    hostname: str, common_name: str, pg: PgUtil, remote_ip: str, vpn_ip: str, AS: dict, profile_name: str  # noqa: N803
) -> None:
    """Insert one row describing this connection into the ``connections`` table.

    The statement uses ``%s`` placeholders and the values are handed to
    ``pg.execute`` separately (presumably bound by the database driver —
    confirm against ``PgUtil``).

    Args:
        hostname: Stored in the ``server`` column.
        common_name: Stored in the ``username`` column.
        pg: Object whose ``execute(statement=..., data=...)`` runs the insert.
        remote_ip: Stored in the ``ipaddr`` column.
        vpn_ip: Stored in the ``vpn_ip`` column.
        AS: Dict providing ``country``, ``number`` and ``organisation``.
        profile_name: Stored in the ``profile`` column.
    """
    country, as_number, as_organisation = AS["country"], AS["number"], AS["organisation"]
    pg.execute(
        statement="INSERT INTO connections (username, server, ipaddr, vpn_ip, country, asn, asorg, profile) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
        data=[common_name, hostname, remote_ip, vpn_ip, country, as_number, as_organisation, profile_name],
    )
|
|
|
|
|
|
def fallback_no_pg(hostname: str, common_name: str, remote_ip: str, vpn_ip: str, AS, profile_name: str) -> None:  # noqa: N803
    """Append the connection record, as an SQL ``INSERT``, to a local file.

    Used when the log database cannot be reached. The statement is written to
    ``/var/log/openvpn/postgres_unavailable.sql`` (presumably to be replayed
    later — confirm the operational procedure).

    Args:
        hostname: Value for the ``server`` column.
        common_name: Value for the ``username`` column.
        remote_ip: Value for the ``ipaddr`` column.
        vpn_ip: Value for the ``vpn_ip`` column.
        AS: Dict providing ``country``, ``number`` and ``organisation``.
        profile_name: Value for the ``profile`` column.
    """

    def _sql_literal(value) -> str:
        """Render ``value`` as a quoted SQL string literal, or NULL for None."""
        if value is None:
            # The original wrote the text 'None', which is not a valid
            # number for the asn column and a bogus value elsewhere.
            return "NULL"
        # Doubling single quotes is the standard SQL escape. This assumes
        # standard-conforming strings (backslash is literal) — TODO confirm
        # against the server configuration.
        return "'" + str(value).replace("'", "''") + "'"

    logfile = "/var/log/openvpn/postgres_unavailable.sql"
    logger.warning(f"could not connect to postgres log database, will log to file '{logfile}' instead")

    time_ascii = os.getenv("time_ascii")  # noqa: SIM112
    rendered = [
        _sql_literal(value)
        for value in (
            common_name,
            hostname,
            remote_ip,
            vpn_ip,
            AS["country"],
            AS["number"],
            AS["organisation"],
            profile_name,
        )
    ]
    # Without a timestamp in the environment, let the column fall back to its
    # default (or NULL when it has none) instead of storing the text 'None'.
    rendered.append("DEFAULT" if time_ascii is None else _sql_literal(time_ascii))

    # "with" guarantees the handle is closed even if the write fails.
    with open(logfile, "a", encoding="utf-8") as f:  # noqa: PTH123
        f.write(
            "INSERT INTO connections (username, server, ipaddr, vpn_ip, country, asn, asorg, profile, ts_begin) "
            f"VALUES ({', '.join(rendered)});\n"
        )
|
|
|
|
|
|
def get_ldap_profile(ldap_connection: ldap.ldapobject.SimpleLDAPObject, base: str, login: str, config_path: str) -> tuple:
    """Find the first usable OpenVPN profile granted to ``login`` in LDAP.

    The user's entry is searched under ``base`` by ``uid``. Each value of its
    ``RemoteAccess`` attribute that starts with ``openvpn_`` names a profile;
    the profile is accepted when the configuration file has a section of that
    name with ``route`` and ``dhcp-option`` options. The first accepted
    profile wins.

    Args:
        ldap_connection: Bound LDAP connection used for the search.
        base: Search base DN.
        login: User identifier matched against ``uid``.
        config_path: Path of the INI file holding the profile sections.

    Returns:
        ``(profile_name, profile)`` where ``profile`` is a dict with
        ``route`` (list of str) and ``dhcp-option`` (list of str, or None
        when the option is empty).

    Exits with status 1 when the user has no entry or no valid profile.
    """
    # Local import: keeps this block self-contained without touching the
    # file's import section.
    from ldap.filter import escape_filter_chars

    prefix = "openvpn_"
    # The login is externally supplied; escape it so characters such as
    # "*", "(" or ")" cannot alter the search filter. str() mirrors the
    # implicit conversion the original f-string performed.
    search_filter = f"uid={escape_filter_chars(str(login))}"
    entries = ldap_connection.search_s(base, ldap.SCOPE_SUBTREE, filterstr=search_filter, attrlist=["RemoteAccess"])
    if not entries:
        # The original indexed entries[0] unconditionally and crashed with an
        # IndexError for unknown users.
        logger.error(f"no LDAP entry found for user {login}, denying access")
        sys.exit(1)
    logger.debug(entries[0])

    attributes = entries[0][1]
    # An entry without the attribute simply grants nothing.
    granted = attributes.get("RemoteAccess", []) if isinstance(attributes, dict) else []

    for raw_value in granted:
        remoteaccess_string = raw_value.decode("utf-8")
        # Only a leading "openvpn_" marks an OpenVPN access; the original
        # removed the marker wherever it appeared in the value.
        if not remoteaccess_string.startswith(prefix):
            logger.debug(f'"{remoteaccess_string}" is not an openvpn access, skipping it (must begin by "openvpn_"')
            continue
        profile_name = remoteaccess_string[len(prefix):]
        logger.debug(f'found RemoteAccess valid "{remoteaccess_string}", will look for profile "{profile_name}"')

        try:
            profile_raw = parse_configuration({profile_name: ["route", "dhcp-option"]}, config_path)[profile_name]
        except Exception as e:
            # Profile not described in the configuration: try the next value.
            logger.debug(e)
            continue

        raw_dhcp_options = profile_raw["dhcp-option"]
        profile = {
            "route": profile_raw["route"].split(","),
            "dhcp-option": None if raw_dhcp_options == "" else raw_dhcp_options.split(","),
        }
        return profile_name, profile

    logger.error(f"no valid profile found for user {login}, denying access")
    sys.exit(1)
|
|
|
|
|
|
def get_ldap_connection(url: str, dn: str, password: str) -> ldap.ldapobject.SimpleLDAPObject:
    """Open an LDAPv3 connection to ``url`` and bind to it as ``dn``.

    For ``ldap://`` URLs, ``start_tls_s()`` is called before binding. Any
    ``ldap.LDAPError`` raised while connecting or binding is logged and the
    process exits with status 1.

    Args:
        url: LDAP server URL (``ldap://`` or ``ldaps://``).
        dn: Distinguished name used for the simple bind.
        password: Password for ``dn``.

    Returns:
        The bound connection object.
    """
    try:
        connection = ldap.initialize(url)
        connection.protocol_version = ldap.VERSION3
        if url.startswith("ldaps://"):
            # NOTE(review): this assigns an attribute on the connection
            # object; it is unclear whether it has any effect — confirm
            # against the python-ldap documentation.
            connection.TLS_AVAIL = 1
        elif url.startswith("ldap://"):
            connection.start_tls_s()
        connection.simple_bind_s(dn, password)
    except ldap.LDAPError as e:
        logger.error(f"could not connect to {url}")  # noqa: TRY400
        logger.error(e)  # noqa: TRY400
        sys.exit(1)
    else:
        logger.debug(f"successfully connected to {url} as {dn}")
        return connection
|
|
|
|
|
|
def write_configuration(client_connect_config_file: str, profile: dict) -> None:
    """Write the ``push`` directives for ``profile`` to a configuration file.

    One ``push "route <network> <netmask>"`` line is written per route, then
    one ``push "dhcp-option <option>"`` line per DHCP option. The resulting
    content is also logged at debug level.

    Args:
        client_connect_config_file: Path of the file to (over)write.
        profile: Dict with ``route`` (list of networks in CIDR notation) and
            ``dhcp-option`` (list of str, or None for no options).

    Raises:
        ValueError: If a route is not a valid network. Nothing is written in
            that case.
    """
    # Build everything in memory first, so an invalid route is detected
    # before the target file is truncated.
    lines = []
    for route in profile["route"]:
        # Entries come from a comma-separated list; tolerate "a, b" spacing.
        network = ipaddress.ip_network(route.strip())
        lines.append(f'push "route {str(network.network_address)} {str(network.netmask)}"\n')
    if profile["dhcp-option"] is not None:
        lines.extend(f'push "dhcp-option {dhcp_option.strip()}"\n' for dhcp_option in profile["dhcp-option"])
    content = "".join(lines)

    # "with" closes the handle even if the write fails.
    with open(client_connect_config_file, "w") as f:  # noqa: PTH123
        f.write(content)

    # Log what was written without re-opening the file.
    logger.debug("client config file:")
    logger.debug(content)
|
|
|
|
|
|
def nft_remove_vpn_ip_from_all_sets(vpn_ip: str) -> None:
    """Remove ``vpn_ip`` from every ``inet`` nft set that contains it.

    The sets are listed with ``nft --json list sets inet``; for each set whose
    ``elem`` list contains ``vpn_ip``, the element is deleted.

    Args:
        vpn_ip: Address to remove, as text.

    Raises:
        subprocess.CalledProcessError: If an ``nft`` invocation fails.
    """
    nft = ["sudo", "/usr/sbin/nft"]

    # Commands are passed as argument lists (no shell), so no value is ever
    # interpreted by a shell.
    list_cmd = [*nft, "--json", "list", "sets", "inet"]
    logger.debug(" ".join(list_cmd))
    listing = subprocess.run(list_cmd, check=True, capture_output=True, text=True)
    objects = json.loads(listing.stdout)["nftables"]
    logger.debug(objects)

    for nft_object in objects:
        nft_set = nft_object.get("set")
        # Skip non-set objects and sets without elements.
        if not isinstance(nft_set, dict) or "elem" not in nft_set:
            continue
        if vpn_ip not in nft_set["elem"]:
            continue
        # Delete from the table the set actually lives in; fall back to
        # "filter" (the table the original always used) if the listing does
        # not name one.
        table = nft_set.get("table", "filter")
        delete_cmd = [*nft, "delete", "element", "inet", table, nft_set["name"], "{", vpn_ip, "}"]
        logger.debug(" ".join(delete_cmd))
        subprocess.run(delete_cmd, check=True, capture_output=True, text=True)
        logger.info(f"removed IP address '{vpn_ip}' from the nft set '{nft_set['name']}'")
|
|
|
|
|
|
def main():
    """Handle one client connection: authorise, configure, firewall, log.

    Steps, in order:

    1. Read the client identity from the environment (``common_name``,
       ``trusted_ip``/``trusted_ip6``, ``trusted_port``) and set up logging.
    2. Load ``/etc/openvpn/scripts/configuration.ini``.
    3. Resolve the user's profile through LDAP and write its ``push``
       directives to the file named by ``client_connect_config_file``.
    4. Remove the client's VPN address from every nft set, then add it to
       ``openvpn_members_<profile>``.
    5. Record the connection in PostgreSQL, or in a local SQL file when the
       database cannot be used.

    Exits with status 1 when the configuration cannot be read or the VPN
    address is missing or invalid.
    """
    common_name = os.getenv("common_name")  # noqa: SIM112
    remote_ip = get_ip_address_from_env()
    trusted_port = os.getenv("trusted_port")  # noqa: SIM112
    log_prefix = f"{common_name}/{remote_ip}:{trusted_port}"
    configure_logger(debug=False, prefix=log_prefix)

    config_path = "/etc/openvpn/scripts/configuration.ini"
    config_structure = {
        "meta": ["debug"],
        "log database": ["dbhost", "dbname", "dbport", "dbuser", "dbpass"],
        "ldap": ["url", "dn", "password", "base"],
    }
    try:
        config = parse_configuration(config_structure, config_path)
    except Exception as err:
        logger.error(err)  # noqa: TRY400
        sys.exit(1)

    if config["meta"]["debug"].lower() == "true":
        configure_logger(debug=True, prefix=log_prefix)
    # Emitted after the debug switch is applied; before that point a debug
    # message can never be shown.
    logger.debug("start connect script execution")

    hostname = socket.gethostname().split(".")[0]
    pool_ip = os.getenv("ifconfig_pool_remote_ip")  # noqa: SIM112
    try:
        vpn_ip = ipaddress.ip_address(pool_ip).compressed
    except ValueError:
        # Same handling as for the client address: clean error, no traceback.
        logger.error(f"could not convert '{pool_ip}' to an IP address, exiting")  # noqa: TRY400
        sys.exit(1)
    client_connect_config_file = os.getenv("client_connect_config_file")  # noqa: SIM112

    ldap_connection = get_ldap_connection(config["ldap"]["url"], config["ldap"]["dn"], config["ldap"]["password"])
    profile_name, profile = get_ldap_profile(ldap_connection, config["ldap"]["base"], common_name, config_path)
    logger.info(f"using profile {profile_name} = {profile}")
    write_configuration(client_connect_config_file, profile)

    asn_file_path = "/etc/openvpn/scripts/GeoLite2-ASN.mmdb"
    country_file_path = "/etc/openvpn/scripts/GeoLite2-Country.mmdb"

    AS = get_as(asn_file_path, country_file_path, remote_ip)  # noqa: N806

    pg = PgUtil(
        database_configuration={
            "host": config["log database"]["dbhost"],
            "port": config["log database"]["dbport"],
            "database": config["log database"]["dbname"],
            "user": config["log database"]["dbuser"],
            "password": config["log database"]["dbpass"],
        }
    )

    nft_remove_vpn_ip_from_all_sets(vpn_ip)
    # Argument list, no shell: the profile name originates from LDAP and must
    # never be interpreted by a shell.
    nft_set = f"openvpn_members_{profile_name}"
    subprocess.run(
        ["sudo", "/usr/sbin/nft", "add", "element", "inet", "filter", nft_set, "{", vpn_ip, "}"],
        check=True,
        capture_output=True,
        text=True,
    )
    logger.info(f"added IP address '{vpn_ip}' to the nft set 'openvpn_members_{profile_name}'")

    # Database logging is best effort: any failure falls back to the local
    # SQL file rather than aborting the connection.
    try:
        pg.connect()
        if not pg.is_connected():
            fallback_no_pg(hostname, common_name, remote_ip, vpn_ip, AS, profile_name)
        else:
            logger.debug(
                f'logging to postgresql://{config["log database"]["dbuser"]}@{config["log database"]["dbhost"]}:{config["log database"]["dbport"]}/{config["log database"]["dbname"]}'
            )
            log_to_pg(hostname, common_name, pg, remote_ip, vpn_ip, AS, profile_name)
    except Exception as e:
        logger.debug(e)
        fallback_no_pg(hostname, common_name, remote_ip, vpn_ip, AS, profile_name)
    logger.debug("end connect script execution")
|
|
|
|
|
|
# Run the hook only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|