Files
deployment-manager/internal/worker/pool.go
2026-02-01 20:22:29 +05:30

152 lines
3.5 KiB
Go

package worker
import (
"context"
"log"
"database/sql"
"deployment-manager/internal/db"
"deployment-manager/internal/events"
"deployment-manager/internal/executor"
"deployment-manager/internal/k8s"
"deployment-manager/internal/model"
)
type Worker struct {
id int
db *sql.DB
repoStore *db.RepoStore
eventChan chan<- *events.Event
jobChan <-chan int64
}
func NewWorker(id int, database *sql.DB, eventChan chan<- *events.Event, jobChan <-chan int64) *Worker {
return &Worker{
id: id,
db: database,
repoStore: db.NewRepoStore(database),
eventChan: eventChan,
jobChan: jobChan,
}
}
func (w *Worker) Run(ctx context.Context) {
log.Printf("Worker %d started", w.id)
for {
select {
case <-ctx.Done():
log.Printf("Worker %d shutting down", w.id)
return
case repoID := <-w.jobChan:
if err := w.processJob(ctx, repoID); err != nil {
log.Printf("Worker %d failed to process job %d: %v", w.id, repoID, err)
}
}
}
}
func (w *Worker) processJob(ctx context.Context, repoID int64) error {
log.Printf("Worker %d processing job for repo %d", w.id, repoID)
// Get repo from database
repo, err := w.repoStore.Get(repoID)
if err != nil {
return err
}
// Update status to deploying
repo.Status = model.StatusDeploying
if err := w.repoStore.Update(repo); err != nil {
return err
}
// Send deployment started event
w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeployStarted)
// Perform deployment
if err := w.deploy(ctx, repo); err != nil {
// Update status to error
repo.Status = model.StatusError
repo.LastError = &[]string{err.Error()}[0]
if updateErr := w.repoStore.Update(repo); updateErr != nil {
log.Printf("Failed to update repo error status: %v", updateErr)
}
// Send error event
w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeployError)
return err
}
// Update status to deployed
repo.Status = model.StatusDeployed
repo.LastError = nil
if err := w.repoStore.Update(repo); err != nil {
return err
}
// Send success event
w.eventChan <- events.NewRepoEvent(repo, events.EventTypeDeploySuccess)
log.Printf("Worker %d completed deployment for repo %d", w.id, repoID)
return nil
}
func (w *Worker) deploy(ctx context.Context, repo *model.Repo) error {
// Clone repository
workDir, err := w.cloneRepo(ctx, repo)
if err != nil {
return err
}
// Create Dockerfile
if err := executor.CreateDockerfile(workDir, repo.Type); err != nil {
return err
}
// Build and push Docker image
imageName, err := executor.BuildAndPushImage(ctx, w.eventChan, repo, workDir)
if err != nil {
return err
}
// Update repo with image tag
repo.ImageTag = &imageName
if err := w.repoStore.Update(repo); err != nil {
return err
}
// Deploy to Kubernetes
if err := w.deployToK8s(ctx, repo, imageName); err != nil {
return err
}
return nil
}
func (w *Worker) cloneRepo(ctx context.Context, repo *model.Repo) (string, error) {
workDir := "/tmp/repo-" + string(rune(repo.ID))
// Clone the repository
if err := executor.RunCmd(ctx, w.eventChan, repo.ID, "git", "clone", repo.RepoURL, workDir); err != nil {
return "", err
}
return workDir, nil
}
func (w *Worker) deployToK8s(ctx context.Context, repo *model.Repo, imageName string) error {
kubectl := k8s.NewKubectlClient("default")
// Generate manifest
manifest := k8s.GenerateFullManifest(repo, imageName)
// Apply manifest
if err := kubectl.ApplyManifest(ctx, w.eventChan, repo.ID, manifest); err != nil {
return err
}
return nil
}