#!/usr/bin/env python3
"""ARGOS SOC — Web Installer Server.

Tecnotel Servizi SRL

Small HTTP server (stdlib only) that backs the browser-based installation
wizard. It verifies the customer license, clones the application repository,
generates the configuration files, obtains/creates the TLS certificate,
writes the nginx and systemd configuration, and finally schedules its own
removal.
"""
import base64
import hashlib
import json
import os
import re
import secrets
import shlex
import shutil
import signal
import socket
import subprocess
import threading
import time
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import urlparse

APP_DIR = Path("/opt/argos/app")
CONFIG_DIR = Path("/opt/argos/config")
DATA_DIR = Path("/opt/argos/data")
FEEDS_DIR = Path("/opt/argos/feeds")
LOGS_DIR = Path("/opt/argos/logs")
CERTS_DIR = Path("/opt/argos/certs")
SETUP_DIR = Path("/opt/argos/setup")
APP_USER = "argos"
PORT = 8888

# Upper bound for request bodies accepted by the installer (uploads, JSON).
MAX_BODY_BYTES = 50 * 1024 * 1024

# Characters accepted in wizard-supplied host names. These values end up in
# shell commands and in the nginx configuration, so anything else is rejected.
_HOSTNAME_RE = re.compile(r"[A-Za-z0-9*_][A-Za-z0-9._*-]*")

# ── ARGOS license — public key used to verify the Ed25519 signature ──────────
# Same public key hardcoded in backend/core.py. Raw Ed25519 (32 bytes), base64.
# The setup server uses it to verify licenses received from the customer
# BEFORE proceeding with the installation.
_LICENSE_PUBLIC_KEY_B64 = "GMRsZMoxOlCBiJU66EsQcj0ZO0gVd0GHB5LelEo/hns="

# ── Repository clone: Basic Auth username of the Gitea bot (system constant) ─
GITEA_BOT_USER = "argos-portal-bot"
GITEA_REPO_PATH = "/tecnotel/argos.git"

# Shared state polled by the browser through /api/status.
install_log = []
install_done = False
install_error = False


def get_machine_id() -> str:
    """Return the unique fingerprint of this server.

    SHA-256 hex digest of ``/etc/machine-id | hostname | MAC`` where MAC is
    the address of ``eth0`` or, failing that, of the first non-virtual
    interface reported by ``ip -o link show``. Each component falls back to an
    empty string when it cannot be read.

    NOTE: the header comment of this file states that core.py uses the same
    algorithm; do not change the inputs or their order.
    """
    parts = []
    try:
        with open("/etc/machine-id") as f:
            parts.append(f.read().strip())
    except Exception:
        parts.append("")
    try:
        parts.append(socket.gethostname())
    except Exception:
        parts.append("")
    try:
        r = subprocess.run(
            ["cat", "/sys/class/net/eth0/address"],
            capture_output=True, text=True, timeout=2,
        )
        mac = r.stdout.strip()
        if not mac or mac == "00:00:00:00:00:00":
            r = subprocess.run(
                ["ip", "-o", "link", "show"],
                capture_output=True, text=True, timeout=2,
            )
            for line in r.stdout.splitlines():
                if "link/ether" in line and "00:00:00:00:00:00" not in line:
                    # Skip container/bridge interfaces: their MACs are not stable.
                    if "docker" in line or "br-" in line or "veth" in line:
                        continue
                    mac = line.split("link/ether")[1].split()[0].strip()
                    break
        parts.append(mac or "")
    except Exception:
        parts.append("")
    combined = "|".join(parts)
    return hashlib.sha256(combined.encode()).hexdigest()


