package worker import ( "context" "log" "database/sql" "deployment-manager/internal/db" "deployment-manager/internal/events" "deployment-manager/internal/executor" "deployment-manager/internal/k8s" "deployment-manager/internal/model" ) type Worker struct { id int db *sql.DB repoStore *db.RepoStore eventChan chan<- *events.Event jobChan <-chan int64 kubeconfig string namespace string } func NewWorker(id int, database *sql.DB, eventChan chan<- *events.Event, jobChan <-chan int64, kubeconfig, namespace string) *Worker { return &Worker{ id: id, db: database, repoStore: db.NewRepoStore(database), eventChan: eventChan, jobChan: jobChan, kubeconfig: kubeconfig, namespace: namespace, } } func (w *Worker) Run(ctx context.Context) { log.Printf("Worker %d started", w.id) for { select { case <-ctx.Done(): log.Printf("Worker %d shutting down", w.id) return case repoID := <-w.jobChan: if err := w.processJob(ctx, repoID); err != nil { log.Printf("Worker %d failed to process job %d: %v", w.id, repoID, err) } } } } func (w *Worker) processJob(ctx context.Context, repoID int64) error { log.Printf("Worker %d processing job for repo %d", w.id, repoID) // Get repo from database repo, err := w.repoStore.Get(repoID) if err != nil { return err } // Update status to deploying repo.Status = model.StatusDeploying if err := w.repoStore.Update(repo); err != nil { return err } // Send deployment started event w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeployStarted) // Perform deployment if err := w.deploy(ctx, repo); err != nil { // Update status to error repo.Status = model.StatusError repo.LastError = &[]string{err.Error()}[0] if updateErr := w.repoStore.Update(repo); updateErr != nil { log.Printf("Failed to update repo error status: %v", updateErr) } // Send error event w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeployError) return err } // Update status to deployed repo.Status = model.StatusDeployed repo.LastError = nil if err := w.repoStore.Update(repo); err != nil { return err } // Send success event w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeploySuccess) log.Printf("Worker %d completed deployment for repo %d", w.id, repoID) return nil } func (w *Worker) deploy(ctx context.Context, repo *model.Repo) error { // Clone repository workDir, err := w.cloneRepo(ctx, repo) if err != nil { return err } // Create Dockerfile if err := executor.CreateDockerfile(workDir, repo.Type); err != nil { return err } // Build and push Docker image imageName, err := executor.BuildAndPushImage(ctx, w.eventChan, repo, workDir) if err != nil { return err } // Update repo with image tag repo.ImageTag = &imageName if err := w.repoStore.Update(repo); err != nil { return err } // Deploy to Kubernetes if err := w.deployToK8s(ctx, repo, imageName); err != nil { return err } return nil } func (w *Worker) cloneRepo(ctx context.Context, repo *model.Repo) (string, error) { workDir := "/tmp/repo-" + string(rune(repo.ID)) // Clone the repository if err := executor.RunCmd(ctx, w.eventChan, repo.ID, "git", "clone", repo.RepoURL, workDir); err != nil { return "", err } return workDir, nil } func (w *Worker) deployToK8s(ctx context.Context, repo *model.Repo, imageName string) error { kubectl := k8s.NewKubectlClient(w.namespace, w.kubeconfig) // Generate manifest manifest := k8s.GenerateFullManifest(repo, imageName) // Apply manifest if err := kubectl.ApplyManifest(ctx, w.eventChan, repo.ID, manifest); err != nil { return err } return nil }