#!/usr/bin/env python3 """ ARGOS SOC — Web Installer Server Tecnotel Servizi SRL """ import json import os import subprocess import secrets import shutil import hashlib import threading import signal from datetime import datetime, timezone from pathlib import Path from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse APP_DIR = Path("/opt/argos/app") CONFIG_DIR = Path("/opt/argos/config") DATA_DIR = Path("/opt/argos/data") FEEDS_DIR = Path("/opt/argos/feeds") LOGS_DIR = Path("/opt/argos/logs") CERTS_DIR = Path("/opt/argos/certs") SETUP_DIR = Path("/opt/argos/setup") APP_USER = "argos" PORT = 8888 # ── Licenza ARGOS — pubkey per verifica firma Ed25519 ───────────────────────── # Stessa pubkey hardcoded in backend/core.py. Raw Ed25519 (32 byte) base64. # Il setup_server la usa per verificare licenze ricevute dal cliente PRIMA # di procedere con l'installazione. _LICENSE_PUBLIC_KEY_B64 = "GMRsZMoxOlCBiJU66EsQcj0ZO0gVd0GHB5LelEo/hns=" # ── Clone argos: username Basic Auth bot Gitea (costante del sistema) ───────── GITEA_BOT_USER = "argos-portal-bot" GITEA_REPO_PATH = "/tecnotel/argos.git" install_log = [] install_done = False install_error = False def get_machine_id() -> str: """Fingerprint univoco del server (stesso algoritmo di core.py). SHA256 hex di: /etc/machine-id | hostname | MAC prima interfaccia fisica. """ import socket as _sock import subprocess as _sp parts = [] try: with open("/etc/machine-id") as f: parts.append(f.read().strip()) except Exception: parts.append("") try: parts.append(_sock.gethostname()) except Exception: parts.append("") try: r = _sp.run(["cat", "/sys/class/net/eth0/address"], capture_output=True, text=True, timeout=2) mac = r.stdout.strip() if not mac or mac == "00:00:00:00:00:00": r = _sp.run(["ip", "-o", "link", "show"], capture_output=True, text=True, timeout=2) for line in r.stdout.splitlines(): if "link/ether" in line and "00:00:00:00:00:00" not in line: if "docker" in line or "br-" in line or "veth" in line: continue mac = line.split("link/ether")[1].split()[0].strip() break parts.append(mac or "") except Exception: parts.append("") combined = "|".join(parts) return hashlib.sha256(combined.encode()).hexdigest() def verify_license(raw_bytes): """Verifica firma Ed25519 + machine_id match + scadenza. Ritorna tupla (ok, license_dict, error_message). Se ok=True, license_dict contiene il payload completo (inclusi gitea_url, gitea_token se presenti). Se ok=False, error_message e' umano-friendly. """ try: raw = json.loads(raw_bytes) except Exception as e: return (False, None, f"File non e' JSON valido: {e}") if not isinstance(raw, dict): return (False, None, "Formato licenza non riconosciuto") # Verifica firma try: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey from cryptography.exceptions import InvalidSignature import base64 sig = raw.pop("signature", "") if not sig: return (False, None, "Licenza senza firma (campo 'signature' mancante)") payload = json.dumps(raw, sort_keys=True, separators=(",", ":")) raw["signature"] = sig # ripristina pub_bytes = base64.b64decode(_LICENSE_PUBLIC_KEY_B64) pub = Ed25519PublicKey.from_public_bytes(pub_bytes) try: pub.verify(base64.b64decode(sig), payload.encode()) except InvalidSignature: return (False, None, "Firma non valida. La licenza potrebbe essere manipolata o emessa da altro vendor.") except ImportError: return (False, None, "Libreria 'cryptography' non disponibile. Installa: apt install python3-cryptography") except Exception as e: return (False, None, f"Errore verifica firma: {e}") # Verifica machine_id lic_machine = raw.get("machine_id", "") if not lic_machine: return (False, None, "Licenza senza machine_id — formato non supportato") cur_machine = get_machine_id() if lic_machine != cur_machine: return (False, None, f"Machine ID non corrisponde: licenza per {lic_machine[:12]}..., " f"questo server e' {cur_machine[:12]}... " "La licenza non e' valida per questa macchina.") # Verifica scadenza expires = raw.get("expires_at", "") if expires: today = datetime.now().strftime("%Y-%m-%d") if expires < today: return (False, None, f"Licenza scaduta il {expires}") # Verifica presenza credenziali Gitea (la licenza deve averle per permettere clone) if not raw.get("gitea_url") or not raw.get("gitea_token"): return (False, None, "Licenza priva di credenziali Gitea. " "Contatta Tecnotel per riemetterla con il nuovo formato.") return (True, raw, "") def log(msg): ts = datetime.now().strftime("%H:%M:%S") line = f"[{ts}] {msg}" install_log.append(line) print(line, flush=True) def run(cmd, check=True): log(f"$ {cmd}") result = subprocess.run(cmd, shell=True, capture_output=True, text=True) if result.stdout.strip(): log(result.stdout.strip()) if result.stderr.strip(): log(result.stderr.strip()) if check and result.returncode != 0: raise RuntimeError(f"Comando fallito (exit {result.returncode}): {cmd}") return result def chown(path): run(f"chown -R {APP_USER}:{APP_USER} {path}", check=False) def generate_argos_json(data): # Hostname primario dal wizard (campo "domain" nella UI = FQDN tipo soc.azienda.it) hostname = data.get("domain", "").strip().lower() # Alias dal wizard: stringa separata da spazi -> lista pulita aliases_raw = data.get("aliases", "").strip() aliases = [a.strip().lower() for a in aliases_raw.split() if a.strip()] return { "_version": "1.2", "_installed": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "cliente": { "name": data.get("cliente_name", ""), "full_name": data.get("cliente_full", ""), "domain": data.get("cliente_domain", ""), "type": data.get("cliente_type", "enterprise"), "ai_context": "", "sharepoint_tenant": "", "console_url": "" }, "network": { "hostname": hostname, "aliases": aliases }, "system": { "secret_key": secrets.token_hex(32), "internal_api_key": secrets.token_hex(24), "tz": "Europe/Rome" }, "opensearch": { "url": data.get("os_url", ""), "user": data.get("os_user", "admin"), "password": data.get("os_pass", "") }, "paths": { "data_dir": str(DATA_DIR), "feeds_dir": str(FEEDS_DIR), "logs_dir": str(LOGS_DIR), "config_dir": str(CONFIG_DIR), "analytics_db": str(DATA_DIR / "analytics.db") }, "ports": { "backend": 8080, "sync": 8081, "ops": 8082, "analytics": 8083 } } def generate_integrations_json(): example = APP_DIR / "config/integrations.json.example" if example.exists(): with open(example) as f: return json.load(f) # fallback minimale return { "_version": "1.0", "endpoint": {}, "firewall": {}, "identity": {}, "notifications": {}, "threat_intel": {}, "pdf": { "org_name": "", "author": "Tecnotel Servizi SRL", "motto": "Controllo totale. Difesa continua.", "classification": "RISERVATO - USO INTERNO", "app_logo": str(APP_DIR / "frontend/dist/logo_argos_bianco.png"), "client_logo": str(CONFIG_DIR / "assets" / "logo_cliente.png") } } def create_admin_user(data): username = data.get("admin_username", "admin").strip() email = data.get("admin_email_user", "").strip() password = data.get("admin_password", "").strip() if not username or not password: log("WARN: credenziali admin mancanti — skip creazione utente") return import os as _os # Hash compatibile con auth.py: salt:sha256(salt+pw+"argos-2026") salt = _os.urandom(16).hex() pw_hash = f"{salt}:{hashlib.sha256((salt + password + 'argos-2026').encode()).hexdigest()}" users_file = DATA_DIR / "argos_users.json" DATA_DIR.mkdir(parents=True, exist_ok=True) # Leggi utenti esistenti o crea struttura vuota try: with open(users_file) as f: users_data = json.load(f) except Exception: users_data = {"users": {}} users_data["users"][username.lower()] = { "password": pw_hash, "role": "admin", "name": username, "email": email, "enabled": True } with open(users_file, "w") as f: json.dump(users_data, f, indent=2) os.chmod(users_file, 0o600) chown(users_file) log(f"Utente admin '{username}' creato in argos_users.json") def install(data): global install_done, install_error try: log("=== AVVIO INSTALLAZIONE ARGOS SOC ===") # 0. Carica licenza (gia' validata da /api/license/upload) log("── Verifica licenza ARGOS ──") lic_path = SETUP_DIR / "license.json" if not lic_path.exists(): raise RuntimeError( "License.json non trovata in /opt/argos/setup/. " "Caricare una licenza valida prima di avviare l'installazione." ) ok, lic, err = verify_license(lic_path.read_bytes()) if not ok: raise RuntimeError(f"Licenza non valida: {err}") gitea_url = lic.get("gitea_url", "").rstrip("/") gitea_token = lic.get("gitea_token", "") # Deriva host dal gitea_url (es: https://host/api/v1 -> https://host) gitea_host = gitea_url[:-len("/api/v1")] if gitea_url.endswith("/api/v1") else gitea_url log(f"Licenza OK: {lic.get('customer')} / {lic.get('tier')} / exp {lic.get('expires_at')}") # 1. Clone repository argos (URL autenticato temporaneo: token NON in .git/config) log("── Clone repository ARGOS ──") if APP_DIR.exists() and (APP_DIR / ".git").exists(): log("Repository gia' presente — skip clone") else: auth_url = f"https://{GITEA_BOT_USER}:{gitea_token}@{gitea_host[len('https://'):]}{GITEA_REPO_PATH}" APP_DIR.parent.mkdir(parents=True, exist_ok=True) # Clona con URL autenticato run(f"git clone {auth_url} {APP_DIR}") # Dopo clone, ripulisci .git/config rimuovendo il token clean_url = f"{gitea_host}{GITEA_REPO_PATH}" run(f"git -C {APP_DIR} remote set-url origin {clean_url}") run(f"chown -R {APP_USER}:{APP_USER} {APP_DIR}") log("Repository ARGOS clonato") # 2. Python virtualenv log("── Creazione virtualenv Python ──") venv_dir = APP_DIR / "backend/venv" if not venv_dir.exists(): run(f"python3 -m venv {venv_dir}") run(f"{venv_dir}/bin/pip install --upgrade pip -q") req_file = APP_DIR / "backend/requirements.txt" if req_file.exists(): run(f"{venv_dir}/bin/pip install -r {req_file} -q") run(f"chown -R {APP_USER}:{APP_USER} {venv_dir}") log("Virtualenv Python pronto") # 3. argos.json log("── Generazione argos.json ──") CONFIG_DIR.mkdir(parents=True, exist_ok=True) config = generate_argos_json(data) cfg_path = CONFIG_DIR / "argos.json" with open(cfg_path, "w") as f: json.dump(config, f, indent=2, ensure_ascii=False) os.chmod(cfg_path, 0o600) chown(CONFIG_DIR) log("argos.json creato") # 4. integrations.json log("── Generazione integrations.json ──") integ = generate_integrations_json() # Aggiorna pdf con nome organizzazione integ.setdefault("pdf", {}) integ["pdf"]["org_name"] = data.get("cliente_full") or data.get("cliente_name", "") integ["pdf"]["client_logo"] = str(CONFIG_DIR / "assets" / "logo_cliente.png") integ_path = CONFIG_DIR / "integrations.json" with open(integ_path, "w") as f: json.dump(integ, f, indent=2, ensure_ascii=False) os.chmod(integ_path, 0o600) chown(integ_path) log("integrations.json creato") # 5. modules.json mods = APP_DIR / "config/modules.json.example" if mods.exists(): shutil.copy(mods, CONFIG_DIR / "modules.json") chown(CONFIG_DIR / "modules.json") log("modules.json copiato") # 6. Logo cliente logo_src = SETUP_DIR / "logo_cliente.png" if logo_src.exists(): DATA_DIR.mkdir(parents=True, exist_ok=True) shutil.copy(logo_src, CONFIG_DIR / "assets" / "logo_cliente.png") chown(CONFIG_DIR / "assets" / "logo_cliente.png") log("Logo cliente copiato") # 7. Build frontend log("── Build frontend React ──") run(f"cd {APP_DIR}/frontend && npm install --silent") run(f"cd {APP_DIR}/frontend && npm run build") chown(APP_DIR / "frontend") # Permessi lettura per nginx (www-data) run(f"chmod 755 /opt/argos /opt/argos/app /opt/argos/app/frontend {APP_DIR}/frontend/dist") run(f"chmod -R 755 {APP_DIR}/frontend/dist/") log("Frontend buildato") # 8. Dipendenze Python (re-run in caso di aggiornamenti) log("── Dipendenze Python ──") venv_pip = APP_DIR / "backend/venv/bin/pip" req = APP_DIR / "backend/requirements.txt" if req.exists(): run(f"{venv_pip} install -r {req} -q") log("Dipendenze Python installate") # 9. Utente admin log("── Creazione utente admin ──") create_admin_user(data) # 10. SSL log("── Configurazione SSL ──") domain = data.get("domain", "").strip() aliases = data.get("aliases", "").strip() ssl_mode = data.get("ssl_mode", "letsencrypt") all_names = (domain + " " + aliases).strip() if ssl_mode == "manual": crt_src = SETUP_DIR / "uploaded.crt" key_src = SETUP_DIR / "uploaded.key" if not crt_src.exists() or not key_src.exists(): raise RuntimeError("File SSL .crt o .key non trovati in /opt/argos/setup/") CERTS_DIR.mkdir(parents=True, exist_ok=True) shutil.copy(crt_src, CERTS_DIR / "fullchain.pem") shutil.copy(key_src, CERTS_DIR / "privkey.pem") os.chmod(CERTS_DIR / "privkey.pem", 0o600) ssl_crt = str(CERTS_DIR / "fullchain.pem") ssl_key = str(CERTS_DIR / "privkey.pem") log("Certificato SSL manuale copiato") else: _write_nginx_http(all_names) run("nginx -t && systemctl restart nginx") certbot_d = " ".join(f"-d {n}" for n in all_names.split()) email = data.get("admin_email", "admin@tecnotelsrl.com") run(f"certbot --nginx {certbot_d} --non-interactive --agree-tos -m {email}") ssl_crt = f"/etc/letsencrypt/live/{domain}/fullchain.pem" ssl_key = f"/etc/letsencrypt/live/{domain}/privkey.pem" log("Certificato Let's Encrypt ottenuto") # 11. Nginx finale log("── Nginx configurazione finale ──") _write_nginx_final(all_names, ssl_crt, ssl_key) run("nginx -t && systemctl restart nginx") log("Nginx configurato") # 12. Systemd services log("── Creazione e avvio servizi ──") _write_services() run("systemctl daemon-reload") for svc in ["argos-backend", "argos-sync", "argos-ops", "argos-analytics"]: run(f"systemctl enable --now {svc}") log(f"{svc} avviato") # 13. Copia licenza + chiudi web installer log("── Copia licenza in posizione finale ──") DATA_DIR.mkdir(parents=True, exist_ok=True) final_lic = DATA_DIR / "license.json" shutil.copy(lic_path, final_lic) os.chmod(final_lic, 0o600) chown(final_lic) log(f"Licenza copiata in {final_lic}") log("── Chiusura web installer ──") run("systemctl disable --now argos-setup", check=False) run("ufw delete allow 8888/tcp", check=False) log("Porta 8888 chiusa — web installer disabilitato") log("=== INSTALLAZIONE COMPLETATA ===") install_done = True # Spegni il processo dopo 8s (tempo per inviare la risposta al browser) def shutdown(): import time time.sleep(15) os.kill(os.getpid(), signal.SIGTERM) threading.Thread(target=shutdown, daemon=True).start() except Exception as e: log(f"ERRORE: {e}") install_log.append(f"__ERROR__: {e}") install_error = True def _write_nginx_http(all_names): conf = f"""server {{ listen 80; server_name {all_names}; location /.well-known/acme-challenge/ {{ root /var/www/html; }} location / {{ return 301 https://$host$request_uri; }} }} """ _write_nginx_conf(conf) def _write_nginx_final(all_names, ssl_crt, ssl_key): conf = f"""limit_req_zone $binary_remote_addr zone=argos:10m rate=20r/s; server {{ listen 80; server_name {all_names}; location / {{ return 301 https://$host$request_uri; }} location /.well-known/acme-challenge/ {{ root /var/www/html; }} }} server {{ listen 443 ssl http2; server_name {all_names}; ssl_certificate {ssl_crt}; ssl_certificate_key {ssl_key}; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers HIGH:!aNULL:!MD5; add_header X-Frame-Options SAMEORIGIN; add_header X-Content-Type-Options nosniff; add_header Strict-Transport-Security "max-age=31536000" always; client_max_body_size 50m; location /api/analytics/ {{ limit_req zone=argos burst=40 nodelay; proxy_pass http://127.0.0.1:8083; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto https; proxy_read_timeout 120s; }} location /feeds/ {{ alias {FEEDS_DIR}/; autoindex off; default_type "text/plain; charset=utf-8"; access_log {LOGS_DIR}/nginx-feeds.log; }} location /api/ {{ limit_req zone=argos burst=40 nodelay; proxy_pass http://127.0.0.1:8080; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto https; proxy_read_timeout 120s; }} location / {{ root {APP_DIR}/frontend/dist; try_files $uri $uri/ /index.html; expires 1h; }} access_log {LOGS_DIR}/nginx-access.log; error_log {LOGS_DIR}/nginx-error.log; }} """ _write_nginx_conf(conf) def _write_nginx_conf(conf): with open("/etc/nginx/sites-available/argos", "w") as f: f.write(conf) p = Path("/etc/nginx/sites-enabled/argos") if not p.exists(): p.symlink_to("/etc/nginx/sites-available/argos") # Rimuovi default e setup temporaneo for f in ["/etc/nginx/sites-enabled/default", "/etc/nginx/sites-enabled/argos-setup"]: if Path(f).exists(): Path(f).unlink() def _write_services(): venv_py = str(APP_DIR / "backend/venv/bin/python3") backend_dir = str(APP_DIR / "backend") services = { "backend": ("services/argos_backend.py", "Backend API"), "sync": ("services/argos_sync.py", "Sync Daemon"), "ops": ("services/argos_ops.py", "Ops Daemon"), "analytics": ("services/argos_analytics.py", "Analytics Daemon"), } for svc, (script, desc) in services.items(): unit = f"""[Unit] Description=ARGOS {desc} After=network.target [Service] Type=simple User={APP_USER} Group={APP_USER} WorkingDirectory={backend_dir} Environment=PYTHONPATH={backend_dir} Environment=CONFIG_FILE={CONFIG_DIR}/argos.json Environment=INTEGRATIONS_FILE={CONFIG_DIR}/integrations.json Environment=DATA_DIR={DATA_DIR} Environment=FEEDS_DIR={FEEDS_DIR} Environment=LOGS_DIR={LOGS_DIR} ExecStart={venv_py} {backend_dir}/{script} Restart=always RestartSec=5 StandardOutput=journal StandardError=journal SyslogIdentifier=argos-{svc} [Install] WantedBy=multi-user.target """ with open(f"/etc/systemd/system/argos-{svc}.service", "w") as f: f.write(unit) class SetupHandler(BaseHTTPRequestHandler): def log_message(self, *args): pass def do_GET(self): path = urlparse(self.path).path if path in ("/", "/setup"): html_path = Path(__file__).parent / "setup.html" if not html_path.exists(): self.send_response(404); self.end_headers(); return html = html_path.read_bytes() self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", len(html)) self.end_headers() self.wfile.write(html) elif path == "/api/status": self._json({"done": install_done, "error": install_error, "log": install_log[-60:]}) elif path == "/api/machine-id": self._json({"machine_id": get_machine_id()}) else: self.send_response(404); self.end_headers() def do_POST(self): path = urlparse(self.path).path length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(length) if path == "/api/install": try: data = json.loads(body) threading.Thread(target=install, args=(data,), daemon=True).start() self._json({"ok": True}) except Exception as e: self._json({"ok": False, "error": str(e)}, 400) elif path == "/api/upload/cert": SETUP_DIR.mkdir(parents=True, exist_ok=True) (SETUP_DIR / "uploaded.crt").write_bytes(body) self._json({"ok": True}) elif path == "/api/upload/key": SETUP_DIR.mkdir(parents=True, exist_ok=True) (SETUP_DIR / "uploaded.key").write_bytes(body) self._json({"ok": True}) elif path == "/api/upload/logo": SETUP_DIR.mkdir(parents=True, exist_ok=True) (SETUP_DIR / "logo_cliente.png").write_bytes(body) self._json({"ok": True}) elif path == "/api/license/upload": ok, lic, err = verify_license(body) if not ok: self._json({"ok": False, "error": err}, 400) return # Salva licenza valida nel SETUP_DIR per essere usata durante install() SETUP_DIR.mkdir(parents=True, exist_ok=True) lic_path = SETUP_DIR / "license.json" lic_path.write_bytes(body) os.chmod(lic_path, 0o600) # Risposta: solo dati sommari (non rinvia token in plain) self._json({ "ok": True, "summary": { "customer": lic.get("customer", ""), "tier": lic.get("tier", ""), "issued_to": lic.get("issued_to", ""), "issued_at": lic.get("issued_at", ""), "expires_at": lic.get("expires_at", ""), "has_gitea": bool(lic.get("gitea_token")), } }) else: self.send_response(404); self.end_headers() def _json(self, data, code=200): body = json.dumps(data).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", len(body)) self.end_headers() self.wfile.write(body) if __name__ == "__main__": print(f"\n{'='*55}") print(f" ARGOS SOC — Web Installer") print(f" Tecnotel Servizi SRL") print(f" Apri: http://:{PORT}") print(f"{'='*55}\n") HTTPServer(("0.0.0.0", PORT), SetupHandler).serve_forever()