// Package reconciler periodically scans repository records and enqueues
// deployment jobs for repos that need deployment or a retry after an error.
package reconciler

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	"deployment-manager/internal/db"
	"deployment-manager/internal/model"
)

// retryDelay is how long a repo must sit in the error state before the
// reconciler resets it to need_to_deploy and re-enqueues it.
const retryDelay = 5 * time.Minute

// Reconciler drives periodic reconciliation: on each tick it queries the
// repo store and pushes repo IDs that need work onto jobChan.
type Reconciler struct {
	repoStore *db.RepoStore
	jobChan   chan<- int64
	ticker    *time.Ticker
}

// NewReconciler returns a Reconciler that scans the database every
// tickInterval and enqueues deployment jobs on jobChan.
func NewReconciler(database *sql.DB, jobChan chan<- int64, tickInterval time.Duration) *Reconciler {
	return &Reconciler{
		repoStore: db.NewRepoStore(database),
		jobChan:   jobChan,
		ticker:    time.NewTicker(tickInterval),
	}
}

// Run blocks, performing one reconciliation pass per tick, until ctx is
// cancelled. The ticker is stopped on return.
func (r *Reconciler) Run(ctx context.Context) {
	log.Println("Reconciler started")
	defer r.ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Println("Reconciler shutting down")
			return
		case <-r.ticker.C:
			if err := r.reconcile(ctx); err != nil {
				log.Printf("Reconciliation error: %v", err)
			}
		}
	}
}

// reconcile performs a single pass: it enqueues every repo in the
// need_to_deploy state, then resets repos stuck in the error state for
// longer than retryDelay and re-enqueues them. Enqueues are non-blocking;
// a full job queue is logged and the repo is skipped until the next tick.
func (r *Reconciler) reconcile(ctx context.Context) error {
	// Find repos that need to be deployed.
	repos, err := r.repoStore.ListByStatus(model.StatusNeedToDeploy)
	if err != nil {
		return fmt.Errorf("listing repos needing deploy: %w", err)
	}

	// Enqueue jobs for repos that need deployment.
	for _, repo := range repos {
		// Bail out promptly if the reconciler is shutting down.
		if err := ctx.Err(); err != nil {
			return err
		}
		select {
		case r.jobChan <- repo.ID:
			log.Printf("Enqueued deployment job for repo %d", repo.ID)
		default:
			log.Printf("Job queue full, skipping repo %d", repo.ID)
		}
	}

	// Find repos that are in error state and might need retry.
	errorRepos, err := r.repoStore.ListByStatus(model.StatusError)
	if err != nil {
		return fmt.Errorf("listing repos in error state: %w", err)
	}

	// Simple retry logic: retry errors older than retryDelay.
	for _, repo := range errorRepos {
		if err := ctx.Err(); err != nil {
			return err
		}
		if time.Since(repo.UpdatedAt) <= retryDelay {
			continue
		}
		// Reset to need_to_deploy for retry.
		repo.Status = model.StatusNeedToDeploy
		repo.LastError = nil
		if err := r.repoStore.Update(repo); err != nil {
			// Best-effort: log and move on so one bad row doesn't
			// stall the rest of the pass.
			log.Printf("Failed to reset repo %d for retry: %v", repo.ID, err)
			continue
		}
		select {
		case r.jobChan <- repo.ID:
			log.Printf("Enqueued retry job for repo %d", repo.ID)
		default:
			log.Printf("Job queue full, skipping retry for repo %d", repo.ID)
		}
	}
	return nil
}