argos-setup/setup_server.py

484 lines
16 KiB
Python

#!/usr/bin/env python3
"""
ARGOS SOC — Web Installer Server
Tecnotel Servizi SRL
"""
import json
import os
import subprocess
import secrets
import shutil
import hashlib
import threading
import signal
from datetime import datetime, timezone
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
APP_DIR = Path("/opt/argos/app")
CONFIG_DIR = Path("/opt/argos/config")
DATA_DIR = Path("/opt/argos/data")
FEEDS_DIR = Path("/opt/argos/feeds")
LOGS_DIR = Path("/opt/argos/logs")
CERTS_DIR = Path("/opt/argos/certs")
SETUP_DIR = Path("/opt/argos/setup")
APP_USER = "argos"
PORT = 8888
install_log = []
install_done = False
install_error = False
def log(msg):
ts = datetime.now().strftime("%H:%M:%S")
line = f"[{ts}] {msg}"
install_log.append(line)
print(line, flush=True)
def run(cmd, check=True):
log(f"$ {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.stdout.strip(): log(result.stdout.strip())
if result.stderr.strip(): log(result.stderr.strip())
if check and result.returncode != 0:
raise RuntimeError(f"Comando fallito (exit {result.returncode}): {cmd}")
return result
def chown(path):
run(f"chown -R {APP_USER}:{APP_USER} {path}", check=False)
def generate_argos_json(data):
# Hostname primario dal wizard (campo "domain" nella UI = FQDN tipo soc.azienda.it)
hostname = data.get("domain", "").strip().lower()
# Alias dal wizard: stringa separata da spazi -> lista pulita
aliases_raw = data.get("aliases", "").strip()
aliases = [a.strip().lower() for a in aliases_raw.split() if a.strip()]
return {
"_version": "1.2",
"_installed": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"cliente": {
"name": data.get("cliente_name", ""),
"full_name": data.get("cliente_full", ""),
"domain": data.get("cliente_domain", ""),
"type": data.get("cliente_type", "enterprise"),
"ai_context": "",
"sharepoint_tenant": "",
"console_url": ""
},
"network": {
"hostname": hostname,
"aliases": aliases
},
"system": {
"secret_key": secrets.token_hex(32),
"internal_api_key": secrets.token_hex(24),
"tz": "Europe/Rome"
},
"opensearch": {
"url": data.get("os_url", ""),
"user": data.get("os_user", "admin"),
"password": data.get("os_pass", "")
},
"paths": {
"data_dir": str(DATA_DIR),
"feeds_dir": str(FEEDS_DIR),
"logs_dir": str(LOGS_DIR),
"config_dir": str(CONFIG_DIR),
"analytics_db": str(DATA_DIR / "analytics.db")
},
"ports": {
"backend": 8080,
"sync": 8081,
"ops": 8082,
"analytics": 8083
}
}
def generate_integrations_json():
example = APP_DIR / "config/integrations.json.example"
if example.exists():
with open(example) as f:
return json.load(f)
# fallback minimale
return {
"_version": "1.0",
"endpoint": {}, "firewall": {}, "identity": {},
"notifications": {}, "threat_intel": {},
"pdf": {
"org_name": "", "author": "Tecnotel Servizi SRL",
"motto": "Controllo totale. Difesa continua.",
"classification": "RISERVATO - USO INTERNO",
"app_logo": str(APP_DIR / "frontend/dist/logo_argos_bianco.png"),
"client_logo": str(CONFIG_DIR / "assets" / "logo_cliente.png")
}
}
def create_admin_user(data):
username = data.get("admin_username", "admin").strip()
email = data.get("admin_email_user", "").strip()
password = data.get("admin_password", "").strip()
if not username or not password:
log("WARN: credenziali admin mancanti — skip creazione utente")
return
import os as _os
# Hash compatibile con auth.py: salt:sha256(salt+pw+"argos-2026")
salt = _os.urandom(16).hex()
pw_hash = f"{salt}:{hashlib.sha256((salt + password + 'argos-2026').encode()).hexdigest()}"
users_file = DATA_DIR / "argos_users.json"
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Leggi utenti esistenti o crea struttura vuota
try:
with open(users_file) as f:
users_data = json.load(f)
except Exception:
users_data = {"users": {}}
users_data["users"][username.lower()] = {
"password": pw_hash,
"role": "admin",
"name": username,
"email": email,
"enabled": True
}
with open(users_file, "w") as f:
json.dump(users_data, f, indent=2)
os.chmod(users_file, 0o600)
chown(users_file)
log(f"Utente admin '{username}' creato in argos_users.json")
def install(data):
global install_done, install_error
try:
log("=== AVVIO INSTALLAZIONE ARGOS SOC ===")
# 1. argos.json
log("── Generazione argos.json ──")
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
config = generate_argos_json(data)
cfg_path = CONFIG_DIR / "argos.json"
with open(cfg_path, "w") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
os.chmod(cfg_path, 0o600)
chown(CONFIG_DIR)
log("argos.json creato")
# 2. integrations.json
log("── Generazione integrations.json ──")
integ = generate_integrations_json()
# Aggiorna pdf con nome organizzazione
integ.setdefault("pdf", {})
integ["pdf"]["org_name"] = data.get("cliente_full") or data.get("cliente_name", "")
integ["pdf"]["client_logo"] = str(CONFIG_DIR / "assets" / "logo_cliente.png")
integ_path = CONFIG_DIR / "integrations.json"
with open(integ_path, "w") as f:
json.dump(integ, f, indent=2, ensure_ascii=False)
os.chmod(integ_path, 0o600)
chown(integ_path)
log("integrations.json creato")
# 3. modules.json
mods = APP_DIR / "config/modules.json.example"
if mods.exists():
shutil.copy(mods, CONFIG_DIR / "modules.json")
chown(CONFIG_DIR / "modules.json")
log("modules.json copiato")
# 4. Logo cliente
logo_src = SETUP_DIR / "logo_cliente.png"
if logo_src.exists():
DATA_DIR.mkdir(parents=True, exist_ok=True)
shutil.copy(logo_src, CONFIG_DIR / "assets" / "logo_cliente.png")
chown(CONFIG_DIR / "assets" / "logo_cliente.png")
log("Logo cliente copiato")
# 5. Build frontend
log("── Build frontend React ──")
run(f"cd {APP_DIR}/frontend && npm install --silent")
run(f"cd {APP_DIR}/frontend && npm run build")
chown(APP_DIR / "frontend")
# Permessi lettura per nginx (www-data)
run(f"chmod 755 /opt/argos /opt/argos/app /opt/argos/app/frontend {APP_DIR}/frontend/dist")
run(f"chmod -R 755 {APP_DIR}/frontend/dist/")
log("Frontend buildato")
# 6. Dipendenze Python
log("── Dipendenze Python ──")
venv_pip = APP_DIR / "backend/venv/bin/pip"
req = APP_DIR / "backend/requirements.txt"
if req.exists():
run(f"{venv_pip} install -r {req} -q")
log("Dipendenze Python installate")
# 7. Utente admin
log("── Creazione utente admin ──")
create_admin_user(data)
# 8. SSL
log("── Configurazione SSL ──")
domain = data.get("domain", "").strip()
aliases = data.get("aliases", "").strip()
ssl_mode = data.get("ssl_mode", "letsencrypt")
all_names = (domain + " " + aliases).strip()
if ssl_mode == "manual":
crt_src = SETUP_DIR / "uploaded.crt"
key_src = SETUP_DIR / "uploaded.key"
if not crt_src.exists() or not key_src.exists():
raise RuntimeError("File SSL .crt o .key non trovati in /opt/argos/setup/")
CERTS_DIR.mkdir(parents=True, exist_ok=True)
shutil.copy(crt_src, CERTS_DIR / "fullchain.pem")
shutil.copy(key_src, CERTS_DIR / "privkey.pem")
os.chmod(CERTS_DIR / "privkey.pem", 0o600)
ssl_crt = str(CERTS_DIR / "fullchain.pem")
ssl_key = str(CERTS_DIR / "privkey.pem")
log("Certificato SSL manuale copiato")
else:
_write_nginx_http(all_names)
run("nginx -t && systemctl restart nginx")
certbot_d = " ".join(f"-d {n}" for n in all_names.split())
email = data.get("admin_email", "admin@tecnotelsrl.com")
run(f"certbot --nginx {certbot_d} --non-interactive --agree-tos -m {email}")
ssl_crt = f"/etc/letsencrypt/live/{domain}/fullchain.pem"
ssl_key = f"/etc/letsencrypt/live/{domain}/privkey.pem"
log("Certificato Let's Encrypt ottenuto")
# 9. Nginx finale
log("── Nginx configurazione finale ──")
_write_nginx_final(all_names, ssl_crt, ssl_key)
run("nginx -t && systemctl restart nginx")
log("Nginx configurato")
# 10. Systemd services
log("── Creazione e avvio servizi ──")
_write_services()
run("systemctl daemon-reload")
for svc in ["argos-backend", "argos-sync", "argos-ops", "argos-analytics"]:
run(f"systemctl enable --now {svc}")
log(f"{svc} avviato")
# 11. Chiudi web installer
log("── Chiusura web installer ──")
run("systemctl disable --now argos-setup", check=False)
run("ufw delete allow 8888/tcp", check=False)
log("Porta 8888 chiusa — web installer disabilitato")
log("=== INSTALLAZIONE COMPLETATA ===")
install_done = True
# Spegni il processo dopo 8s (tempo per inviare la risposta al browser)
def shutdown():
import time
time.sleep(15)
os.kill(os.getpid(), signal.SIGTERM)
threading.Thread(target=shutdown, daemon=True).start()
except Exception as e:
log(f"ERRORE: {e}")
install_log.append(f"__ERROR__: {e}")
install_error = True
def _write_nginx_http(all_names):
conf = f"""server {{
listen 80;
server_name {all_names};
location /.well-known/acme-challenge/ {{ root /var/www/html; }}
location / {{ return 301 https://$host$request_uri; }}
}}
"""
_write_nginx_conf(conf)
def _write_nginx_final(all_names, ssl_crt, ssl_key):
conf = f"""limit_req_zone $binary_remote_addr zone=argos:10m rate=20r/s;
server {{
listen 80;
server_name {all_names};
location / {{ return 301 https://$host$request_uri; }}
location /.well-known/acme-challenge/ {{ root /var/www/html; }}
}}
server {{
listen 443 ssl http2;
server_name {all_names};
ssl_certificate {ssl_crt};
ssl_certificate_key {ssl_key};
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
add_header X-Frame-Options SAMEORIGIN;
add_header X-Content-Type-Options nosniff;
add_header Strict-Transport-Security "max-age=31536000" always;
client_max_body_size 50m;
location /api/analytics/ {{
limit_req zone=argos burst=40 nodelay;
proxy_pass http://127.0.0.1:8083;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_read_timeout 120s;
}}
location /feeds/ {{
alias {FEEDS_DIR}/;
autoindex off;
default_type "text/plain; charset=utf-8";
access_log {LOGS_DIR}/nginx-feeds.log;
}}
location /api/ {{
limit_req zone=argos burst=40 nodelay;
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_read_timeout 120s;
}}
location / {{
root {APP_DIR}/frontend/dist;
try_files $uri $uri/ /index.html;
expires 1h;
}}
access_log {LOGS_DIR}/nginx-access.log;
error_log {LOGS_DIR}/nginx-error.log;
}}
"""
_write_nginx_conf(conf)
def _write_nginx_conf(conf):
with open("/etc/nginx/sites-available/argos", "w") as f:
f.write(conf)
p = Path("/etc/nginx/sites-enabled/argos")
if not p.exists():
p.symlink_to("/etc/nginx/sites-available/argos")
# Rimuovi default e setup temporaneo
for f in ["/etc/nginx/sites-enabled/default", "/etc/nginx/sites-enabled/argos-setup"]:
if Path(f).exists(): Path(f).unlink()
def _write_services():
venv_py = str(APP_DIR / "backend/venv/bin/python3")
backend_dir = str(APP_DIR / "backend")
services = {
"backend": ("services/argos_backend.py", "Backend API"),
"sync": ("services/argos_sync.py", "Sync Daemon"),
"ops": ("services/argos_ops.py", "Ops Daemon"),
"analytics": ("services/argos_analytics.py", "Analytics Daemon"),
}
for svc, (script, desc) in services.items():
unit = f"""[Unit]
Description=ARGOS {desc}
After=network.target
[Service]
Type=simple
User={APP_USER}
Group={APP_USER}
WorkingDirectory={backend_dir}
Environment=PYTHONPATH={backend_dir}
Environment=CONFIG_FILE={CONFIG_DIR}/argos.json
Environment=INTEGRATIONS_FILE={CONFIG_DIR}/integrations.json
Environment=DATA_DIR={DATA_DIR}
Environment=FEEDS_DIR={FEEDS_DIR}
Environment=LOGS_DIR={LOGS_DIR}
ExecStart={venv_py} {backend_dir}/{script}
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
SyslogIdentifier=argos-{svc}
[Install]
WantedBy=multi-user.target
"""
with open(f"/etc/systemd/system/argos-{svc}.service", "w") as f:
f.write(unit)
class SetupHandler(BaseHTTPRequestHandler):
def log_message(self, *args): pass
def do_GET(self):
path = urlparse(self.path).path
if path in ("/", "/setup"):
html_path = Path(__file__).parent / "setup.html"
if not html_path.exists():
self.send_response(404); self.end_headers(); return
html = html_path.read_bytes()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", len(html))
self.end_headers()
self.wfile.write(html)
elif path == "/api/status":
self._json({"done": install_done, "error": install_error, "log": install_log[-60:]})
else:
self.send_response(404); self.end_headers()
def do_POST(self):
path = urlparse(self.path).path
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length)
if path == "/api/install":
try:
data = json.loads(body)
threading.Thread(target=install, args=(data,), daemon=True).start()
self._json({"ok": True})
except Exception as e:
self._json({"ok": False, "error": str(e)}, 400)
elif path == "/api/upload/cert":
SETUP_DIR.mkdir(parents=True, exist_ok=True)
(SETUP_DIR / "uploaded.crt").write_bytes(body)
self._json({"ok": True})
elif path == "/api/upload/key":
SETUP_DIR.mkdir(parents=True, exist_ok=True)
(SETUP_DIR / "uploaded.key").write_bytes(body)
self._json({"ok": True})
elif path == "/api/upload/logo":
SETUP_DIR.mkdir(parents=True, exist_ok=True)
(SETUP_DIR / "logo_cliente.png").write_bytes(body)
self._json({"ok": True})
else:
self.send_response(404); self.end_headers()
def _json(self, data, code=200):
body = json.dumps(data).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(body))
self.end_headers()
self.wfile.write(body)
if __name__ == "__main__":
print(f"\n{'='*55}")
print(f" ARGOS SOC — Web Installer")
print(f" Tecnotel Servizi SRL")
print(f" Apri: http://<IP_SERVER>:{PORT}")
print(f"{'='*55}\n")
HTTPServer(("0.0.0.0", PORT), SetupHandler).serve_forever()