def verify_license(raw_bytes):
    """Verify Ed25519 signature, machine_id match and expiry of a license.

    Args:
        raw_bytes: the license file content (JSON, bytes or str).

    Returns:
        Tuple ``(ok, license_dict, error_message)``. When ``ok`` is True,
        ``license_dict`` holds the full payload (including ``gitea_url`` and
        ``gitea_token``) and the message is empty. When ``ok`` is False,
        ``license_dict`` is None and the message is human-friendly.
    """
    try:
        raw = json.loads(raw_bytes)
    except Exception as e:
        return (False, None, f"File non e' JSON valido: {e}")
    if not isinstance(raw, dict):
        return (False, None, "Formato licenza non riconosciuto")

    # Signature check
    try:
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
        from cryptography.exceptions import InvalidSignature

        sig = raw.get("signature", "")
        if not sig:
            return (False, None, "Licenza senza firma (campo 'signature' mancante)")
        # The signed payload is the canonical JSON of every field except the
        # signature itself.
        unsigned = {k: v for k, v in raw.items() if k != "signature"}
        payload = json.dumps(unsigned, sort_keys=True, separators=(",", ":"))
        pub_bytes = base64.b64decode(_LICENSE_PUBLIC_KEY_B64)
        pub = Ed25519PublicKey.from_public_bytes(pub_bytes)
        try:
            pub.verify(base64.b64decode(sig), payload.encode())
        except InvalidSignature:
            return (False, None,
                    "Firma non valida. La licenza potrebbe essere manipolata o emessa da altro vendor.")
    except ImportError:
        return (False, None,
                "Libreria 'cryptography' non disponibile. Installa: apt install python3-cryptography")
    except Exception as e:
        return (False, None, f"Errore verifica firma: {e}")

    # machine_id check
    lic_machine = raw.get("machine_id", "")
    if not lic_machine:
        return (False, None, "Licenza senza machine_id — formato non supportato")
    cur_machine = get_machine_id()
    if lic_machine != cur_machine:
        return (False, None,
                f"Machine ID non corrisponde: licenza per {lic_machine[:12]}..., "
                f"questo server e' {cur_machine[:12]}... "
                "La licenza non e' valida per questa macchina.")

    # Expiry check (ISO dates compare correctly as strings)
    expires = raw.get("expires_at", "")
    if expires:
        today = datetime.now().strftime("%Y-%m-%d")
        if expires < today:
            return (False, None, f"Licenza scaduta il {expires}")

    # The license must carry the Gitea credentials needed for the clone
    if not raw.get("gitea_url") or not raw.get("gitea_token"):
        return (False, None,
                "Licenza priva di credenziali Gitea. "
                "Contatta Tecnotel per riemetterla con il nuovo formato.")
    return (True, raw, "")


def log(msg):
    """Append a timestamped line to ``install_log`` and echo it to stdout."""
    ts = datetime.now().strftime("%H:%M:%S")
    line = f"[{ts}] {msg}"
    install_log.append(line)
    print(line, flush=True)


def run(cmd, check=True, redact=()):
    """Run a shell command, logging the command line and its output.

    Args:
        cmd: command line, executed through the shell.
        check: when True, a non-zero exit status raises RuntimeError.
        redact: iterable of secret strings; every occurrence is replaced with
            ``***`` in the logged command, the logged output and the error
            message. The log is exposed to the browser via /api/status, so
            credentials must never reach it.

    Returns:
        The ``subprocess.CompletedProcess`` (with unmasked stdout/stderr).

    Raises:
        RuntimeError: if ``check`` is True and the command exits non-zero.
    """
    def mask(text):
        for secret in redact:
            if secret:
                text = text.replace(secret, "***")
        return text

    shown = mask(cmd)
    log(f"$ {shown}")
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if result.stdout.strip():
        log(mask(result.stdout.strip()))
    if result.stderr.strip():
        log(mask(result.stderr.strip()))
    if check and result.returncode != 0:
        raise RuntimeError(f"Comando fallito (exit {result.returncode}): {shown}")
    return result


def chown(path):
    """Recursively give ``path`` to the application user (best effort)."""
    run(f"chown -R {APP_USER}:{APP_USER} {path}", check=False)


