update and curl sample

main
Kar 2025-04-30 11:40:40 +00:00
parent 3f311f59a3
commit 3c4d5badbd
1 changed files with 96 additions and 57 deletions

View File

@ -1,90 +1,129 @@
import os
import re
import subprocess
import base64
from fastapi import FastAPI, Form, HTTPException, Header, Depends, Request
from typing import List
from fastapi import FastAPI, Form, HTTPException, Header, Request, Depends
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from slowapi import Limiter
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.middleware.cors import CORSMiddleware
import re
# Load environment variables
# Load .env variables
load_dotenv()
API_KEY_ENV = os.getenv("API_KEY")
WHITELIST_IPS = os.getenv("WHITELIST_IPS", "127.0.0.1").split(",")
API_KEY = os.getenv("API_KEY")
WHITELISTED_IPS = os.getenv("WHITELISTED_IPS", "").split(",")
# Constants
VESTA_BIN = "/usr/local/hestia/bin"
DEFAULT_RATE_LIMIT = "10/minute"
# FastAPI setup
app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
# ---------------------- Helpers ----------------------
# CORS if needed
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Middleware to check API key and IP whitelist
async def verify_request(request: Request, x_api_key: str = Header(...)):
client_ip = request.client.host
if client_ip not in WHITELIST_IPS:
raise HTTPException(status_code=403, detail="IP not allowed")
if x_api_key != API_KEY_ENV:
def check_api_key(x_api_key: str = Header(...)):
if x_api_key != API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key")
# Validate usernames/domains
def is_valid_name(value):
return re.match(r'^[a-zA-Z0-9_-]+$', value) is not None
def check_whitelisted_ip(request: Request):
ip = request.client.host
if WHITELISTED_IPS and ip not in WHITELISTED_IPS:
raise HTTPException(status_code=403, detail="IP not allowed")
def valid_username(name: str):
return re.match(r"^[a-zA-Z0-9_]+$", name)
def run_cmd(cmd: List[str]):
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
raise HTTPException(status_code=500, detail=f"Command error: {e.stderr.strip()}")
# ---------------------- Endpoints ----------------------
@app.post("/vesta/add-user")
@limiter.limit("10/minute")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def add_user(
username: str = Form(...),
request: Request,
user: str = Form(...),
password: str = Form(...),
email: str = Form(...),
package: str = Form("default"),
auth: None = Depends(verify_request)
auth: None = Depends(check_api_key),
_: None = Depends(check_whitelisted_ip)
):
if not all(map(is_valid_name, [username, package])):
raise HTTPException(status_code=400, detail="Invalid characters in username or package")
if not valid_username(user):
raise HTTPException(status_code=400, detail="Invalid username")
cmd = [f"{VESTA_BIN}/v-add-user", user, password, email, package]
output = run_cmd(cmd)
return {"message": "User created", "output": output}
cmd = ["/usr/local/hestia/bin/v-add-user", username, password, email, package]
try:
result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return {"message": result.stdout.decode().strip()}
except subprocess.CalledProcessError as e:
raise HTTPException(status_code=500, detail=e.stderr.decode())
@app.post("/vesta/add-domain")
@limiter.limit("10/minute")
@limiter.limit(DEFAULT_RATE_LIMIT)
async def add_domain(
username: str = Form(...),
request: Request,
user: str = Form(...),
domain: str = Form(...),
ssl: bool = Form(True),
php: bool = Form(True),
auth: None = Depends(verify_request)
ssl: bool = Form(False),
php: bool = Form(False),
auth: None = Depends(check_api_key),
_: None = Depends(check_whitelisted_ip)
):
if not all(map(is_valid_name, [username])) or not re.match(r'^[a-zA-Z0-9.-]+$', domain):
raise HTTPException(status_code=400, detail="Invalid characters in username or domain")
if not valid_username(user):
raise HTTPException(status_code=400, detail="Invalid username")
try:
subprocess.run(["/usr/local/hestia/bin/v-add-web-domain", username, domain], check=True)
if ssl:
subprocess.run(["/usr/local/hestia/bin/v-add-web-domain-ssl", username, domain], check=True)
if php:
subprocess.run(["/usr/local/hestia/bin/v-add-web-php", username, domain], check=True)
return {"message": f"Domain {domain} added for user {username}"}
except subprocess.CalledProcessError as e:
raise HTTPException(status_code=500, detail=e.stderr.decode())
cmd = [f"{VESTA_BIN}/v-add-web-domain", user, domain]
output = run_cmd(cmd)
ssl_out, php_out = None, None
if ssl:
ssl_cmd = [f"{VESTA_BIN}/v-add-web-domain-ssl", user, domain]
ssl_out = run_cmd(ssl_cmd)
if php:
php_cmd = [f"{VESTA_BIN}/v-add-web-php", user, domain]
php_out = run_cmd(php_cmd)
return {
"message": "Domain added",
"domain_output": output,
"ssl_output": ssl_out,
"php_output": php_out
}
# ---------------------- Launch ----------------------
if __name__ == "__main__":
import uvicorn
uvicorn.run("HestiaCP_API_Server:app", host="0.0.0.0", port=4000)
uvicorn.run("hestia_runner:app", host="0.0.0.0", port=3001)
# .env
#API_KEY=your-secret-key
#WHITELISTED_IPS=127.0.0.1,192.168.1.10
# cd /root/hestia_runner/ && uvicorn hestia_runner:app --host 0.0.0.0 --port 1002
#curl -X POST http://your-server-ip:3001/vesta/add-user \
# -H "x-api-key: your_api_key_here" \
# -F "user=testuser" \
# -F "password=securepass123" \
# -F "email=test@example.com" \
# -F "package=default"
#curl -X POST http://your-server-ip:3001/vesta/add-domain \
# -H "x-api-key: your_api_key_here" \
# -F "user=testuser" \
# -F "domain=example.com" \
# -F "ssl=true" \
# -F "php=true"