"""HTTP API for managing WireGuard VPN clients.

Exposes three endpoints, all protected by an API key header, an IP
whitelist and a per-client-address rate limit:

* ``POST /vpn``      -- create a client and return its config and QR code.
* ``DELETE /vpn``    -- remove an existing client.
* ``GET /vpn/list``  -- list the names of existing clients.

Client creation and removal are delegated to the shell script at
``SCRIPT_PATH``; this module only validates input, invokes the script and
reads back the files it is expected to produce under ``WG_DIR``.
"""

import base64
import hmac
import os
import subprocess
from pathlib import Path

import uvicorn
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Form, Header, HTTPException, Query, Request
from fastapi.responses import JSONResponse

# Rate limit support
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Load .env
load_dotenv()

API_KEY_ENV = os.getenv("API_KEY")
# Drop blanks and surrounding whitespace so that an unset/empty WHITELIST
# yields an empty list (not [""]) and "a, b" matches both addresses.
WHITELIST = [
    entry.strip()
    for entry in os.getenv("WHITELIST", "").split(",")
    if entry.strip()
]
RATE_LIMIT = os.getenv("RATE_LIMIT", "5/minute")  # e.g. 5 requests/minute
WG_DIR = Path("/etc/wireguard")
SCRIPT_PATH = Path("/etc/wireguard/wg_config.sh")

# Setup FastAPI and rate limiter
# NOTE(review): the limit is applied by decorating the endpoint functions, so
# requests rejected earlier by the Depends(...) checks are presumably not
# counted against it -- confirm if failed-auth attempts must be throttled too.
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# --- Dependencies ---
def verify_api_key(x_api_key: str = Header(...)):
    """Reject the request unless the ``X-API-Key`` header matches ``API_KEY``.

    Fails closed: when no API key is configured (unset or empty), every
    request is rejected.

    Raises:
        HTTPException: 403 when the key is missing from the configuration or
            does not match.
    """
    # compare_digest avoids leaking how many leading characters matched via
    # response timing; bytes are used because the str form only accepts ASCII.
    if not API_KEY_ENV or not hmac.compare_digest(
        x_api_key.encode("utf-8"), API_KEY_ENV.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid API key")


def verify_ip(request: Request):
    """Reject the request unless the peer address is listed in ``WHITELIST``.

    Raises:
        HTTPException: 403 when the peer address is unknown or not whitelisted.
    """
    # request.client can be None when the server provides no peer address;
    # treat that as "not allowed" instead of crashing with AttributeError.
    client_ip = request.client.host if request.client else None
    if client_ip is None or client_ip not in WHITELIST:
        raise HTTPException(status_code=403, detail=f"IP {client_ip} not allowed")


def is_valid_name(name: str) -> bool:
    """Return True when *name* is non-empty and purely alphanumeric.

    Uses ``str.isalnum``, so path separators, dots, whitespace and shell
    metacharacters are all rejected. Note that ``str.isalnum`` also accepts
    non-ASCII letters and digits.
    """
    return name.isalnum()


def _run_wg_script(action: str, client_name: str) -> None:
    """Run ``bash SCRIPT_PATH <action> <client_name>`` and map failures to HTTP 500.

    The arguments are passed as a list (no shell string), so *client_name* is
    never interpreted by a shell on the command line.

    NOTE: ``subprocess.run`` blocks until the script exits. It is called from
    ``async def`` endpoints, so the event loop is blocked for that duration;
    this also means script invocations from this process never overlap.

    Raises:
        HTTPException: 500 when the script exits non-zero or cannot be started.
    """
    try:
        subprocess.run(
            ["bash", str(SCRIPT_PATH), action, client_name],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as exc:
        # The script's stderr is not guaranteed to be valid UTF-8.
        stderr_text = exc.stderr.decode("utf-8", errors="replace") if exc.stderr else ""
        raise HTTPException(
            status_code=500, detail=f"Script error: {stderr_text}"
        ) from exc
    except OSError as exc:
        # e.g. the "bash" executable could not be found or executed.
        raise HTTPException(status_code=500, detail=str(exc)) from exc


# --- Endpoints ---
@app.post("/vpn")
@limiter.limit(RATE_LIMIT)
async def create_vpn_client(
    request: Request,
    new: str = Form(...),
    _: None = Depends(verify_api_key),
    __: None = Depends(verify_ip),
):
    """Create a VPN client named by the ``new`` form field.

    Returns:
        A dict with the client name, the text of its ``.conf`` file and the
        base64-encoded bytes of its ``.png`` QR code; or a 409 JSON response
        when a directory for that client already exists.

    Raises:
        HTTPException: 400 for an invalid name; 500 when the script fails or
            the expected output files are missing or unreadable.
    """
    client_name = new.strip()
    if not is_valid_name(client_name):
        raise HTTPException(status_code=400, detail="Invalid client name")

    client_dir = WG_DIR / client_name
    conf_file = client_dir / f"{client_name}.conf"
    png_file = client_dir / f"{client_name}.png"

    if client_dir.exists():
        return JSONResponse(status_code=409, content={"error": "Client already exists"})

    _run_wg_script("add", client_name)

    try:
        if not conf_file.exists() or not png_file.exists():
            raise FileNotFoundError("Missing config or QR code after creation")
        config_data = conf_file.read_text()
        qr_base64 = base64.b64encode(png_file.read_bytes()).decode("utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return {
        "client": client_name,
        "config": config_data,
        "qr_base64": qr_base64,
    }


@app.delete("/vpn")
@limiter.limit(RATE_LIMIT)
async def remove_vpn_client(
    request: Request,
    remove: str = Query(...),
    _: None = Depends(verify_api_key),
    __: None = Depends(verify_ip),
):
    """Remove the VPN client named by the ``remove`` query parameter.

    Raises:
        HTTPException: 400 for an invalid name; 404 when no directory exists
            for the client; 500 when the script fails.
    """
    client_name = remove.strip()
    if not is_valid_name(client_name):
        raise HTTPException(status_code=400, detail="Invalid client name")

    client_dir = WG_DIR / client_name
    if not client_dir.exists():
        raise HTTPException(status_code=404, detail="Client not found")

    _run_wg_script("remove", client_name)
    return {"message": f"Client '{client_name}' removed."}


@app.get("/vpn/list")
@limiter.limit(RATE_LIMIT)
async def list_vpn_clients(
    request: Request,
    _: None = Depends(verify_api_key),
    __: None = Depends(verify_ip),
):
    """List client names: sub-directories of ``WG_DIR`` holding ``<name>.conf``.

    Names are returned sorted. A missing ``WG_DIR`` yields an empty list.
    """
    try:
        clients = sorted(
            entry.name
            for entry in WG_DIR.iterdir()
            if entry.is_dir() and (entry / f"{entry.name}.conf").exists()
        )
    except FileNotFoundError:
        clients = []
    return {"clients": clients}


if __name__ == "__main__":
    # Pass the app object rather than an import string so that running this
    # file directly works whatever the file is named.
    uvicorn.run(app, host="0.0.0.0", port=3000)

# Run (assuming this file is saved as wg_api.py):
#   uvicorn wg_api:app --host 0.0.0.0 --port 3000
#
# File .env (use your own long random secret; never commit a real key):
#   API_KEY=<your-secret-api-key>
#   WHITELIST=127.0.0.1,192.168.1.10
#   RATE_LIMIT=5/minute