91 lines
2.0 KiB
Go
91 lines
2.0 KiB
Go
package reconciler
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"log"
|
|
"time"
|
|
|
|
"deployment-manager/internal/db"
|
|
"deployment-manager/internal/model"
|
|
)
|
|
|
|
type Reconciler struct {
|
|
repoStore *db.RepoStore
|
|
jobChan chan<- int64
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
func NewReconciler(database *sql.DB, jobChan chan<- int64, tickInterval time.Duration) *Reconciler {
|
|
return &Reconciler{
|
|
repoStore: db.NewRepoStore(database),
|
|
jobChan: jobChan,
|
|
ticker: time.NewTicker(tickInterval),
|
|
}
|
|
}
|
|
|
|
func (r *Reconciler) Run(ctx context.Context) {
|
|
log.Println("Reconciler started")
|
|
|
|
defer r.ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Println("Reconciler shutting down")
|
|
return
|
|
|
|
case <-r.ticker.C:
|
|
if err := r.reconcile(ctx); err != nil {
|
|
log.Printf("Reconciliation error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Reconciler) reconcile(ctx context.Context) error {
|
|
// Find repos that need to be deployed
|
|
repos, err := r.repoStore.ListByStatus(model.StatusNeedToDeploy)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Enqueue jobs for repos that need deployment
|
|
for _, repo := range repos {
|
|
select {
|
|
case r.jobChan <- repo.ID:
|
|
log.Printf("Enqueued deployment job for repo %d", repo.ID)
|
|
default:
|
|
log.Printf("Job queue full, skipping repo %d", repo.ID)
|
|
}
|
|
}
|
|
|
|
// Find repos that are in error state and might need retry
|
|
errorRepos, err := r.repoStore.ListByStatus(model.StatusError)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Simple retry logic: retry errors older than 5 minutes
|
|
for _, repo := range errorRepos {
|
|
if time.Since(repo.UpdatedAt) > 5*time.Minute {
|
|
// Reset to need_to_deploy for retry
|
|
repo.Status = model.StatusNeedToDeploy
|
|
repo.LastError = nil
|
|
if err := r.repoStore.Update(repo); err != nil {
|
|
log.Printf("Failed to reset repo %d for retry: %v", repo.ID, err)
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case r.jobChan <- repo.ID:
|
|
log.Printf("Enqueued retry job for repo %d", repo.ID)
|
|
default:
|
|
log.Printf("Job queue full, skipping retry for repo %d", repo.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|