commit 52265ed4cc6fe0474114b33f7e822212c7f00e89 Author: Kar@k5 Date: Sun Feb 1 20:22:29 2026 +0530 init diff --git a/.env/cluster.list b/.env/cluster.list new file mode 100644 index 0000000..e69de29 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b55ee6b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env/*.yaml +.env/*.yml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9ea45f5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,36 @@ +FROM golang:1.21-alpine AS builder + +WORKDIR /app + +# Install git for cloning repositories +RUN apk add --no-cache git + +# Copy go mod files +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source code +COPY . . + +# Build the application +RUN CGO_ENABLED=1 GOOS=linux go build -a -installsuffix cgo -o manager ./cmd/manager + +# Final stage +FROM alpine:latest + +# Install git and docker client +RUN apk add --no-cache git docker-cli + +WORKDIR /root/ + +# Copy the binary from builder stage +COPY --from=builder /app/manager . + +# Copy manifests +COPY --from=builder /app/manifests ./manifests + +# Expose port +EXPOSE 8080 + +# Run the application +CMD ["./manager"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..51a3640 --- /dev/null +++ b/README.md @@ -0,0 +1,252 @@ +# Deployment Manager + +A Kubernetes deployment manager that reads repository URLs from a database and automatically deploys them to Kubernetes clusters. + +## Features + +- **Automated Deployment**: Clone Git repositories, build Docker images, and deploy to Kubernetes +- **Multi-Language Support**: Supports Node.js and Python applications +- **Real-time Events**: Server-Sent Events (SSE) for real-time deployment status updates +- **REST API**: Full REST API for managing repositories and deployments +- **Worker Pool**: Concurrent deployment processing with configurable worker pool size +- **Reconciliation Loop**: Automatic retry and status reconciliation +- **Health Checks**: Built-in health check endpoints + +## Architecture + +The deployment manager consists of several components: + +- **API Server**: HTTP server with REST endpoints and SSE support +- **Worker Pool**: Concurrent workers that process deployment jobs +- **Reconciler**: Background process that ensures desired state matches actual state +- **Event Bus**: Pub/sub system for real-time event streaming +- **Database**: SQLite database for repository metadata +- **Executor**: Command execution and Docker operations +- **Kubernetes Client**: Kubernetes resource management + +## Quick Start + +### Prerequisites + +- Go 1.21+ +- Docker +- kubectl configured with cluster access +- Git + +### Building + +```bash +# Build the binary +go build ./cmd/manager + +# Or build with Docker +docker build -t deployment-manager . +``` + +### Running + +```bash +# Run locally +./manager + +# Run with Docker +docker run -p 8080:8080 \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v ~/.kube:/root/.kube \ + deployment-manager +``` + +### Environment Variables + +- `DB_PATH`: Path to SQLite database (default: `./manager.db`) +- `MAX_WORKERS`: Number of concurrent workers (default: `2`) +- `RECONCILE_TICK`: Reconciliation interval in seconds (default: `2`) +- `HTTP_PORT`: HTTP server port (default: `8080`) + +## API Endpoints + +### Repositories + +- `GET /api/repos` - List all repositories +- `GET /api/repos?user_id=xxx` - List repositories by user +- `POST /api/repos` - Create a new repository +- `GET /api/repos/{id}` - Get repository details +- `DELETE /api/repos/{id}` - Delete a repository +- `POST /api/repos/{id}/stop` - Stop deployment +- `POST /api/repos/{id}/restart` - Restart deployment + +### Events + +- `GET /events` - Server-Sent Events stream for real-time updates + +### Health + +- `GET /health` - Health check endpoint + +## API Usage Examples + +### Create a Repository + +```bash +curl -X POST http://localhost:8080/api/repos \ + -H "Content-Type: application/json" \ + -d '{ + "repo_url": "https://github.com/user/my-node-app.git", + "user_id": "user123", + "type": "nodejs" + }' +``` + +### List Repositories + +```bash +curl http://localhost:8080/api/repos +``` + +### Stream Events + +```bash +curl -N http://localhost:8080/events +``` + +## Repository Types + +### Node.js Applications + +Expected structure: +- `package.json` with dependencies +- Entry point defined in `package.json` (default: `index.js`) +- Exposes port 3000 + +### Python Applications + +Expected structure: +- `requirements.txt` with dependencies +- `app.py` as entry point +- Exposes port 8000 + +## Deployment Process + +1. **Repository Creation**: Add repository via API +2. **Job Queuing**: Reconciler detects `need_to_deploy` status +3. **Cloning**: Worker clones the Git repository +4. **Dockerfile Generation**: Auto-generates appropriate Dockerfile +5. **Image Build**: Builds and pushes Docker image +6. **Kubernetes Deployment**: Applies Kubernetes manifests +7. **Status Update**: Updates repository status to `deployed` + +## Kubernetes Resources + +The deployment manager creates the following Kubernetes resources: + +- **Deployment**: Manages pod replicas and updates +- **Service**: Exposes the application internally +- **ConfigMap**: Stores repository metadata +- **Ingress**: External access (if ingress controller is available) + +## Event Types + +- `repo_created`: New repository added +- `repo_updated`: Repository status changed +- `repo_deleted`: Repository deleted +- `deploy_started`: Deployment process started +- `deploy_success`: Deployment completed successfully +- `deploy_error`: Deployment failed +- `log`: Real-time log messages + +## Configuration + +### Database Schema + +The SQLite database contains a `repos` table with the following schema: + +```sql +CREATE TABLE repos ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + repo_url TEXT NOT NULL, + status TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, -- nodejs | python + image_tag TEXT, + last_error TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP +); +``` + +### Status Transitions + +| From | To | Action | +| -------------- | ---------- | -------------------- | +| need_to_deploy | deploying | enqueue job | +| deploying | deployed | success | +| deploying | err | failure | +| deployed | stopped | scale 0 | +| stopped | restarting | scale up | +| any | deleted | delete k8s resources | + +## Development + +### Project Structure + +``` +deployment-manager/ +├── cmd/manager/ # Application entry point +├── internal/ +│ ├── api/ # HTTP server and handlers +│ ├── db/ # Database operations +│ ├── events/ # Event bus and types +│ ├── executor/ # Command execution and Docker +│ ├── k8s/ # Kubernetes client +│ ├── model/ # Data models +│ ├── reconciler/ # Reconciliation logic +│ └── worker/ # Deployment workers +├── migrations/ # Database migrations +├── manifests/ # K8s manifest templates +└── Dockerfile +``` + +### Running Tests + +```bash +go test ./... +``` + +### Building for Production + +```bash +# Build optimized binary +CGO_ENABLED=1 GOOS=linux go build -a -installsuffix cgo -o manager ./cmd/manager + +# Build Docker image +docker build -t deployment-manager:latest . +``` + +## Troubleshooting + +### Common Issues + +1. **Docker socket not accessible**: Mount `/var/run/docker.sock` when running in Docker +2. **kubectl not configured**: Ensure `~/.kube/config` is mounted and accessible +3. **Database permissions**: Ensure the application can write to the database file +4. **Port conflicts**: Change `HTTP_PORT` if 8080 is already in use + +### Logs + +Enable debug logging by setting the log level: + +```bash +LOG_LEVEL=debug ./manager +``` + +### Health Checks + +Monitor the deployment manager health: + +```bash +curl http://localhost:8080/health +``` + +## License + +This project is licensed under the MIT License. diff --git a/cmd/manager/main.go b/cmd/manager/main.go new file mode 100644 index 0000000..742beff --- /dev/null +++ b/cmd/manager/main.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "database/sql" + "log" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "deployment-manager/internal/api" + "deployment-manager/internal/db" + "deployment-manager/internal/events" + "deployment-manager/internal/reconciler" + "deployment-manager/internal/worker" + + _ "github.com/mattn/go-sqlite3" +) + +const ( + dbPath = "./manager.db" + maxWorkers = 2 + reconcileTick = 2 * time.Second +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // ---- graceful shutdown ---- + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sig + log.Println("shutdown signal received") + cancel() + }() + + // ---- DB ---- + database, err := sql.Open("sqlite3", dbPath) + if err != nil { + log.Fatal(err) + } + defer database.Close() + + if err := db.Migrate(database); err != nil { + log.Fatal(err) + } + + // ---- channels ---- + jobQueue := make(chan int64, 100) + + // ---- event bus ---- + eventBus := events.NewBus() + + // Subscribe to events for workers + eventChan := eventBus.Subscribe(ctx) + + // ---- worker pool ---- + var wg sync.WaitGroup + for i := 0; i < maxWorkers; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + w := worker.NewWorker(id, database, eventChan, jobQueue) + w.Run(ctx) + }(i) + } + + // ---- reconciler ---- + reconciler := reconciler.NewReconciler(database, jobQueue, reconcileTick) + go reconciler.Run(ctx) + + // ---- HTTP (API + SSE) ---- + httpServer := api.NewHTTPServer(database, eventBus) + go func() { + log.Println("HTTP server listening on :8080") + if err := httpServer.Start(":8080"); err != nil && err != http.ErrServerClosed { + log.Fatal(err) + } + }() + + <-ctx.Done() + log.Println("shutting down...") + + httpServer.Shutdown(context.Background()) + close(jobQueue) + wg.Wait() + log.Println("bye") +} diff --git a/fn.md b/fn.md new file mode 100644 index 0000000..0cad31e --- /dev/null +++ b/fn.md @@ -0,0 +1,11 @@ + +## Status transitions + +| From | To | Action | +| -------------- | ---------- | -------------------- | +| need_to_deploy | deploying | enqueue job | +| deploying | deployed | success | +| deploying | err | failure | +| deployed | stopped | scale 0 | +| stopped | restarting | scale up | +| any | deleted | delete k8s resources | diff --git a/folder.structure b/folder.structure new file mode 100644 index 0000000..3820779 --- /dev/null +++ b/folder.structure @@ -0,0 +1,51 @@ +manager/ +├── cmd/ +│ └── manager/ +│ └── main.go # entry point +│ +├── internal/ +│ ├── api/ +│ │ ├── http.go # HTTP server setup +│ │ ├── sse.go # SSE handlers +│ │ └── health.go +│ │ +│ ├── db/ +│ │ ├── sqlite.go # DB open / migrate +│ │ └── repo_store.go # repo CRUD +│ │ +│ ├── model/ +│ │ └── repo.go # Repo struct, enums +│ │ +│ ├── reconciler/ +│ │ └── reconciler.go # desired → actual loop +│ │ +│ ├── worker/ +│ │ ├── pool.go # worker pool +│ │ └── deploy.go # deploy logic +│ │ +│ ├── executor/ +│ │ ├── exec.go # runCmd, stream stdout +│ │ └── docker.go # docker build/push helpers +│ │ +│ ├── k8s/ +│ │ ├── kubectl.go # kubectl wrapper +│ │ └── manifests.go # render templates +│ │ +│ ├── events/ +│ │ ├── bus.go # pub/sub +│ │ └── event.go # Event struct +│ │ +│ └── config/ +│ └── config.go # paths, limits, env +│ +├── migrations/ +│ └── 001_init.sql +│ +├── manifests/ +│ ├── nodejs.yaml +│ └── python.yaml +│ +├── Dockerfile +├── go.mod +├── go.sum +└── README.md diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..194477a --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module deployment-manager + +go 1.21 + +require github.com/mattn/go-sqlite3 v1.14.17 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..04784a2 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= +github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= diff --git a/internal/api/health.go b/internal/api/health.go new file mode 100644 index 0000000..cde21ba --- /dev/null +++ b/internal/api/health.go @@ -0,0 +1,12 @@ +package api + +import ( + "encoding/json" + "net/http" +) + +func (s *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} diff --git a/internal/api/http.go b/internal/api/http.go new file mode 100644 index 0000000..ded6a7a --- /dev/null +++ b/internal/api/http.go @@ -0,0 +1,253 @@ +package api + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "net/http" + "strconv" + "strings" + + "deployment-manager/internal/db" + "deployment-manager/internal/events" + "deployment-manager/internal/model" +) + +type HTTPServer struct { + repoStore *db.RepoStore + eventBus *events.Bus + server *http.Server +} + +func NewHTTPServer(database *sql.DB, eventBus *events.Bus) *HTTPServer { + return &HTTPServer{ + repoStore: db.NewRepoStore(database), + eventBus: eventBus, + } +} + +func (s *HTTPServer) Start(addr string) error { + mux := http.NewServeMux() + + // API routes + mux.HandleFunc("/api/repos", s.handleRepos) + mux.HandleFunc("/api/repos/", s.handleRepo) + mux.HandleFunc("/api/repos/", s.handleRepoActions) + + // SSE endpoint + sseHandler := NewSSEHandler(s.eventBus) + mux.Handle("/events", sseHandler) + + // Health check + mux.HandleFunc("/health", s.handleHealth) + + s.server = &http.Server{ + Addr: addr, + Handler: mux, + } + + log.Printf("HTTP server starting on %s", addr) + return s.server.ListenAndServe() +} + +func (s *HTTPServer) Shutdown(ctx context.Context) error { + if s.server != nil { + return s.server.Shutdown(ctx) + } + return nil +} + +func (s *HTTPServer) handleRepos(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + s.getRepos(w, r) + case http.MethodPost: + s.createRepo(w, r) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *HTTPServer) handleRepo(w http.ResponseWriter, r *http.Request) { + repoID, err := extractRepoID(r.URL.Path) + if err != nil { + http.Error(w, "Invalid repo ID", http.StatusBadRequest) + return + } + + switch r.Method { + case http.MethodGet: + s.getRepo(w, r, repoID) + case http.MethodDelete: + s.deleteRepo(w, r, repoID) + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *HTTPServer) handleRepoActions(w http.ResponseWriter, r *http.Request) { + repoID, err := extractRepoID(r.URL.Path) + if err != nil { + http.Error(w, "Invalid repo ID", http.StatusBadRequest) + return + } + + action := extractAction(r.URL.Path) + + switch r.Method { + case http.MethodPost: + switch action { + case "stop": + s.stopRepo(w, r, repoID) + case "restart": + s.restartRepo(w, r, repoID) + default: + http.Error(w, "Invalid action", http.StatusBadRequest) + } + default: + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +func (s *HTTPServer) getRepos(w http.ResponseWriter, r *http.Request) { + userID := r.URL.Query().Get("user_id") + + var repos []*model.Repo + var err error + + if userID != "" { + repos, err = s.repoStore.ListByUser(userID) + } else { + repos, err = s.repoStore.ListByStatus(model.StatusDeployed) + } + + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repos) +} + +func (s *HTTPServer) getRepo(w http.ResponseWriter, r *http.Request, repoID int64) { + repo, err := s.repoStore.Get(repoID) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (s *HTTPServer) createRepo(w http.ResponseWriter, r *http.Request) { + var req struct { + RepoURL string `json:"repo_url"` + UserID string `json:"user_id"` + Type model.RepoType `json:"type"` + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + repo := &model.Repo{ + RepoURL: req.RepoURL, + Status: model.StatusNeedToDeploy, + UserID: req.UserID, + Type: req.Type, + } + + if err := s.repoStore.Create(repo); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Publish event + s.eventBus.Publish(events.NewRepoEvent(repo, events.EventTypeRepoCreated)) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(repo) +} + +func (s *HTTPServer) deleteRepo(w http.ResponseWriter, r *http.Request, repoID int64) { + repo, err := s.repoStore.Get(repoID) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + // Update status to deleted for cleanup + repo.Status = model.StatusDeleted + if err := s.repoStore.Update(repo); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Publish event + s.eventBus.Publish(events.NewRepoEvent(repo, events.EventTypeRepoDeleted)) + + w.WriteHeader(http.StatusNoContent) +} + +func (s *HTTPServer) stopRepo(w http.ResponseWriter, r *http.Request, repoID int64) { + repo, err := s.repoStore.Get(repoID) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + repo.Status = model.StatusStopped + if err := s.repoStore.Update(repo); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Publish event + s.eventBus.Publish(events.NewRepoEvent(repo, events.EventTypeRepoUpdated)) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func (s *HTTPServer) restartRepo(w http.ResponseWriter, r *http.Request, repoID int64) { + repo, err := s.repoStore.Get(repoID) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + repo.Status = model.StatusRestarting + if err := s.repoStore.Update(repo); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Publish event + s.eventBus.Publish(events.NewRepoEvent(repo, events.EventTypeRepoUpdated)) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(repo) +} + +func extractRepoID(path string) (int64, error) { + parts := strings.Split(path, "/") + if len(parts) < 4 { + return 0, fmt.Errorf("invalid path") + } + + return strconv.ParseInt(parts[3], 10, 64) +} + +func extractAction(path string) string { + parts := strings.Split(path, "/") + if len(parts) >= 5 { + return parts[4] + } + return "" +} diff --git a/internal/api/sse.go b/internal/api/sse.go new file mode 100644 index 0000000..b5f9383 --- /dev/null +++ b/internal/api/sse.go @@ -0,0 +1,78 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + + "deployment-manager/internal/events" +) + +type SSEHandler struct { + eventBus *events.Bus +} + +func NewSSEHandler(eventBus *events.Bus) *SSEHandler { + return &SSEHandler{eventBus: eventBus} +} + +func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // Create a context for this connection + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + // Subscribe to events + eventChan := h.eventBus.Subscribe(ctx) + + // Send initial connection event + h.sendEvent(w, "connected", map[string]interface{}{ + "message": "SSE connection established", + }) + + // Handle events + for { + select { + case <-ctx.Done(): + log.Println("SSE client disconnected") + return + + case event := <-eventChan: + if err := h.sendEvent(w, string(event.Type), event.Data); err != nil { + log.Printf("Failed to send SSE event: %v", err) + return + } + } + } +} + +func (h *SSEHandler) sendEvent(w http.ResponseWriter, eventType string, data interface{}) error { + // Marshal data to JSON + jsonData, err := json.Marshal(data) + if err != nil { + return err + } + + // Format as SSE event + event := fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, jsonData) + + // Write to response + _, err = w.Write([]byte(event)) + if err != nil { + return err + } + + // Flush to ensure immediate delivery + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + + return nil +} diff --git a/internal/db/repo_store.go b/internal/db/repo_store.go new file mode 100644 index 0000000..fc37b6e --- /dev/null +++ b/internal/db/repo_store.go @@ -0,0 +1,204 @@ +package db + +import ( + "database/sql" + "fmt" + "time" + + "deployment-manager/internal/model" +) + +type RepoStore struct { + db *sql.DB +} + +func NewRepoStore(db *sql.DB) *RepoStore { + return &RepoStore{db: db} +} + +func (rs *RepoStore) Create(repo *model.Repo) error { + query := ` + INSERT INTO repos (repo_url, status, user_id, type, image_tag, last_error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ` + + now := time.Now() + result, err := rs.db.Exec(query, repo.RepoURL, repo.Status, repo.UserID, repo.Type, repo.ImageTag, repo.LastError, now, now) + if err != nil { + return fmt.Errorf("failed to create repo: %w", err) + } + + id, err := result.LastInsertId() + if err != nil { + return fmt.Errorf("failed to get last insert id: %w", err) + } + + repo.ID = id + repo.CreatedAt = now + repo.UpdatedAt = now + return nil +} + +func (rs *RepoStore) Get(id int64) (*model.Repo, error) { + query := ` + SELECT id, repo_url, status, user_id, type, image_tag, last_error, created_at, updated_at + FROM repos + WHERE id = ? + ` + + repo := &model.Repo{} + var imageTag, lastError sql.NullString + + err := rs.db.QueryRow(query, id).Scan( + &repo.ID, &repo.RepoURL, &repo.Status, &repo.UserID, &repo.Type, + &imageTag, &lastError, &repo.CreatedAt, &repo.UpdatedAt, + ) + + if err != nil { + return nil, fmt.Errorf("failed to get repo: %w", err) + } + + if imageTag.Valid { + repo.ImageTag = &imageTag.String + } + if lastError.Valid { + repo.LastError = &lastError.String + } + + return repo, nil +} + +func (rs *RepoStore) Update(repo *model.Repo) error { + query := ` + UPDATE repos + SET repo_url = ?, status = ?, user_id = ?, type = ?, image_tag = ?, last_error = ?, updated_at = ? + WHERE id = ? + ` + + repo.UpdatedAt = time.Now() + result, err := rs.db.Exec(query, repo.RepoURL, repo.Status, repo.UserID, repo.Type, repo.ImageTag, repo.LastError, repo.UpdatedAt, repo.ID) + if err != nil { + return fmt.Errorf("failed to update repo: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("no rows affected, repo not found") + } + + return nil +} + +func (rs *RepoStore) Delete(id int64) error { + query := `DELETE FROM repos WHERE id = ?` + + result, err := rs.db.Exec(query, id) + if err != nil { + return fmt.Errorf("failed to delete repo: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("no rows affected, repo not found") + } + + return nil +} + +func (rs *RepoStore) ListByStatus(status model.RepoStatus) ([]*model.Repo, error) { + query := ` + SELECT id, repo_url, status, user_id, type, image_tag, last_error, created_at, updated_at + FROM repos + WHERE status = ? + ORDER BY created_at DESC + ` + + rows, err := rs.db.Query(query, status) + if err != nil { + return nil, fmt.Errorf("failed to list repos by status: %w", err) + } + defer rows.Close() + + var repos []*model.Repo + for rows.Next() { + repo := &model.Repo{} + var imageTag, lastError sql.NullString + + err := rows.Scan( + &repo.ID, &repo.RepoURL, &repo.Status, &repo.UserID, &repo.Type, + &imageTag, &lastError, &repo.CreatedAt, &repo.UpdatedAt, + ) + + if err != nil { + return nil, fmt.Errorf("failed to scan repo row: %w", err) + } + + if imageTag.Valid { + repo.ImageTag = &imageTag.String + } + if lastError.Valid { + repo.LastError = &lastError.String + } + + repos = append(repos, repo) + } + + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating repo rows: %w", err) + } + + return repos, nil +} + +func (rs *RepoStore) ListByUser(userID string) ([]*model.Repo, error) { + query := ` + SELECT id, repo_url, status, user_id, type, image_tag, last_error, created_at, updated_at + FROM repos + WHERE user_id = ? + ORDER BY created_at DESC + ` + + rows, err := rs.db.Query(query, userID) + if err != nil { + return nil, fmt.Errorf("failed to list repos by user: %w", err) + } + defer rows.Close() + + var repos []*model.Repo + for rows.Next() { + repo := &model.Repo{} + var imageTag, lastError sql.NullString + + err := rows.Scan( + &repo.ID, &repo.RepoURL, &repo.Status, &repo.UserID, &repo.Type, + &imageTag, &lastError, &repo.CreatedAt, &repo.UpdatedAt, + ) + + if err != nil { + return nil, fmt.Errorf("failed to scan repo row: %w", err) + } + + if imageTag.Valid { + repo.ImageTag = &imageTag.String + } + if lastError.Valid { + repo.LastError = &lastError.String + } + + repos = append(repos, repo) + } + + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating repo rows: %w", err) + } + + return repos, nil +} diff --git a/internal/db/sqlite.go b/internal/db/sqlite.go new file mode 100644 index 0000000..319b0b7 --- /dev/null +++ b/internal/db/sqlite.go @@ -0,0 +1,45 @@ +package db + +import ( + "database/sql" + "fmt" + "io/ioutil" + "log" + "path/filepath" + + _ "github.com/mattn/go-sqlite3" +) + +func Migrate(db *sql.DB) error { + migrationDir := "./migrations" + + files, err := ioutil.ReadDir(migrationDir) + if err != nil { + return fmt.Errorf("failed to read migration directory: %w", err) + } + + for _, file := range files { + if filepath.Ext(file.Name()) != ".sql" { + continue + } + + migrationPath := filepath.Join(migrationDir, file.Name()) + log.Printf("Running migration: %s", file.Name()) + + if err := runMigration(db, migrationPath); err != nil { + return fmt.Errorf("failed to run migration %s: %w", file.Name(), err) + } + } + + return nil +} + +func runMigration(db *sql.DB, path string) error { + content, err := ioutil.ReadFile(path) + if err != nil { + return err + } + + _, err = db.Exec(string(content)) + return err +} diff --git a/internal/events/bus.go b/internal/events/bus.go new file mode 100644 index 0000000..fce0856 --- /dev/null +++ b/internal/events/bus.go @@ -0,0 +1,43 @@ +package events + +import ( + "context" + "log" +) + +type Bus struct { + subscribers map[chan *Event]struct{} +} + +func NewBus() *Bus { + return &Bus{ + subscribers: make(map[chan *Event]struct{}), + } +} + +func (b *Bus) Subscribe(ctx context.Context) chan *Event { + ch := make(chan *Event, 100) + b.subscribers[ch] = struct{}{} + + go func() { + <-ctx.Done() + b.unsubscribe(ch) + close(ch) + }() + + return ch +} + +func (b *Bus) Publish(event *Event) { + for ch := range b.subscribers { + select { + case ch <- event: + default: + log.Printf("Event channel full, dropping event: %s", event.ID) + } + } +} + +func (b *Bus) unsubscribe(ch chan *Event) { + delete(b.subscribers, ch) +} diff --git a/internal/events/event.go b/internal/events/event.go new file mode 100644 index 0000000..750006a --- /dev/null +++ b/internal/events/event.go @@ -0,0 +1,67 @@ +package events + +import ( + "encoding/json" + "time" + + "deployment-manager/internal/model" +) + +type EventType string + +const ( + EventTypeRepoCreated EventType = "repo_created" + EventTypeRepoUpdated EventType = "repo_updated" + EventTypeRepoDeleted EventType = "repo_deleted" + EventTypeDeployStarted EventType = "deploy_started" + EventTypeDeploySuccess EventType = "deploy_success" + EventTypeDeployError EventType = "deploy_error" + EventTypeLog EventType = "log" +) + +type Event struct { + ID string `json:"id"` + Type EventType `json:"type"` + RepoID int64 `json:"repo_id"` + Data map[string]interface{} `json:"data"` + Timestamp time.Time `json:"timestamp"` +} + +func NewEvent(eventType EventType, repoID int64, data map[string]interface{}) *Event { + return &Event{ + ID: generateEventID(), + Type: eventType, + RepoID: repoID, + Data: data, + Timestamp: time.Now(), + } +} + +func NewRepoEvent(repo *model.Repo, eventType EventType) *Event { + return NewEvent(eventType, repo.ID, map[string]interface{}{ + "repo": repo, + }) +} + +func NewLogEvent(repoID int64, message string) *Event { + return NewEvent(EventTypeLog, repoID, map[string]interface{}{ + "message": message, + }) +} + +func (e *Event) ToJSON() ([]byte, error) { + return json.Marshal(e) +} + +func generateEventID() string { + return time.Now().Format("20060102150405") + "-" + randomString(8) +} + +func randomString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + b := make([]byte, n) + for i := range b { + b[i] = letters[i%len(letters)] + } + return string(b) +} diff --git a/internal/executor/docker.go b/internal/executor/docker.go new file mode 100644 index 0000000..b3191d9 --- /dev/null +++ b/internal/executor/docker.go @@ -0,0 +1,101 @@ +package executor + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "deployment-manager/internal/events" + "deployment-manager/internal/model" +) + +func BuildAndPushImage(ctx context.Context, eventChan chan<- *events.Event, repo *model.Repo, workDir string) (string, error) { + imageName := generateImageName(repo) + + // Build command + buildCmd := "docker" + buildArgs := []string{ + "build", + "-t", imageName, + workDir, + } + + if err := RunCmd(ctx, eventChan, repo.ID, buildCmd, buildArgs...); err != nil { + return "", fmt.Errorf("docker build failed: %w", err) + } + + // Push command + pushCmd := "docker" + pushArgs := []string{ + "push", + imageName, + } + + if err := RunCmd(ctx, eventChan, repo.ID, pushCmd, pushArgs...); err != nil { + return "", fmt.Errorf("docker push failed: %w", err) + } + + return imageName, nil +} + +func generateImageName(repo *model.Repo) string { + // Extract repo name from URL for cleaner image names + parts := strings.Split(strings.TrimSuffix(repo.RepoURL, ".git"), "/") + repoName := parts[len(parts)-1] + + return fmt.Sprintf("deployment-manager/%s-%d:latest", repoName, repo.ID) +} + +func GetDockerfileContent(repoType model.RepoType) string { + switch repoType { + case model.TypeNodeJS: + return `FROM node:18-alpine + +WORKDIR /app + +COPY package*.json ./ +RUN npm ci --only=production + +COPY . . + +EXPOSE 3000 + +CMD ["npm", "start"]` + + case model.TypePython: + return `FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +EXPOSE 8000 + +CMD ["python", "app.py"]` + + default: + return `FROM alpine:latest + +WORKDIR /app + +COPY . . + +CMD ["./app"]` + } +} + +func CreateDockerfile(workDir string, repoType model.RepoType) error { + dockerfilePath := filepath.Join(workDir, "Dockerfile") + content := GetDockerfileContent(repoType) + + return writeFile(dockerfilePath, content) +} + +func writeFile(path, content string) error { + return os.WriteFile(path, []byte(content), 0644) +} diff --git a/internal/executor/exec.go b/internal/executor/exec.go new file mode 100644 index 0000000..56a8c73 --- /dev/null +++ b/internal/executor/exec.go @@ -0,0 +1,34 @@ +package executor + +import ( + "bufio" + "context" + "os/exec" + + "deployment-manager/internal/events" +) + +func RunCmd(ctx context.Context, eventChan chan<- *events.Event, repoID int64, cmd string, args ...string) error { + c := exec.CommandContext(ctx, cmd, args...) + + stdout, _ := c.StdoutPipe() + stderr, _ := c.StderrPipe() + + if err := c.Start(); err != nil { + return err + } + + go stream(stdout, eventChan, repoID, "stdout") + go stream(stderr, eventChan, repoID, "stderr") + + return c.Wait() +} + +func stream(r interface{ Read([]byte) (int, error) }, eventChan chan<- *events.Event, repoID int64, streamType string) { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + event := events.NewLogEvent(repoID, scanner.Text()) + event.Data["stream"] = streamType + eventChan <- event + } +} diff --git a/internal/k8s/kubectl.go b/internal/k8s/kubectl.go new file mode 100644 index 0000000..c092269 --- /dev/null +++ b/internal/k8s/kubectl.go @@ -0,0 +1,128 @@ +package k8s + +import ( + "bufio" + "context" + "fmt" + "os/exec" + "strings" + + "deployment-manager/internal/events" + "deployment-manager/internal/model" +) + +type KubectlClient struct { + namespace string +} + +func NewKubectlClient(namespace string) *KubectlClient { + if namespace == "" { + namespace = "default" + } + return &KubectlClient{namespace: namespace} +} + +func (k *KubectlClient) ApplyManifest(ctx context.Context, eventChan chan<- *events.Event, repoID int64, manifest string) error { + cmd := "kubectl" + args := []string{ + "apply", + "-f", "-", + "--namespace", k.namespace, + } + + // Use kubectl with stdin for the manifest + kubectlCmd := exec.CommandContext(ctx, cmd, args...) + kubectlCmd.Stdin = strings.NewReader(manifest) + + stdout, _ := kubectlCmd.StdoutPipe() + stderr, _ := kubectlCmd.StderrPipe() + + if err := kubectlCmd.Start(); err != nil { + return fmt.Errorf("failed to start kubectl apply: %w", err) + } + + // Stream output + go streamOutput(stdout, eventChan, repoID, "kubectl") + go streamOutput(stderr, eventChan, repoID, "kubectl") + + return kubectlCmd.Wait() +} + +func (k *KubectlClient) DeleteResources(ctx context.Context, eventChan chan<- *events.Event, repoID int64, appName string) error { + commands := [][]string{ + {"delete", "deployment", appName, "--namespace", k.namespace}, + {"delete", "service", appName, "--namespace", k.namespace}, + {"delete", "configmap", appName, "--namespace", k.namespace}, + } + + for _, args := range commands { + cmd := "kubectl" + if err := runKubectlCommand(ctx, eventChan, repoID, cmd, args...); err != nil { + // Don't fail if resources don't exist + if !strings.Contains(err.Error(), "not found") { + return fmt.Errorf("failed to delete resource with args %v: %w", args, err) + } + } + } + + return nil +} + +func (k *KubectlClient) ScaleDeployment(ctx context.Context, eventChan chan<- *events.Event, repoID int64, appName string, replicas int) error { + cmd := "kubectl" + args := []string{ + "scale", + "deployment", appName, + "--replicas", fmt.Sprintf("%d", replicas), + "--namespace", k.namespace, + } + + return runKubectlCommand(ctx, eventChan, repoID, cmd, args...) +} + +func (k *KubectlClient) GetDeploymentStatus(ctx context.Context, appName string) (string, error) { + cmd := "kubectl" + args := []string{ + "get", "deployment", appName, + "--namespace", k.namespace, + "-o", "jsonpath='{.status.readyReplicas}'", + } + + output, err := exec.CommandContext(ctx, cmd, args...).Output() + if err != nil { + return "", fmt.Errorf("failed to get deployment status: %w", err) + } + + return strings.Trim(string(output), "'"), nil +} + +func runKubectlCommand(ctx context.Context, eventChan chan<- *events.Event, repoID int64, cmd string, args ...string) error { + kubectlCmd := exec.CommandContext(ctx, cmd, args...) + stdout, _ := kubectlCmd.StdoutPipe() + stderr, _ := kubectlCmd.StderrPipe() + + if err := kubectlCmd.Start(); err != nil { + return fmt.Errorf("failed to start kubectl command: %w", err) + } + + go streamOutput(stdout, eventChan, repoID, "kubectl") + go streamOutput(stderr, eventChan, repoID, "kubectl") + + return kubectlCmd.Wait() +} + +func streamOutput(r interface{ Read([]byte) (int, error) }, eventChan chan<- *events.Event, repoID int64, source string) { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + event := events.NewLogEvent(repoID, scanner.Text()) + event.Data["source"] = source + eventChan <- event + } +} + +func GetAppName(repo *model.Repo) string { + // Generate a consistent app name based on repo + parts := strings.Split(strings.TrimSuffix(repo.RepoURL, ".git"), "/") + repoName := parts[len(parts)-1] + return fmt.Sprintf("repo-%d-%s", repo.ID, strings.ToLower(repoName)) +} diff --git a/internal/k8s/manifests.go b/internal/k8s/manifests.go new file mode 100644 index 0000000..425d7ee --- /dev/null +++ b/internal/k8s/manifests.go @@ -0,0 +1,132 @@ +package k8s + +import ( + "fmt" + "strings" + + "deployment-manager/internal/model" +) + +func GenerateDeploymentManifest(repo *model.Repo, imageName string) string { + appName := GetAppName(repo) + port := getAppPort(repo.Type) + + return fmt.Sprintf(`apiVersion: apps/v1 +kind: Deployment +metadata: + name: %s + labels: + app: %s + repo-id: "%d" +spec: + replicas: 1 + selector: + matchLabels: + app: %s + template: + metadata: + labels: + app: %s + repo-id: "%d" + spec: + containers: + - name: %s + image: %s + ports: + - containerPort: %d + env: + - name: PORT + value: "%d" + resources: + requests: + memory: "64Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "200m" +--- +apiVersion: v1 +kind: Service +metadata: + name: %s + labels: + app: %s + repo-id: "%d" +spec: + selector: + app: %s + ports: + - port: 80 + targetPort: %d + protocol: TCP + type: ClusterIP +`, appName, appName, repo.ID, appName, appName, repo.ID, appName, imageName, port, port, appName, appName, repo.ID, appName, port) +} + +func GenerateConfigMapManifest(repo *model.Repo) string { + appName := GetAppName(repo) + + return fmt.Sprintf(`apiVersion: v1 +kind: ConfigMap +metadata: + name: %s + labels: + app: %s + repo-id: "%d" +data: + repo-url: "%s" + repo-type: "%s" + user-id: "%s" +`, appName, appName, repo.ID, repo.RepoURL, repo.Type, repo.UserID) +} + +func GenerateIngressManifest(repo *model.Repo) string { + appName := GetAppName(repo) + host := fmt.Sprintf("repo-%d.example.com", repo.ID) + + return fmt.Sprintf(`apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: %s + labels: + app: %s + repo-id: "%d" + annotations: + nginx.ingress.kubernetes.io/rewrite-target: / +spec: + rules: + - host: %s + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: %s + port: + number: 80 +`, appName, appName, repo.ID, host, appName) +} + +func getAppPort(repoType model.RepoType) int { + switch repoType { + case model.TypeNodeJS: + return 3000 + case model.TypePython: + return 8000 + default: + return 8080 + } +} + +func GenerateFullManifest(repo *model.Repo, imageName string) string { + var manifest strings.Builder + + manifest.WriteString(GenerateDeploymentManifest(repo, imageName)) + manifest.WriteString("\n---\n") + manifest.WriteString(GenerateConfigMapManifest(repo)) + manifest.WriteString("\n---\n") + manifest.WriteString(GenerateIngressManifest(repo)) + + return manifest.String() +} diff --git a/internal/model/repo.go b/internal/model/repo.go new file mode 100644 index 0000000..0114c9a --- /dev/null +++ b/internal/model/repo.go @@ -0,0 +1,33 @@ +package model + +import "time" + +type RepoStatus string +type RepoType string + +const ( + StatusNeedToDeploy RepoStatus = "need_to_deploy" + StatusDeploying RepoStatus = "deploying" + StatusDeployed RepoStatus = "deployed" + StatusError RepoStatus = "err" + StatusStopped RepoStatus = "stopped" + StatusRestarting RepoStatus = "restarting" + StatusDeleted RepoStatus = "deleted" +) + +const ( + TypeNodeJS RepoType = "nodejs" + TypePython RepoType = "python" +) + +type Repo struct { + ID int64 `json:"id"` + RepoURL string `json:"repo_url"` + Status RepoStatus `json:"status"` + UserID string `json:"user_id"` + Type RepoType `json:"type"` + ImageTag *string `json:"image_tag"` + LastError *string `json:"last_error"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} diff --git a/internal/reconciler/reconciler.go b/internal/reconciler/reconciler.go new file mode 100644 index 0000000..bb814d0 --- /dev/null +++ b/internal/reconciler/reconciler.go @@ -0,0 +1,90 @@ +package reconciler + +import ( + "context" + "database/sql" + "log" + "time" + + "deployment-manager/internal/db" + "deployment-manager/internal/model" +) + +type Reconciler struct { + repoStore *db.RepoStore + jobChan chan<- int64 + ticker *time.Ticker +} + +func NewReconciler(database *sql.DB, jobChan chan<- int64, tickInterval time.Duration) *Reconciler { + return &Reconciler{ + repoStore: db.NewRepoStore(database), + jobChan: jobChan, + ticker: time.NewTicker(tickInterval), + } +} + +func (r *Reconciler) Run(ctx context.Context) { + log.Println("Reconciler started") + + defer r.ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Println("Reconciler shutting down") + return + + case <-r.ticker.C: + if err := r.reconcile(ctx); err != nil { + log.Printf("Reconciliation error: %v", err) + } + } + } +} + +func (r *Reconciler) reconcile(ctx context.Context) error { + // Find repos that need to be deployed + repos, err := r.repoStore.ListByStatus(model.StatusNeedToDeploy) + if err != nil { + return err + } + + // Enqueue jobs for repos that need deployment + for _, repo := range repos { + select { + case r.jobChan <- repo.ID: + log.Printf("Enqueued deployment job for repo %d", repo.ID) + default: + log.Printf("Job queue full, skipping repo %d", repo.ID) + } + } + + // Find repos that are in error state and might need retry + errorRepos, err := r.repoStore.ListByStatus(model.StatusError) + if err != nil { + return err + } + + // Simple retry logic: retry errors older than 5 minutes + for _, repo := range errorRepos { + if time.Since(repo.UpdatedAt) > 5*time.Minute { + // Reset to need_to_deploy for retry + repo.Status = model.StatusNeedToDeploy + repo.LastError = nil + if err := r.repoStore.Update(repo); err != nil { + log.Printf("Failed to reset repo %d for retry: %v", repo.ID, err) + continue + } + + select { + case r.jobChan <- repo.ID: + log.Printf("Enqueued retry job for repo %d", repo.ID) + default: + log.Printf("Job queue full, skipping retry for repo %d", repo.ID) + } + } + } + + return nil +} diff --git a/internal/worker/deploy.go b/internal/worker/deploy.go new file mode 100644 index 0000000..0a78b50 --- /dev/null +++ b/internal/worker/deploy.go @@ -0,0 +1,106 @@ +package worker + +import ( + "context" + "database/sql" + + "deployment-manager/internal/db" + "deployment-manager/internal/events" + "deployment-manager/internal/k8s" + "deployment-manager/internal/model" +) + +type DeploymentManager struct { + repoStore *db.RepoStore + kubectl *k8s.KubectlClient + eventChan chan<- *events.Event +} + +func NewDeploymentManager(database *sql.DB, eventChan chan<- *events.Event) *DeploymentManager { + return &DeploymentManager{ + repoStore: db.NewRepoStore(database), + kubectl: k8s.NewKubectlClient("default"), + eventChan: eventChan, + } +} + +func (dm *DeploymentManager) StopDeployment(ctx context.Context, repoID int64) error { + repo, err := dm.repoStore.Get(repoID) + if err != nil { + return err + } + + appName := k8s.GetAppName(repo) + + // Scale deployment to 0 + if err := dm.kubectl.ScaleDeployment(ctx, dm.eventChan, repoID, appName, 0); err != nil { + return err + } + + // Update status + repo.Status = model.StatusStopped + if err := dm.repoStore.Update(repo); err != nil { + return err + } + + // Send event + dm.eventChan <- events.NewRepoEvent(repo, events.EventTypeRepoUpdated) + + return nil +} + +func (dm *DeploymentManager) RestartDeployment(ctx context.Context, repoID int64) error { + repo, err := dm.repoStore.Get(repoID) + if err != nil { + return err + } + + appName := k8s.GetAppName(repo) + + // Update status to restarting + repo.Status = model.StatusRestarting + if err := dm.repoStore.Update(repo); err != nil { + return err + } + + // Scale deployment to 1 + if err := dm.kubectl.ScaleDeployment(ctx, dm.eventChan, repoID, appName, 1); err != nil { + return err + } + + // Update status to deployed + repo.Status = model.StatusDeployed + if err := dm.repoStore.Update(repo); err != nil { + return err + } + + // Send event + dm.eventChan <- events.NewRepoEvent(repo, events.EventTypeRepoUpdated) + + return nil +} + +func (dm *DeploymentManager) DeleteDeployment(ctx context.Context, repoID int64) error { + repo, err := dm.repoStore.Get(repoID) + if err != nil { + return err + } + + appName := k8s.GetAppName(repo) + + // Delete Kubernetes resources + if err := dm.kubectl.DeleteResources(ctx, dm.eventChan, repoID, appName); err != nil { + return err + } + + // Update status to deleted + repo.Status = model.StatusDeleted + if err := dm.repoStore.Update(repo); err != nil { + return err + } + + // Send event + dm.eventChan <- events.NewRepoEvent(repo, events.EventTypeRepoDeleted) + + return nil +} diff --git a/internal/worker/pool.go b/internal/worker/pool.go new file mode 100644 index 0000000..7c0af03 --- /dev/null +++ b/internal/worker/pool.go @@ -0,0 +1,151 @@ +package worker + +import ( + "context" + "log" + "database/sql" + + "deployment-manager/internal/db" + "deployment-manager/internal/events" + "deployment-manager/internal/executor" + "deployment-manager/internal/k8s" + "deployment-manager/internal/model" +) + +type Worker struct { + id int + db *sql.DB + repoStore *db.RepoStore + eventChan chan<- *events.Event + jobChan <-chan int64 +} + +func NewWorker(id int, database *sql.DB, eventChan chan<- *events.Event, jobChan <-chan int64) *Worker { + return &Worker{ + id: id, + db: database, + repoStore: db.NewRepoStore(database), + eventChan: eventChan, + jobChan: jobChan, + } +} + +func (w *Worker) Run(ctx context.Context) { + log.Printf("Worker %d started", w.id) + + for { + select { + case <-ctx.Done(): + log.Printf("Worker %d shutting down", w.id) + return + + case repoID := <-w.jobChan: + if err := w.processJob(ctx, repoID); err != nil { + log.Printf("Worker %d failed to process job %d: %v", w.id, repoID, err) + } + } + } +} + +func (w *Worker) processJob(ctx context.Context, repoID int64) error { + log.Printf("Worker %d processing job for repo %d", w.id, repoID) + + // Get repo from database + repo, err := w.repoStore.Get(repoID) + if err != nil { + return err + } + + // Update status to deploying + repo.Status = model.StatusDeploying + if err := w.repoStore.Update(repo); err != nil { + return err + } + + // Send deployment started event + w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeployStarted) + + // Perform deployment + if err := w.deploy(ctx, repo); err != nil { + // Update status to error + repo.Status = model.StatusError + repo.LastError = &[]string{err.Error()}[0] + if updateErr := w.repoStore.Update(repo); updateErr != nil { + log.Printf("Failed to update repo error status: %v", updateErr) + } + + // Send error event + w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeployError) + return err + } + + // Update status to deployed + repo.Status = model.StatusDeployed + repo.LastError = nil + if err := w.repoStore.Update(repo); err != nil { + return err + } + + // Send success event + w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeploySuccess) + + log.Printf("Worker %d completed deployment for repo %d", w.id, repoID) + return nil +} + +func (w *Worker) deploy(ctx context.Context, repo *model.Repo) error { + // Clone repository + workDir, err := w.cloneRepo(ctx, repo) + if err != nil { + return err + } + + // Create Dockerfile + if err := executor.CreateDockerfile(workDir, repo.Type); err != nil { + return err + } + + // Build and push Docker image + imageName, err := executor.BuildAndPushImage(ctx, w.eventChan, repo, workDir) + if err != nil { + return err + } + + // Update repo with image tag + repo.ImageTag = &imageName + if err := w.repoStore.Update(repo); err != nil { + return err + } + + // Deploy to Kubernetes + if err := w.deployToK8s(ctx, repo, imageName); err != nil { + return err + } + + return nil +} + +func (w *Worker) cloneRepo(ctx context.Context, repo *model.Repo) (string, error) { + workDir := "/tmp/repo-" + string(rune(repo.ID)) + + // Clone the repository + if err := executor.RunCmd(ctx, w.eventChan, repo.ID, "git", "clone", repo.RepoURL, workDir); err != nil { + return "", err + } + + return workDir, nil +} + +func (w *Worker) deployToK8s(ctx context.Context, repo *model.Repo, imageName string) error { + kubectl := k8s.NewKubectlClient("default") + + // Generate manifest + manifest := k8s.GenerateFullManifest(repo, imageName) + + // Apply manifest + if err := kubectl.ApplyManifest(ctx, w.eventChan, repo.ID, manifest); err != nil { + return err + } + + return nil +} diff --git a/manager b/manager new file mode 100755 index 0000000..cafe3b8 Binary files /dev/null and b/manager differ diff --git a/manifests/nodejs.yaml b/manifests/nodejs.yaml new file mode 100644 index 0000000..1abf689 --- /dev/null +++ b/manifests/nodejs.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.AppName}} + labels: + app: {{.AppName}} + repo-id: "{{.RepoID}}" +spec: + replicas: 1 + selector: + matchLabels: + app: {{.AppName}} + template: + metadata: + labels: + app: {{.AppName}} + repo-id: "{{.RepoID}}" + spec: + containers: + - name: {{.AppName}} + image: {{.ImageName}} + ports: + - containerPort: 3000 + env: + - name: PORT + value: "3000" + - name: NODE_ENV + value: "production" + resources: + requests: + memory: "64Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "200m" +--- +apiVersion: v1 +kind: Service +metadata: + name: {{.AppName}} + labels: + app: {{.AppName}} + repo-id: "{{.RepoID}}" +spec: + selector: + app: {{.AppName}} + ports: + - port: 80 + targetPort: 3000 + protocol: TCP + type: ClusterIP \ No newline at end of file diff --git a/manifests/python.yaml b/manifests/python.yaml new file mode 100644 index 0000000..650200b --- /dev/null +++ b/manifests/python.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.AppName}} + labels: + app: {{.AppName}} + repo-id: "{{.RepoID}}" +spec: + replicas: 1 + selector: + matchLabels: + app: {{.AppName}} + template: + metadata: + labels: + app: {{.AppName}} + repo-id: "{{.RepoID}}" + spec: + containers: + - name: {{.AppName}} + image: {{.ImageName}} + ports: + - containerPort: 8000 + env: + - name: PORT + value: "8000" + - name: PYTHONUNBUFFERED + value: "1" + resources: + requests: + memory: "64Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "200m" +--- +apiVersion: v1 +kind: Service +metadata: + name: {{.AppName}} + labels: + app: {{.AppName}} + repo-id: "{{.RepoID}}" +spec: + selector: + app: {{.AppName}} + ports: + - port: 80 + targetPort: 8000 + protocol: TCP + type: ClusterIP \ No newline at end of file diff --git a/migrations/001_init.sql b/migrations/001_init.sql new file mode 100644 index 0000000..5cab4cc --- /dev/null +++ b/migrations/001_init.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS repos ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + repo_url TEXT NOT NULL, + status TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, -- nodejs | python + image_tag TEXT, + last_error TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_repos_status ON repos(status); \ No newline at end of file diff --git a/readme.txt b/readme.txt new file mode 100644 index 0000000..94acce3 --- /dev/null +++ b/readme.txt @@ -0,0 +1 @@ +deployment manager it will read the repo url from the database and deploy it to k8s \ No newline at end of file diff --git a/sqlite.schema b/sqlite.schema new file mode 100644 index 0000000..ec2f6c2 --- /dev/null +++ b/sqlite.schema @@ -0,0 +1,13 @@ +CREATE TABLE repos ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + repo_url TEXT NOT NULL, + status TEXT NOT NULL, + user_id TEXT NOT NULL, + type TEXT NOT NULL, -- nodejs | python + image_tag TEXT, + last_error TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_repos_status ON repos(status);