def _write_private(path, data):
    """Write ``data`` (bytes) to ``path`` creating the file with mode 0600."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    # The mode passed to os.open only applies to newly created files.
    os.chmod(path, 0o600)


def _validated_server_names(data):
    """Return ``(domain, all_names)`` from the wizard data, validating each name.

    ``all_names`` is the space-separated list "domain alias1 alias2 ...".

    Raises:
        RuntimeError: if any name contains characters outside the host-name
            set (these values are written into shell commands and nginx
            configuration).
    """
    domain = data.get("domain", "").strip()
    aliases = data.get("aliases", "").strip()
    all_names = (domain + " " + aliases).strip()
    for name in all_names.split():
        if not _HOSTNAME_RE.fullmatch(name):
            raise RuntimeError(f"Nome host non valido: {name!r}")
    return domain, all_names


def _gitea_remote_parts(gitea_url):
    """Split a Gitea URL into ``(scheme, host_and_path)``.

    A trailing ``/api/v1`` is dropped (https://host/api/v1 -> https://host).
    URLs without an explicit scheme default to https.
    """
    base = gitea_url.rstrip("/")
    if base.endswith("/api/v1"):
        base = base[:-len("/api/v1")]
    scheme, sep, rest = base.partition("://")
    if not sep:
        scheme, rest = "https", base
    return scheme, rest


def generate_argos_json(data):
    """Build the content of argos.json from the wizard form data.

    Returns:
        A dict ready to be dumped as JSON. ``secret_key`` and
        ``internal_api_key`` are freshly generated on every call.
    """
    # Primary hostname from the wizard ("domain" field in the UI = FQDN such
    # as soc.azienda.it)
    hostname = data.get("domain", "").strip().lower()
    # Aliases from the wizard: whitespace-separated string -> clean list
    aliases_raw = data.get("aliases", "").strip()
    aliases = [a.strip().lower() for a in aliases_raw.split() if a.strip()]
    return {
        "_version": "1.2",
        "_installed": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "cliente": {
            "name": data.get("cliente_name", ""),
            "full_name": data.get("cliente_full", ""),
            "domain": data.get("cliente_domain", ""),
            "type": data.get("cliente_type", "enterprise"),
            "ai_context": data.get("ai_context", ""),
            "console_url": "",
        },
        "network": {
            "hostname": hostname,
            "aliases": aliases,
        },
        "system": {
            "secret_key": secrets.token_hex(32),
            "internal_api_key": secrets.token_hex(24),
            "anthropic_key": "",
            "tz": "Europe/Rome",
            "vendor_heartbeat": {
                "enabled": True,
                "url": "https://argos-update.tecnotelsrl.com",
                "install_key": "5b1ab5c872383f686d3a25a5e123adca",
                "interval_h": 6,
            },
        },
        "opensearch": {
            "url": data.get("os_url", ""),
            "user": data.get("os_user", "admin"),
            "password": data.get("os_pass", ""),
        },
        "paths": {
            "data_dir": str(DATA_DIR),
            "feeds_dir": str(FEEDS_DIR),
            "logs_dir": str(LOGS_DIR),
            "config_dir": str(CONFIG_DIR),
            "analytics_db": str(DATA_DIR / "analytics.db"),
            "analytics_exclude_entities": "",
        },
        "ports": {
            "backend": 8080,
            "sync": 8081,
            "ops": 8082,
            "analytics": 8083,
        },
    }


def generate_integrations_json():
    """Return the initial integrations.json content.

    Uses the ``integrations.json.example`` shipped with the application when
    present, otherwise a minimal built-in fallback.
    """
    example = APP_DIR / "config/integrations.json.example"
    if example.exists():
        with open(example, encoding="utf-8") as f:
            return json.load(f)
    # Minimal fallback
    return {
        "_version": "1.0",
        "endpoint": {},
        "firewall": {},
        "identity": {},
        "notifications": {},
        "threat_intel": {},
        "pdf": {
            "org_name": "",
            "author": "Tecnotel Servizi SRL",
            "motto": "Controllo totale. Difesa continua.",
            "classification": "RISERVATO - USO INTERNO",
            "app_logo": str(APP_DIR / "frontend/dist/logo_argos_bianco.png"),
            "client_logo": str(CONFIG_DIR / "assets" / "logo_cliente.png"),
        },
    }


def create_admin_user(data):
    """Create (or overwrite) the admin user in CONFIG_DIR/argos_users.json.

    Skips silently (with a warning in the log) when username or password are
    missing from the wizard data.
    """
    username = data.get("admin_username", "admin").strip()
    email = data.get("admin_email_user", "").strip()
    password = data.get("admin_password", "").strip()
    if not username or not password:
        log("WARN: credenziali admin mancanti — skip creazione utente")
        return
    # Hash format: salt:sha256(salt+pw+"argos-2026"). Per the original
    # comment this must stay compatible with auth.py, so it is not changed
    # here.
    salt = os.urandom(16).hex()
    digest = hashlib.sha256((salt + password + 'argos-2026').encode()).hexdigest()
    pw_hash = f"{salt}:{digest}"
    users_file = CONFIG_DIR / "argos_users.json"
    # The users file lives in CONFIG_DIR, so that is the directory to create.
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    # Load existing users or start from an empty structure
    try:
        with open(users_file, encoding="utf-8") as f:
            users_data = json.load(f)
    except Exception:
        users_data = {"users": {}}
    users_data["users"][username.lower()] = {
        "password": pw_hash,
        "role": "admin",
        "name": username,
        "email": email,
        "enabled": True,
    }
    with open(users_file, "w", encoding="utf-8") as f:
        json.dump(users_data, f, indent=2)
    os.chmod(users_file, 0o600)
    chown(users_file)
    log(f"Utente admin '{username}' creato in argos_users.json")


def _clone_repository(lic):
    """Clone the ARGOS repository into APP_DIR using the license credentials.

    The token is used only in a temporary authenticated URL: after the clone
    the remote is reset to the clean URL so the token is not left in
    .git/config, and it is masked in everything that reaches the log.
    """
    gitea_token = lic.get("gitea_token", "")
    scheme, host = _gitea_remote_parts(lic.get("gitea_url", ""))
    if APP_DIR.exists() and (APP_DIR / ".git").exists():
        log("Repository gia' presente — skip clone")
        return
    auth_url = f"{scheme}://{GITEA_BOT_USER}:{gitea_token}@{host}{GITEA_REPO_PATH}"
    clean_url = f"{scheme}://{host}{GITEA_REPO_PATH}"
    APP_DIR.parent.mkdir(parents=True, exist_ok=True)
    # Git >= 2.35 refuses to operate on repositories with "dubious"
    # ownership (e.g. cloned as root into a directory pre-created as argos).
    # Add /opt/argos/app to the global safe.directory BEFORE cloning.
    run(f"git config --global --add safe.directory {APP_DIR}")
    run(f"git clone {shlex.quote(auth_url)} {APP_DIR}", redact=(gitea_token,))
    # After the clone, clean .git/config by removing the token
    run(f"git -C {APP_DIR} remote set-url origin {shlex.quote(clean_url)}")
    run(f"chown -R {APP_USER}:{APP_USER} {APP_DIR}")
    log("Repository ARGOS clonato")


def _generate_selfsigned_cert(data, domain, all_names):
    """Create a self-signed certificate; return ``(crt_path, key_path)`` as str.

    Useful for LAN/dev installations without public DNS. Validity 10 years,
    RSA 4096, SAN built from hostname + aliases + first server IP.
    """
    log("Generazione certificato autofirmato (RSA 4096, validita' 10 anni)")
    CERTS_DIR.mkdir(parents=True, exist_ok=True)
    crt_path = CERTS_DIR / "fullchain.pem"
    key_path = CERTS_DIR / "privkey.pem"
    cnf_path = CERTS_DIR / "openssl-selfsigned.cnf"
    # subjectAltName list: every DNS name + the server IP
    san_dns = [n for n in all_names.split() if n]
    try:
        server_ip = subprocess.check_output(
            ["hostname", "-I"], text=True
        ).strip().split()[0]
    except Exception:
        server_ip = ""
    san_lines = "\n".join(f"DNS.{i+1} = {n}" for i, n in enumerate(san_dns))
    if server_ip:
        san_lines += f"\nIP.1 = {server_ip}"
    # Subject: cliente_full as O, domain as CN
    client_full = data.get("cliente_full") or data.get("cliente_name") or "ARGOS"
    cn = domain or "argos.local"
    cnf = "\n".join([
        "[req]",
        "default_bits = 4096",
        "prompt = no",
        "default_md = sha256",
        "distinguished_name = dn",
        "req_extensions = req_ext",
        "x509_extensions = v3_ext",
        "",
        "[dn]",
        "C = IT",
        f"O = {client_full}",
        "OU = ARGOS SOC",
        f"CN = {cn}",
        "",
        "[req_ext]",
        "subjectAltName = @alt_names",
        "",
        "[v3_ext]",
        "subjectAltName = @alt_names",
        "basicConstraints = critical, CA:FALSE",
        "keyUsage = critical, digitalSignature, keyEncipherment",
        "extendedKeyUsage = serverAuth",
        "",
        "[alt_names]",
        san_lines,
        "",
    ])
    cnf_path.write_text(cnf)
    # Generate key and certificate in a single step
    run(
        f"openssl req -x509 -nodes -days 3650 -newkey rsa:4096 "
        f"-keyout {key_path} -out {crt_path} -config {cnf_path}"
    )
    os.chmod(key_path, 0o600)
    os.chmod(crt_path, 0o644)
    chown(key_path)
    chown(crt_path)
    log(f"Cert autofirmato generato (CN={cn}, SAN: {len(san_dns)} DNS"
        f"{' + 1 IP' if server_ip else ''})")
    log("⚠️ ATTENZIONE: i browser segnaleranno il cert come non attendibile.")
    log(" Usare solo per LAN/test. Per produzione: Let's Encrypt o cert CA.")
    return str(crt_path), str(key_path)


def _configure_ssl(data, domain, all_names):
    """Obtain the TLS certificate according to ``ssl_mode``.

    Modes: ``manual`` (uploaded files), ``selfsigned``, anything else means
    Let's Encrypt via certbot.

    Returns:
        ``(ssl_crt, ssl_key)`` paths as strings.

    Raises:
        RuntimeError: if manual mode is selected but the uploads are missing,
            or if any external command fails.
    """
    ssl_mode = data.get("ssl_mode", "letsencrypt")
    if ssl_mode == "manual":
        crt_src = SETUP_DIR / "uploaded.crt"
        key_src = SETUP_DIR / "uploaded.key"
        if not crt_src.exists() or not key_src.exists():
            raise RuntimeError("File SSL .crt o .key non trovati in /opt/argos/setup/")
        CERTS_DIR.mkdir(parents=True, exist_ok=True)
        shutil.copy(crt_src, CERTS_DIR / "fullchain.pem")
        shutil.copy(key_src, CERTS_DIR / "privkey.pem")
        os.chmod(CERTS_DIR / "privkey.pem", 0o600)
        log("Certificato SSL manuale copiato")
        return str(CERTS_DIR / "fullchain.pem"), str(CERTS_DIR / "privkey.pem")
    if ssl_mode == "selfsigned":
        return _generate_selfsigned_cert(data, domain, all_names)
    # Let's Encrypt: certbot needs a plain-HTTP vhost answering the challenge.
    _write_nginx_http(all_names)
    run("nginx -t && systemctl restart nginx")
    # Names and e-mail come from the wizard form: quote them for the shell.
    certbot_d = " ".join(f"-d {shlex.quote(n)}" for n in all_names.split())
    email = data.get("admin_email", "admin@tecnotelsrl.com")
    run(f"certbot --nginx {certbot_d} --non-interactive --agree-tos -m {shlex.quote(email)}")
    log("Certificato Let's Encrypt ottenuto")
    return (f"/etc/letsencrypt/live/{domain}/fullchain.pem",
            f"/etc/letsencrypt/live/{domain}/privkey.pem")


def install(data):
    """Run the whole installation; meant to be executed in a worker thread.

    Progress is reported through ``install_log``; on completion
    ``install_done`` is set, on failure ``install_error`` is set and an
    ``__ERROR__`` line is appended to the log. Never raises.

    Args:
        data: dict with the wizard form fields.
    """
    global install_done, install_error
    try:
        log("=== AVVIO INSTALLAZIONE ARGOS SOC ===")

        # 0. Load the license (already validated by /api/license/upload)
        log("── Verifica licenza ARGOS ──")
        lic_path = SETUP_DIR / "license.json"
        if not lic_path.exists():
            raise RuntimeError(
                "License.json non trovata in /opt/argos/setup/. "
                "Caricare una licenza valida prima di avviare l'installazione."
            )
        ok, lic, err = verify_license(lic_path.read_bytes())
        if not ok:
            raise RuntimeError(f"Licenza non valida: {err}")
        log(f"Licenza OK: {lic.get('customer')} / {lic.get('tier')} / exp {lic.get('expires_at')}")

        # Reject malformed host names before doing any long-running work.
        domain, all_names = _validated_server_names(data)

        # 1. Clone the argos repository
        log("── Clone repository ARGOS ──")
        _clone_repository(lic)

        # 2. Python virtualenv
        log("── Creazione virtualenv Python ──")
        venv_dir = APP_DIR / "backend/venv"
        if not venv_dir.exists():
            run(f"python3 -m venv {venv_dir}")
        run(f"{venv_dir}/bin/pip install --upgrade pip -q")
        req_file = APP_DIR / "backend/requirements.txt"
        if req_file.exists():
            run(f"{venv_dir}/bin/pip install -r {req_file} -q")
        run(f"chown -R {APP_USER}:{APP_USER} {venv_dir}")
        log("Virtualenv Python pronto")

        # 3. argos.json
        log("── Generazione argos.json ──")
        CONFIG_DIR.mkdir(parents=True, exist_ok=True)
        config = generate_argos_json(data)
        cfg_path = CONFIG_DIR / "argos.json"
        with open(cfg_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
        os.chmod(cfg_path, 0o600)
        chown(CONFIG_DIR)
        log("argos.json creato")

        # 4. integrations.json
        log("── Generazione integrations.json ──")
        integ = generate_integrations_json()
        # Fill the pdf section with the organisation name
        integ.setdefault("pdf", {})
        integ["pdf"]["org_name"] = data.get("cliente_full") or data.get("cliente_name", "")
        integ["pdf"]["client_logo"] = str(CONFIG_DIR / "assets" / "logo_cliente.png")
        integ_path = CONFIG_DIR / "integrations.json"
        with open(integ_path, "w", encoding="utf-8") as f:
            json.dump(integ, f, indent=2, ensure_ascii=False)
        os.chmod(integ_path, 0o600)
        chown(integ_path)
        log("integrations.json creato")

        # 5. modules.json
        mods = APP_DIR / "config/modules.json.example"
        if mods.exists():
            shutil.copy(mods, CONFIG_DIR / "modules.json")
            chown(CONFIG_DIR / "modules.json")
            log("modules.json copiato")

        # 5b. Additional .example files (recently added config sections)
        # - automations.json: TI feed sources + TI daemon cron config
        # - siem_integrations.json: SIEM Integration Builder catalogue
        # - subnet_registry.json: site/department mapping by subnet
        for stem in ("automations", "siem_integrations", "subnet_registry"):
            src = APP_DIR / f"config/{stem}.json.example"
            dst = CONFIG_DIR / f"{stem}.json"
            if src.exists() and not dst.exists():
                shutil.copy(src, dst)
                os.chmod(dst, 0o640)
                chown(dst)
                log(f"{stem}.json copiato da template")

        # 6. Customer logo
        logo_src = SETUP_DIR / "logo_cliente.png"
        if logo_src.exists():
            # The destination directory is not created by any earlier step.
            assets_dir = CONFIG_DIR / "assets"
            assets_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(logo_src, assets_dir / "logo_cliente.png")
            chown(assets_dir / "logo_cliente.png")
            log("Logo cliente copiato")

        # 7. Frontend build
        log("── Build frontend React ──")
        run(f"cd {APP_DIR}/frontend && npm install --silent")
        run(f"cd {APP_DIR}/frontend && npm run build")
        chown(APP_DIR / "frontend")
        # Read permissions for nginx (www-data)
        run(f"chmod 755 /opt/argos /opt/argos/app /opt/argos/app/frontend {APP_DIR}/frontend/dist")
        run(f"chmod -R 755 {APP_DIR}/frontend/dist/")
        log("Frontend buildato")

        # 8. Python dependencies (re-run in case of updates)
        log("── Dipendenze Python ──")
        venv_pip = APP_DIR / "backend/venv/bin/pip"
        req = APP_DIR / "backend/requirements.txt"
        if req.exists():
            run(f"{venv_pip} install -r {req} -q")
        log("Dipendenze Python installate")

        # 9. Admin user
        log("── Creazione utente admin ──")
        create_admin_user(data)

        # 10. SSL
        log("── Configurazione SSL ──")
        ssl_crt, ssl_key = _configure_ssl(data, domain, all_names)

        # 11. Final nginx configuration
        log("── Nginx configurazione finale ──")
        _write_nginx_final(all_names, ssl_crt, ssl_key)
        run("nginx -t && systemctl restart nginx")
        log("Nginx configurato")

        # 12. Systemd services
        log("── Creazione e avvio servizi ──")
        _write_services()
        run("systemctl daemon-reload")
        for svc in ["argos-backend", "argos-sync", "argos-ops", "argos-analytics"]:
            run(f"systemctl enable --now {svc}")
            log(f"{svc} avviato")

        # 13. Copy the license to its final location + close the web installer
        log("── Copia licenza in posizione finale ──")
        DATA_DIR.mkdir(parents=True, exist_ok=True)
        final_lic = DATA_DIR / "license.json"
        shutil.copy(lic_path, final_lic)
        os.chmod(final_lic, 0o600)
        chown(final_lic)
        log(f"Licenza copiata in {final_lic}")

        log("── Chiusura web installer ──")
        # NOTE(review): if this process is itself running as the argos-setup
        # unit, "disable --now" may stop it before the lines below run; the
        # delayed cleanup script performs the same stop/disable. Confirm
        # against the unit file before changing.
        run("systemctl disable --now argos-setup", check=False)
        run("ufw delete allow 8888/tcp", check=False)
        log("Porta 8888 chiusa — web installer disabilitato")
        log("=== INSTALLAZIONE COMPLETATA ===")
        _schedule_cleanup()
        install_done = True

        # Terminate the process after 15s (time to deliver the final status
        # response to the browser)
        def shutdown():
            time.sleep(15)
            os.kill(os.getpid(), signal.SIGTERM)

        threading.Thread(target=shutdown, daemon=True).start()
    except Exception as e:
        log(f"ERRORE: {e}")
        install_log.append(f"__ERROR__: {e}")
        install_error = True


def _write_nginx_http(all_names):
    """Write the temporary HTTP-only vhost used for the ACME challenge."""
    conf = f"""server {{
    listen 80;
    server_name {all_names};
    location /.well-known/acme-challenge/ {{ root /var/www/html; }}
    location / {{ return 301 https://$host$request_uri; }}
}}
"""
    _write_nginx_conf(conf)


def _write_nginx_final(all_names, ssl_crt, ssl_key):
    """Write the final nginx vhost (HTTP redirect + HTTPS reverse proxy)."""
    conf = f"""limit_req_zone $binary_remote_addr zone=argos:10m rate=20r/s;

server {{
    listen 80;
    server_name {all_names};
    location / {{ return 301 https://$host$request_uri; }}
    location /.well-known/acme-challenge/ {{ root /var/www/html; }}
}}

server {{
    listen 443 ssl http2;
    server_name {all_names};

    ssl_certificate {ssl_crt};
    ssl_certificate_key {ssl_key};
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    add_header X-Frame-Options SAMEORIGIN;
    add_header X-Content-Type-Options nosniff;
    add_header Strict-Transport-Security "max-age=31536000" always;

    client_max_body_size 50m;

    location /api/analytics/ {{
        limit_req zone=argos burst=40 nodelay;
        proxy_pass http://127.0.0.1:8083;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto https;
        proxy_read_timeout 120s;
    }}

    location /feeds/ {{
        alias {FEEDS_DIR}/;
        autoindex off;
        default_type "text/plain; charset=utf-8";
        access_log {LOGS_DIR}/nginx-feeds.log;
    }}

    location /api/ {{
        limit_req zone=argos burst=40 nodelay;
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto https;
        # Timeout estesi per /api/soc-report/generate
        # (collect + 8 AI narrative parallele + PDF ~2:30)
        proxy_connect_timeout 60s;
        proxy_send_timeout 300s;
        proxy_read_timeout 300s;
        client_body_timeout 300s;
    }}

    location / {{
        root {APP_DIR}/frontend/dist;
        try_files $uri $uri/ /index.html;
        expires 1h;
    }}

    access_log {LOGS_DIR}/nginx-access.log;
    error_log {LOGS_DIR}/nginx-error.log;
}}
"""
    _write_nginx_conf(conf)


def _write_nginx_conf(conf):
    """Install ``conf`` as the ``argos`` nginx site and enable it.

    Also removes the ``default`` and temporary ``argos-setup`` sites from
    sites-enabled.
    """
    with open("/etc/nginx/sites-available/argos", "w") as f:
        f.write(conf)
    link = Path("/etc/nginx/sites-enabled/argos")
    # exists() follows symlinks: also check is_symlink() so a dangling link
    # does not make symlink_to() fail with FileExistsError.
    if not (link.is_symlink() or link.exists()):
        link.symlink_to("/etc/nginx/sites-available/argos")
    # Remove the default site and the temporary setup site
    for stale in ["/etc/nginx/sites-enabled/default", "/etc/nginx/sites-enabled/argos-setup"]:
        if Path(stale).exists():
            Path(stale).unlink()


def _write_services():
    """Generate the 4 ARGOS systemd units (Gunicorn flavour).

    Service → wsgi target mapping:
      - argos-backend   : argos_backend:app        (no wrapper, top-level app)
      - argos-sync      : argos_sync_wsgi:app      (wrapper starts main_loop)
      - argos-ops       : argos_ops_wsgi:app       (wrapper starts main_loop)
      - argos-analytics : argos_analytics_wsgi:app (wrapper starts _scheduler_loop)

    Critical constraint: workers=1 is mandatory for sync/ops/analytics. They
    hold an in-memory scheduler + daemon threads that, with N workers, would
    be duplicated N times (double jobs, file lock contention, SQLite races).
    They use gthread (threads=4) to serve the API while the internal
    scheduler runs in its daemon thread.

    Updated 04/05/2026: migration from the Flask dev server to Gunicorn 25.3+.
    """
    venv_bin = str(APP_DIR / "backend/venv/bin")
    backend_dir = str(APP_DIR / "backend")
    services_dir = str(APP_DIR / "backend/services")
    # (wsgi_target, workers, threads, timeout_sec, port, description)
    #
    # NOTE backend (1 worker + 8 threads, was 2 sync workers until 2026-05-04):
    # User sessions are kept in memory (_sessions_cache in core.py) with
    # persistence on file (argos_sessions.json) BUT without per-worker
    # reload. With N>1 workers, worker B does not see sessions created by
    # worker A → random 401 after login (kernel round-robin). Current
    # workaround: 1 worker with 8 threads = intra-process concurrency via GIL
    # release on I/O. Enough for the ASREM load. Future fix: shared session
    # store (Redis or SQLite) to scale to N>1 workers.
    services = {
        "backend": ("argos_backend:app", 1, 8, 120, 8080, "Backend API"),
        "sync": ("argos_sync_wsgi:app", 1, 4, 600, 8081, "Sync Daemon"),
        "ops": ("argos_ops_wsgi:app", 1, 4, 600, 8082, "Ops Daemon"),
        "analytics": ("argos_analytics_wsgi:app", 1, 4, 300, 8083, "Analytics Daemon"),
    }
    for svc, (wsgi, workers, threads, timeout, port, desc) in services.items():
        # Every service now uses gthread (threads>=4); workers=1 is mandatory.
        threads_line = f"    --threads {threads} \\\n" if threads > 1 else ""
        unit = f"""[Unit]
Description=ARGOS {desc} (Gunicorn)
After=network.target

[Service]
Type=simple
User={APP_USER}
Group={APP_USER}
WorkingDirectory={services_dir}
Environment=PYTHONPATH={backend_dir}:{services_dir}
Environment=CONFIG_FILE={CONFIG_DIR}/argos.json
Environment=INTEGRATIONS_FILE={CONFIG_DIR}/integrations.json
Environment=DATA_DIR={DATA_DIR}
Environment=FEEDS_DIR={FEEDS_DIR}
Environment=LOGS_DIR={LOGS_DIR}
ExecStart={venv_bin}/gunicorn {wsgi} \\
    --workers {workers} \\
{threads_line}    --bind 127.0.0.1:{port} \\
    --timeout {timeout} \\
    --graceful-timeout 30 \\
    --keep-alive 5 \\
    --access-logfile - \\
    --error-logfile - \\
    --log-level info \\
    --capture-output
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=argos-{svc}

[Install]
WantedBy=multi-user.target
"""
        with open(f"/etc/systemd/system/argos-{svc}.service", "w") as f:
            f.write(unit)


class SetupHandler(BaseHTTPRequestHandler):
    """HTTP handler serving the wizard page and its JSON API."""

    def log_message(self, *args):
        """Silence the default per-request logging."""
        pass

    def do_GET(self):
        """Serve the wizard page, the install status and the machine id."""
        path = urlparse(self.path).path
        if path in ("/", "/setup"):
            html_path = Path(__file__).parent / "setup.html"
            if not html_path.exists():
                self.send_response(404)
                self.end_headers()
                return
            html = html_path.read_bytes()
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(html)))
            self.end_headers()
            self.wfile.write(html)
        elif path == "/api/status":
            self._json({"done": install_done, "error": install_error,
                        "log": install_log[-60:]})
        elif path == "/api/machine-id":
            self._json({"machine_id": get_machine_id()})
        else:
            self.send_response(404)
            self.end_headers()

    def _read_body(self):
        """Read the request body; reply 400 and return None if it is invalid.

        Rejects a non-numeric, negative or oversized Content-Length instead
        of crashing the handler or reading an unbounded amount of data.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = -1
        if length < 0 or length > MAX_BODY_BYTES:
            self._json({"ok": False,
                        "error": "Content-Length non valido o troppo grande"}, 400)
            return None
        return self.rfile.read(length)

    def do_POST(self):
        """Handle install start, file uploads and license upload."""
        path = urlparse(self.path).path
        body = self._read_body()
        if body is None:
            return
        if path == "/api/install":
            try:
                data = json.loads(body)
                threading.Thread(target=install, args=(data,), daemon=True).start()
                self._json({"ok": True})
            except Exception as e:
                self._json({"ok": False, "error": str(e)}, 400)
        elif path == "/api/upload/cert":
            SETUP_DIR.mkdir(parents=True, exist_ok=True)
            (SETUP_DIR / "uploaded.crt").write_bytes(body)
            self._json({"ok": True})
        elif path == "/api/upload/key":
            SETUP_DIR.mkdir(parents=True, exist_ok=True)
            # Private key: never leave it readable by other users.
            _write_private(SETUP_DIR / "uploaded.key", body)
            self._json({"ok": True})
        elif path == "/api/upload/logo":
            SETUP_DIR.mkdir(parents=True, exist_ok=True)
            (SETUP_DIR / "logo_cliente.png").write_bytes(body)
            self._json({"ok": True})
        elif path == "/api/license/upload":
            ok, lic, err = verify_license(body)
            if not ok:
                self._json({"ok": False, "error": err}, 400)
                return
            # Store the valid license in SETUP_DIR for use by install().
            # It contains the Gitea token, so create it with mode 0600.
            SETUP_DIR.mkdir(parents=True, exist_ok=True)
            _write_private(SETUP_DIR / "license.json", body)
            # Response: summary only (the token is not sent back in clear)
            self._json({
                "ok": True,
                "summary": {
                    "customer": lic.get("customer", ""),
                    "tier": lic.get("tier", ""),
                    "issued_to": lic.get("issued_to", ""),
                    "issued_at": lic.get("issued_at", ""),
                    "expires_at": lic.get("expires_at", ""),
                    "has_gitea": bool(lic.get("gitea_token")),
                }
            })
        else:
            self.send_response(404)
            self.end_headers()

    def _json(self, data, code=200):
        """Send ``data`` as a JSON response with HTTP status ``code``."""
        body = json.dumps(data).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def _schedule_cleanup():
    """Schedule removal of the argos-setup service and of /opt/argos-setup-pkg/.

    The cleanup script is launched through ``systemd-run`` as an independent
    transient service with an internal delay, so this function can return
    before the cleanup starts (otherwise the process would delete itself
    while still running). Failures are logged, never raised.
    """
    script = r"""#!/bin/bash
# Attendi 5 secondi per permettere al service padre di rispondere all'ultima
# richiesta HTTP di stato e chiudere pulito.
sleep 5

# Disabilita e ferma il service argos-setup
systemctl stop argos-setup.service 2>/dev/null || true
systemctl disable argos-setup.service 2>/dev/null || true
rm -f /etc/systemd/system/argos-setup.service
systemctl daemon-reload

# Chiudi porta 8888 nel firewall
ufw delete allow 8888/tcp 2>/dev/null || true

# Rimuovi la cartella del pacchetto (auto-delete del codice corrente)
rm -rf /opt/argos-setup-pkg

# Log
echo "argos-setup cleanup completato $(date -Iseconds)" >> /var/log/argos-setup-cleanup.log
"""
    # Write the script to /tmp and launch it via systemd-run as an
    # independent transient unit (NO --scope: a scope inherits the parent's
    # cgroup and is killed when setup_server dies, before the script has
    # completed its 5s sleep + rm).
    script_path = "/tmp/argos-setup-cleanup.sh"
    try:
        with open(script_path, "w") as f:
            f.write(script)
        os.chmod(script_path, 0o755)
        # systemd-run without --scope creates an independent transient
        # .service unit that survives the end of the parent process. The unit
        # name includes a timestamp to avoid clashes if setup is re-run.
        unit_name = f"argos-setup-cleanup-{int(time.time())}.service"
        subprocess.Popen(
            ["systemd-run", "--no-block", "--unit", unit_name,
             "/bin/bash", script_path],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
        log(f"Cleanup schedulato via systemd-run come {unit_name} (delay 5s)")
    except Exception as e:
        log(f"Errore schedulazione cleanup: {e}")


if __name__ == "__main__":
    print(f"\n{'='*55}")
    print(f" ARGOS SOC — Web Installer")
    print(f" Tecnotel Servizi SRL")
    print(f" Apri: http://:{PORT}")
    print(f"{'='*55}\n")
    HTTPServer(("0.0.0.0", PORT), SetupHandler).serve_forever()