package api import ( "context" "encoding/json" "fmt" "log" "net/http" "deployment-manager/internal/events" ) type SSEHandler struct { eventBus *events.Bus } func NewSSEHandler(eventBus *events.Bus) *SSEHandler { return &SSEHandler{eventBus: eventBus} } func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") // Create a context for this connection ctx, cancel := context.WithCancel(r.Context()) defer cancel() // Subscribe to events eventChan := h.eventBus.Subscribe(ctx) // Send initial connection event h.sendEvent(w, "connected", map[string]interface{}{ "message": "SSE connection established", }) // Handle events for { select { case <-ctx.Done(): log.Println("SSE client disconnected") return case event := <-eventChan: if err := h.sendEvent(w, string(event.Type), event.Data); err != nil { log.Printf("Failed to send SSE event: %v", err) return } } } } func (h *SSEHandler) sendEvent(w http.ResponseWriter, eventType string, data interface{}) error { // Marshal data to JSON jsonData, err := json.Marshal(data) if err != nil { return err } // Format as SSE event event := fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, jsonData) // Write to response _, err = w.Write([]byte(event)) if err != nil { return err } // Flush to ensure immediate delivery if flusher, ok := w.(http.Flusher); ok { flusher.Flush() } return nil }