argos-setup/setup_server.py

735 lines
27 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
ARGOS SOC Web Installer Server
Tecnotel Servizi SRL
"""
import json
import os
import subprocess
import secrets
import shutil
import hashlib
import threading
import signal
from datetime import datetime, timezone
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
APP_DIR = Path("/opt/argos/app")
CONFIG_DIR = Path("/opt/argos/config")
DATA_DIR = Path("/opt/argos/data")
FEEDS_DIR = Path("/opt/argos/feeds")
LOGS_DIR = Path("/opt/argos/logs")
CERTS_DIR = Path("/opt/argos/certs")
SETUP_DIR = Path("/opt/argos/setup")
APP_USER = "argos"
PORT = 8888
# ── Licenza ARGOS — pubkey per verifica firma Ed25519 ─────────────────────────
# Stessa pubkey hardcoded in backend/core.py. Raw Ed25519 (32 byte) base64.
# Il setup_server la usa per verificare licenze ricevute dal cliente PRIMA
# di procedere con l'installazione.
_LICENSE_PUBLIC_KEY_B64 = "GMRsZMoxOlCBiJU66EsQcj0ZO0gVd0GHB5LelEo/hns="
# ── Clone argos: username Basic Auth bot Gitea (costante del sistema) ─────────
GITEA_BOT_USER = "argos-portal-bot"
GITEA_REPO_PATH = "/tecnotel/argos.git"
install_log = []
install_done = False
install_error = False
def get_machine_id() -> str:
"""Fingerprint univoco del server (stesso algoritmo di core.py).
SHA256 hex di: /etc/machine-id | hostname | MAC prima interfaccia fisica.
"""
import socket as _sock
import subprocess as _sp
parts = []
try:
with open("/etc/machine-id") as f:
parts.append(f.read().strip())
except Exception:
parts.append("")
try:
parts.append(_sock.gethostname())
except Exception:
parts.append("")
try:
r = _sp.run(["cat", "/sys/class/net/eth0/address"],
capture_output=True, text=True, timeout=2)
mac = r.stdout.strip()
if not mac or mac == "00:00:00:00:00:00":
r = _sp.run(["ip", "-o", "link", "show"],
capture_output=True, text=True, timeout=2)
for line in r.stdout.splitlines():
if "link/ether" in line and "00:00:00:00:00:00" not in line:
if "docker" in line or "br-" in line or "veth" in line:
continue
mac = line.split("link/ether")[1].split()[0].strip()
break
parts.append(mac or "")
except Exception:
parts.append("")
combined = "|".join(parts)
return hashlib.sha256(combined.encode()).hexdigest()
def verify_license(raw_bytes):
"""Verifica firma Ed25519 + machine_id match + scadenza.
Ritorna tupla (ok, license_dict, error_message).
Se ok=True, license_dict contiene il payload completo (inclusi gitea_url,
gitea_token se presenti). Se ok=False, error_message e' umano-friendly.
"""
try:
raw = json.loads(raw_bytes)
except Exception as e:
return (False, None, f"File non e' JSON valido: {e}")
if not isinstance(raw, dict):
return (False, None, "Formato licenza non riconosciuto")
# Verifica firma
try:
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from cryptography.exceptions import InvalidSignature
import base64
sig = raw.pop("signature", "")
if not sig:
return (False, None, "Licenza senza firma (campo 'signature' mancante)")
payload = json.dumps(raw, sort_keys=True, separators=(",", ":"))
raw["signature"] = sig # ripristina
pub_bytes = base64.b64decode(_LICENSE_PUBLIC_KEY_B64)
pub = Ed25519PublicKey.from_public_bytes(pub_bytes)
try:
pub.verify(base64.b64decode(sig), payload.encode())
except InvalidSignature:
return (False, None, "Firma non valida. La licenza potrebbe essere manipolata o emessa da altro vendor.")
except ImportError:
return (False, None, "Libreria 'cryptography' non disponibile. Installa: apt install python3-cryptography")
except Exception as e:
return (False, None, f"Errore verifica firma: {e}")
# Verifica machine_id
lic_machine = raw.get("machine_id", "")
if not lic_machine:
return (False, None, "Licenza senza machine_id — formato non supportato")
cur_machine = get_machine_id()
if lic_machine != cur_machine:
return (False, None,
f"Machine ID non corrisponde: licenza per {lic_machine[:12]}..., "
f"questo server e' {cur_machine[:12]}... "
"La licenza non e' valida per questa macchina.")
# Verifica scadenza
expires = raw.get("expires_at", "")
if expires:
today = datetime.now().strftime("%Y-%m-%d")
if expires < today:
return (False, None, f"Licenza scaduta il {expires}")
# Verifica presenza credenziali Gitea (la licenza deve averle per permettere clone)
if not raw.get("gitea_url") or not raw.get("gitea_token"):
return (False, None,
"Licenza priva di credenziali Gitea. "
"Contatta Tecnotel per riemetterla con il nuovo formato.")
return (True, raw, "")
def log(msg):
ts = datetime.now().strftime("%H:%M:%S")
line = f"[{ts}] {msg}"
install_log.append(line)
print(line, flush=True)
def run(cmd, check=True):
log(f"$ {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.stdout.strip(): log(result.stdout.strip())
if result.stderr.strip(): log(result.stderr.strip())
if check and result.returncode != 0:
raise RuntimeError(f"Comando fallito (exit {result.returncode}): {cmd}")
return result
def chown(path):
run(f"chown -R {APP_USER}:{APP_USER} {path}", check=False)
def generate_argos_json(data):
# Hostname primario dal wizard (campo "domain" nella UI = FQDN tipo soc.azienda.it)
hostname = data.get("domain", "").strip().lower()
# Alias dal wizard: stringa separata da spazi -> lista pulita
aliases_raw = data.get("aliases", "").strip()
aliases = [a.strip().lower() for a in aliases_raw.split() if a.strip()]
return {
"_version": "1.2",
"_installed": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"cliente": {
"name": data.get("cliente_name", ""),
"full_name": data.get("cliente_full", ""),
"domain": data.get("cliente_domain", ""),
"type": data.get("cliente_type", "enterprise"),
"ai_context": "",
"sharepoint_tenant": "",
"console_url": ""
},
"network": {
"hostname": hostname,
"aliases": aliases
},
"system": {
"secret_key": secrets.token_hex(32),
"internal_api_key": secrets.token_hex(24),
"anthropic_key": "",
"tz": "Europe/Rome",
"vendor_heartbeat": {
"enabled": True,
"url": "https://argos-update.tecnotelsrl.com",
"install_key": "5b1ab5c872383f686d3a25a5e123adca",
"interval_h": 6
}
},
"opensearch": {
"url": data.get("os_url", ""),
"user": data.get("os_user", "admin"),
"password": data.get("os_pass", "")
},
"paths": {
"data_dir": str(DATA_DIR),
"feeds_dir": str(FEEDS_DIR),
"logs_dir": str(LOGS_DIR),
"config_dir": str(CONFIG_DIR),
"analytics_db": str(DATA_DIR / "analytics.db"),
"analytics_exclude_entities": ""
},
"ports": {
"backend": 8080,
"sync": 8081,
"ops": 8082,
"analytics": 8083
}
}
def generate_integrations_json():
example = APP_DIR / "config/integrations.json.example"
if example.exists():
with open(example) as f:
return json.load(f)
# fallback minimale
return {
"_version": "1.0",
"endpoint": {}, "firewall": {}, "identity": {},
"notifications": {}, "threat_intel": {},
"pdf": {
"org_name": "", "author": "Tecnotel Servizi SRL",
"motto": "Controllo totale. Difesa continua.",
"classification": "RISERVATO - USO INTERNO",
"app_logo": str(APP_DIR / "frontend/dist/logo_argos_bianco.png"),
"client_logo": str(CONFIG_DIR / "assets" / "logo_cliente.png")
}
}
def create_admin_user(data):
username = data.get("admin_username", "admin").strip()
email = data.get("admin_email_user", "").strip()
password = data.get("admin_password", "").strip()
if not username or not password:
log("WARN: credenziali admin mancanti — skip creazione utente")
return
import os as _os
# Hash compatibile con auth.py: salt:sha256(salt+pw+"argos-2026")
salt = _os.urandom(16).hex()
pw_hash = f"{salt}:{hashlib.sha256((salt + password + 'argos-2026').encode()).hexdigest()}"
users_file = DATA_DIR / "argos_users.json"
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Leggi utenti esistenti o crea struttura vuota
try:
with open(users_file) as f:
users_data = json.load(f)
except Exception:
users_data = {"users": {}}
users_data["users"][username.lower()] = {
"password": pw_hash,
"role": "admin",
"name": username,
"email": email,
"enabled": True
}
with open(users_file, "w") as f:
json.dump(users_data, f, indent=2)
os.chmod(users_file, 0o600)
chown(users_file)
log(f"Utente admin '{username}' creato in argos_users.json")
def install(data):
global install_done, install_error
try:
log("=== AVVIO INSTALLAZIONE ARGOS SOC ===")
# 0. Carica licenza (gia' validata da /api/license/upload)
log("── Verifica licenza ARGOS ──")
lic_path = SETUP_DIR / "license.json"
if not lic_path.exists():
raise RuntimeError(
"License.json non trovata in /opt/argos/setup/. "
"Caricare una licenza valida prima di avviare l'installazione."
)
ok, lic, err = verify_license(lic_path.read_bytes())
if not ok:
raise RuntimeError(f"Licenza non valida: {err}")
gitea_url = lic.get("gitea_url", "").rstrip("/")
gitea_token = lic.get("gitea_token", "")
# Deriva host dal gitea_url (es: https://host/api/v1 -> https://host)
gitea_host = gitea_url[:-len("/api/v1")] if gitea_url.endswith("/api/v1") else gitea_url
log(f"Licenza OK: {lic.get('customer')} / {lic.get('tier')} / exp {lic.get('expires_at')}")
# 1. Clone repository argos (URL autenticato temporaneo: token NON in .git/config)
log("── Clone repository ARGOS ──")
if APP_DIR.exists() and (APP_DIR / ".git").exists():
log("Repository gia' presente — skip clone")
else:
auth_url = f"https://{GITEA_BOT_USER}:{gitea_token}@{gitea_host[len('https://'):]}{GITEA_REPO_PATH}"
APP_DIR.parent.mkdir(parents=True, exist_ok=True)
# Clona con URL autenticato
# Git >= 2.35 rifiuta operazioni su repo con ownership "dubbia"
# (es. clone come root in dir pre-creata come argos). Aggiungiamo
# /opt/argos/app a safe.directory globale PRIMA del clone.
run(f"git config --global --add safe.directory {APP_DIR}")
run(f"git clone {auth_url} {APP_DIR}")
# Dopo clone, ripulisci .git/config rimuovendo il token
clean_url = f"{gitea_host}{GITEA_REPO_PATH}"
run(f"git -C {APP_DIR} remote set-url origin {clean_url}")
run(f"chown -R {APP_USER}:{APP_USER} {APP_DIR}")
log("Repository ARGOS clonato")
# 2. Python virtualenv
log("── Creazione virtualenv Python ──")
venv_dir = APP_DIR / "backend/venv"
if not venv_dir.exists():
run(f"python3 -m venv {venv_dir}")
run(f"{venv_dir}/bin/pip install --upgrade pip -q")
req_file = APP_DIR / "backend/requirements.txt"
if req_file.exists():
run(f"{venv_dir}/bin/pip install -r {req_file} -q")
run(f"chown -R {APP_USER}:{APP_USER} {venv_dir}")
log("Virtualenv Python pronto")
# 3. argos.json
log("── Generazione argos.json ──")
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
config = generate_argos_json(data)
cfg_path = CONFIG_DIR / "argos.json"
with open(cfg_path, "w") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
os.chmod(cfg_path, 0o600)
chown(CONFIG_DIR)
log("argos.json creato")
# 4. integrations.json
log("── Generazione integrations.json ──")
integ = generate_integrations_json()
# Aggiorna pdf con nome organizzazione
integ.setdefault("pdf", {})
integ["pdf"]["org_name"] = data.get("cliente_full") or data.get("cliente_name", "")
integ["pdf"]["client_logo"] = str(CONFIG_DIR / "assets" / "logo_cliente.png")
integ_path = CONFIG_DIR / "integrations.json"
with open(integ_path, "w") as f:
json.dump(integ, f, indent=2, ensure_ascii=False)
os.chmod(integ_path, 0o600)
chown(integ_path)
log("integrations.json creato")
# 5. modules.json
mods = APP_DIR / "config/modules.json.example"
if mods.exists():
shutil.copy(mods, CONFIG_DIR / "modules.json")
chown(CONFIG_DIR / "modules.json")
log("modules.json copiato")
# 6. Logo cliente
logo_src = SETUP_DIR / "logo_cliente.png"
if logo_src.exists():
DATA_DIR.mkdir(parents=True, exist_ok=True)
shutil.copy(logo_src, CONFIG_DIR / "assets" / "logo_cliente.png")
chown(CONFIG_DIR / "assets" / "logo_cliente.png")
log("Logo cliente copiato")
# 7. Build frontend
log("── Build frontend React ──")
run(f"cd {APP_DIR}/frontend && npm install --silent")
run(f"cd {APP_DIR}/frontend && npm run build")
chown(APP_DIR / "frontend")
# Permessi lettura per nginx (www-data)
run(f"chmod 755 /opt/argos /opt/argos/app /opt/argos/app/frontend {APP_DIR}/frontend/dist")
run(f"chmod -R 755 {APP_DIR}/frontend/dist/")
log("Frontend buildato")
# 8. Dipendenze Python (re-run in caso di aggiornamenti)
log("── Dipendenze Python ──")
venv_pip = APP_DIR / "backend/venv/bin/pip"
req = APP_DIR / "backend/requirements.txt"
if req.exists():
run(f"{venv_pip} install -r {req} -q")
log("Dipendenze Python installate")
# 9. Utente admin
log("── Creazione utente admin ──")
create_admin_user(data)
# 10. SSL
log("── Configurazione SSL ──")
domain = data.get("domain", "").strip()
aliases = data.get("aliases", "").strip()
ssl_mode = data.get("ssl_mode", "letsencrypt")
all_names = (domain + " " + aliases).strip()
if ssl_mode == "manual":
crt_src = SETUP_DIR / "uploaded.crt"
key_src = SETUP_DIR / "uploaded.key"
if not crt_src.exists() or not key_src.exists():
raise RuntimeError("File SSL .crt o .key non trovati in /opt/argos/setup/")
CERTS_DIR.mkdir(parents=True, exist_ok=True)
shutil.copy(crt_src, CERTS_DIR / "fullchain.pem")
shutil.copy(key_src, CERTS_DIR / "privkey.pem")
os.chmod(CERTS_DIR / "privkey.pem", 0o600)
ssl_crt = str(CERTS_DIR / "fullchain.pem")
ssl_key = str(CERTS_DIR / "privkey.pem")
log("Certificato SSL manuale copiato")
else:
_write_nginx_http(all_names)
run("nginx -t && systemctl restart nginx")
certbot_d = " ".join(f"-d {n}" for n in all_names.split())
email = data.get("admin_email", "admin@tecnotelsrl.com")
run(f"certbot --nginx {certbot_d} --non-interactive --agree-tos -m {email}")
ssl_crt = f"/etc/letsencrypt/live/{domain}/fullchain.pem"
ssl_key = f"/etc/letsencrypt/live/{domain}/privkey.pem"
log("Certificato Let's Encrypt ottenuto")
# 11. Nginx finale
log("── Nginx configurazione finale ──")
_write_nginx_final(all_names, ssl_crt, ssl_key)
run("nginx -t && systemctl restart nginx")
log("Nginx configurato")
# 12. Systemd services
log("── Creazione e avvio servizi ──")
_write_services()
run("systemctl daemon-reload")
for svc in ["argos-backend", "argos-sync", "argos-ops", "argos-analytics"]:
run(f"systemctl enable --now {svc}")
log(f"{svc} avviato")
# 13. Copia licenza + chiudi web installer
log("── Copia licenza in posizione finale ──")
DATA_DIR.mkdir(parents=True, exist_ok=True)
final_lic = DATA_DIR / "license.json"
shutil.copy(lic_path, final_lic)
os.chmod(final_lic, 0o600)
chown(final_lic)
log(f"Licenza copiata in {final_lic}")
log("── Chiusura web installer ──")
run("systemctl disable --now argos-setup", check=False)
run("ufw delete allow 8888/tcp", check=False)
log("Porta 8888 chiusa — web installer disabilitato")
log("=== INSTALLAZIONE COMPLETATA ===")
_schedule_cleanup()
install_done = True
# Spegni il processo dopo 8s (tempo per inviare la risposta al browser)
def shutdown():
import time
time.sleep(15)
os.kill(os.getpid(), signal.SIGTERM)
threading.Thread(target=shutdown, daemon=True).start()
except Exception as e:
log(f"ERRORE: {e}")
install_log.append(f"__ERROR__: {e}")
install_error = True
def _write_nginx_http(all_names):
conf = f"""server {{
listen 80;
server_name {all_names};
location /.well-known/acme-challenge/ {{ root /var/www/html; }}
location / {{ return 301 https://$host$request_uri; }}
}}
"""
_write_nginx_conf(conf)
def _write_nginx_final(all_names, ssl_crt, ssl_key):
conf = f"""limit_req_zone $binary_remote_addr zone=argos:10m rate=20r/s;
server {{
listen 80;
server_name {all_names};
location / {{ return 301 https://$host$request_uri; }}
location /.well-known/acme-challenge/ {{ root /var/www/html; }}
}}
server {{
listen 443 ssl http2;
server_name {all_names};
ssl_certificate {ssl_crt};
ssl_certificate_key {ssl_key};
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
add_header X-Frame-Options SAMEORIGIN;
add_header X-Content-Type-Options nosniff;
add_header Strict-Transport-Security "max-age=31536000" always;
client_max_body_size 50m;
location /api/analytics/ {{
limit_req zone=argos burst=40 nodelay;
proxy_pass http://127.0.0.1:8083;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_read_timeout 120s;
}}
location /feeds/ {{
alias {FEEDS_DIR}/;
autoindex off;
default_type "text/plain; charset=utf-8";
access_log {LOGS_DIR}/nginx-feeds.log;
}}
location /api/ {{
limit_req zone=argos burst=40 nodelay;
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_read_timeout 120s;
}}
location / {{
root {APP_DIR}/frontend/dist;
try_files $uri $uri/ /index.html;
expires 1h;
}}
access_log {LOGS_DIR}/nginx-access.log;
error_log {LOGS_DIR}/nginx-error.log;
}}
"""
_write_nginx_conf(conf)
def _write_nginx_conf(conf):
with open("/etc/nginx/sites-available/argos", "w") as f:
f.write(conf)
p = Path("/etc/nginx/sites-enabled/argos")
if not p.exists():
p.symlink_to("/etc/nginx/sites-available/argos")
# Rimuovi default e setup temporaneo
for f in ["/etc/nginx/sites-enabled/default", "/etc/nginx/sites-enabled/argos-setup"]:
if Path(f).exists(): Path(f).unlink()
def _write_services():
venv_py = str(APP_DIR / "backend/venv/bin/python3")
backend_dir = str(APP_DIR / "backend")
services = {
"backend": ("services/argos_backend.py", "Backend API"),
"sync": ("services/argos_sync.py", "Sync Daemon"),
"ops": ("services/argos_ops.py", "Ops Daemon"),
"analytics": ("services/argos_analytics.py", "Analytics Daemon"),
}
for svc, (script, desc) in services.items():
unit = f"""[Unit]
Description=ARGOS {desc}
After=network.target
[Service]
Type=simple
User={APP_USER}
Group={APP_USER}
WorkingDirectory={backend_dir}
Environment=PYTHONPATH={backend_dir}
Environment=CONFIG_FILE={CONFIG_DIR}/argos.json
Environment=INTEGRATIONS_FILE={CONFIG_DIR}/integrations.json
Environment=DATA_DIR={DATA_DIR}
Environment=FEEDS_DIR={FEEDS_DIR}
Environment=LOGS_DIR={LOGS_DIR}
ExecStart={venv_py} {backend_dir}/{script}
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=argos-{svc}
[Install]
WantedBy=multi-user.target
"""
with open(f"/etc/systemd/system/argos-{svc}.service", "w") as f:
f.write(unit)
class SetupHandler(BaseHTTPRequestHandler):
def log_message(self, *args): pass
def do_GET(self):
path = urlparse(self.path).path
if path in ("/", "/setup"):
html_path = Path(__file__).parent / "setup.html"
if not html_path.exists():
self.send_response(404); self.end_headers(); return
html = html_path.read_bytes()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", len(html))
self.end_headers()
self.wfile.write(html)
elif path == "/api/status":
self._json({"done": install_done, "error": install_error, "log": install_log[-60:]})
elif path == "/api/machine-id":
self._json({"machine_id": get_machine_id()})
else:
self.send_response(404); self.end_headers()
def do_POST(self):
path = urlparse(self.path).path
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length)
if path == "/api/install":
try:
data = json.loads(body)
threading.Thread(target=install, args=(data,), daemon=True).start()
self._json({"ok": True})
except Exception as e:
self._json({"ok": False, "error": str(e)}, 400)
elif path == "/api/upload/cert":
SETUP_DIR.mkdir(parents=True, exist_ok=True)
(SETUP_DIR / "uploaded.crt").write_bytes(body)
self._json({"ok": True})
elif path == "/api/upload/key":
SETUP_DIR.mkdir(parents=True, exist_ok=True)
(SETUP_DIR / "uploaded.key").write_bytes(body)
self._json({"ok": True})
elif path == "/api/upload/logo":
SETUP_DIR.mkdir(parents=True, exist_ok=True)
(SETUP_DIR / "logo_cliente.png").write_bytes(body)
self._json({"ok": True})
elif path == "/api/license/upload":
ok, lic, err = verify_license(body)
if not ok:
self._json({"ok": False, "error": err}, 400)
return
# Salva licenza valida nel SETUP_DIR per essere usata durante install()
SETUP_DIR.mkdir(parents=True, exist_ok=True)
lic_path = SETUP_DIR / "license.json"
lic_path.write_bytes(body)
os.chmod(lic_path, 0o600)
# Risposta: solo dati sommari (non rinvia token in plain)
self._json({
"ok": True,
"summary": {
"customer": lic.get("customer", ""),
"tier": lic.get("tier", ""),
"issued_to": lic.get("issued_to", ""),
"issued_at": lic.get("issued_at", ""),
"expires_at": lic.get("expires_at", ""),
"has_gitea": bool(lic.get("gitea_token")),
}
})
else:
self.send_response(404); self.end_headers()
def _json(self, data, code=200):
body = json.dumps(data).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(body))
self.end_headers()
self.wfile.write(body)
def _schedule_cleanup():
"""Schedula disabilitazione service argos-setup + rimozione /opt/argos-setup-pkg/.
Usa systemd-run --scope per lanciare in background con delay, cosi'
questa funzione puo' ritornare prima che il cleanup inizi (altrimenti
il processo cancellerebbe se stesso sotto i piedi).
"""
import subprocess, shlex
script = r"""#!/bin/bash
# Attendi 5 secondi per permettere al service padre di rispondere all'ultima
# richiesta HTTP di stato e chiudere pulito.
sleep 5
# Disabilita e ferma il service argos-setup
systemctl stop argos-setup.service 2>/dev/null || true
systemctl disable argos-setup.service 2>/dev/null || true
rm -f /etc/systemd/system/argos-setup.service
systemctl daemon-reload
# Chiudi porta 8888 nel firewall
ufw delete allow 8888/tcp 2>/dev/null || true
# Rimuovi la cartella del pacchetto (auto-delete del codice corrente)
rm -rf /opt/argos-setup-pkg
# Log
echo "argos-setup cleanup completato $(date -Iseconds)" >> /var/log/argos-setup-cleanup.log
"""
# Scrivi lo script in /tmp e lanciato via systemd-run detached
script_path = "/tmp/argos-setup-cleanup.sh"
try:
with open(script_path, "w") as f:
f.write(script)
import os
os.chmod(script_path, 0o755)
# systemd-run lancia in uno scope separato: sopravvive al termine di questo processo
subprocess.Popen(
["systemd-run", "--no-block", "--unit", "argos-setup-cleanup",
"--scope", "/bin/bash", script_path],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
log("Cleanup schedulato via systemd-run (delay 5s)")
except Exception as e:
log(f"Errore schedulazione cleanup: {e}")
if __name__ == "__main__":
print(f"\n{'='*55}")
print(f" ARGOS SOC — Web Installer")
print(f" Tecnotel Servizi SRL")
print(f" Apri: http://<IP_SERVER>:{PORT}")
print(f"{'='*55}\n")
HTTPServer(("0.0.0.0", PORT), SetupHandler).serve_forever()