"""HTTP front-end exposing a small set of HestiaCP CLI commands via FastAPI.

Every endpoint is rate limited and guarded by ``verify_request``, which
enforces an IP allow-list and a shared API key sent in the ``X-API-Key``
header.
"""
import base64
import os
import re
import secrets
import subprocess

from dotenv import load_dotenv
from fastapi import FastAPI, Form, HTTPException, Header, Depends, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Load environment variables
load_dotenv()
API_KEY_ENV = os.getenv("API_KEY")
# Entries are stripped so values such as "10.0.0.1, 10.0.0.2" match, and
# empty entries are dropped.
WHITELIST_IPS = [
    ip.strip()
    for ip in os.getenv("WHITELIST_IPS", "127.0.0.1").split(",")
    if ip.strip()
]

# Compiled once; used with ``fullmatch`` so a trailing newline is rejected
# (``re.match`` with ``$`` would accept "name\n").
_NAME_RE = re.compile(r"[a-zA-Z0-9_-]+")
_DOMAIN_RE = re.compile(r"[a-zA-Z0-9.-]+")

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter


@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Translate a slowapi rate-limit violation into an HTTP 429 JSON reply."""
    return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})


# CORS if needed
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive for an administrative API; restrict allow_origins if browsers
# are not expected to call this service from arbitrary sites.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


async def verify_request(request: Request, x_api_key: str = Header(...)):
    """FastAPI dependency enforcing the IP allow-list and the API key.

    Args:
        request: Incoming request; its client address is checked against
            ``WHITELIST_IPS``.
        x_api_key: Value of the ``X-API-Key`` header (required).

    Raises:
        HTTPException: 403 when the client address is unknown or not
            allow-listed, or when the API key is unset, empty, or wrong.
    """
    # ``request.client`` may be None; treat that as "not allowed" rather
    # than failing with AttributeError.
    client = request.client
    if client is None or client.host not in WHITELIST_IPS:
        raise HTTPException(status_code=403, detail="IP not allowed")
    # Fail closed when no key is configured, and compare in constant time.
    # Both sides are encoded because compare_digest rejects non-ASCII str.
    if not API_KEY_ENV or not secrets.compare_digest(
        x_api_key.encode("utf-8"), API_KEY_ENV.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid API key")


# Validate usernames/domains
def is_valid_name(value):
    """Return True if *value* consists only of ASCII letters, digits, ``_`` or ``-``."""
    return _NAME_RE.fullmatch(value) is not None


def _run_hestia(cmd):
    """Run a Hestia CLI command and return its stripped standard output.

    Args:
        cmd: Argument list passed to ``subprocess.run`` (no shell involved).

    Raises:
        HTTPException: 500 carrying the command's stderr when it exits
            with a non-zero status.
    """
    try:
        result = subprocess.run(
            cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or b"").decode(errors="replace")
        raise HTTPException(status_code=500, detail=detail) from exc
    return result.stdout.decode(errors="replace").strip()


@app.post("/vesta/add-user")
@limiter.limit("10/minute")
async def add_user(
    request: Request,  # required by slowapi's @limiter.limit decorator
    username: str = Form(...),
    password: str = Form(...),
    email: str = Form(...),
    package: str = Form("default"),
    auth: None = Depends(verify_request),
):
    """Create a user by invoking ``v-add-user``.

    Returns:
        ``{"message": <stdout of the command>}`` on success.

    Raises:
        HTTPException: 400 if ``username`` or ``package`` contains characters
            outside ``[a-zA-Z0-9_-]``; 500 if the command fails.
    """
    if not all(map(is_valid_name, [username, package])):
        raise HTTPException(status_code=400, detail="Invalid characters in username or package")

    # NOTE: password and email are passed as separate argv elements (no shell),
    # but argv is visible in the process list while the command runs.
    cmd = ["/usr/local/hestia/bin/v-add-user", username, password, email, package]
    # Run in a worker thread so the blocking subprocess call does not stall
    # the event loop.
    output = await run_in_threadpool(_run_hestia, cmd)
    return {"message": output}


@app.post("/vesta/add-domain")
@limiter.limit("10/minute")
async def add_domain(
    request: Request,  # required by slowapi's @limiter.limit decorator
    username: str = Form(...),
    domain: str = Form(...),
    ssl: bool = Form(True),
    php: bool = Form(True),
    auth: None = Depends(verify_request),
):
    """Add a web domain for a user, optionally followed by SSL and PHP setup.

    Runs ``v-add-web-domain`` and then, depending on the flags,
    ``v-add-web-domain-ssl`` and ``v-add-web-php``. The commands run in
    sequence; a failure stops at the failing step without undoing earlier ones.

    Raises:
        HTTPException: 400 on invalid characters in ``username`` or
            ``domain``; 500 if any command fails.
    """
    if not is_valid_name(username) or _DOMAIN_RE.fullmatch(domain) is None:
        raise HTTPException(status_code=400, detail="Invalid characters in username or domain")

    await run_in_threadpool(
        _run_hestia, ["/usr/local/hestia/bin/v-add-web-domain", username, domain]
    )
    if ssl:
        await run_in_threadpool(
            _run_hestia, ["/usr/local/hestia/bin/v-add-web-domain-ssl", username, domain]
        )
    if php:
        await run_in_threadpool(
            _run_hestia, ["/usr/local/hestia/bin/v-add-web-php", username, domain]
        )
    return {"message": f"Domain {domain} added for user {username}"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("HestiaCP_API_Server:app", host="0.0.0.0", port=4000)