package notification

import (
	"context"
	"sync/atomic"

	"github.com/chroma/chroma-coordinator/internal/common"
	"github.com/chroma/chroma-coordinator/internal/model"
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

type NotificationProcessor interface {
	common.Component
	Process(ctx context.Context) error
	Trigger(ctx context.Context, triggerMsg TriggerMessage)
}

type SimpleNotificationProcessor struct {
	ctx         context.Context
	store       NotificationStore
	notifer     Notifier
	channel     chan TriggerMessage
	doneChannel chan bool
	running     atomic.Bool
}

type TriggerMessage struct {
	Msg        model.Notification
	ResultChan chan error
}

const triggerChannelSize = 1000

var _ NotificationProcessor = &SimpleNotificationProcessor{}

func NewSimpleNotificationProcessor(ctx context.Context, store NotificationStore, notifier Notifier) *SimpleNotificationProcessor {
	return &SimpleNotificationProcessor{
		ctx:         ctx,
		store:       store,
		notifer:     notifier,
		channel:     make(chan TriggerMessage, triggerChannelSize),
		doneChannel: make(chan bool),
	}
}

func (n *SimpleNotificationProcessor) Start() error {
	// During startup, first sending all pending notifications in the store to the notification topic
	log.Info("Starting notification processor")
	err := n.sendPendingNotifications(n.ctx)
	if err != nil {
		log.Error("Failed to send pending notifications", zap.Error(err))
		return err
	}
	n.running.Store(true)
	go n.Process(n.ctx)
	return nil
}

func (n *SimpleNotificationProcessor) Stop() error {
	n.running.Store(false)
	n.doneChannel <- true
	return nil
}

func (n *SimpleNotificationProcessor) Process(ctx context.Context) error {
	log.Info("Waiting for new notifications")
	for {
		select {
		case triggerMsg := <-n.channel:
			msg := triggerMsg.Msg
			log.Info("Received notification", zap.Any("msg", msg))
			running := n.running.Load()
			log.Info("Notification processor is running", zap.Bool("running", running))
			// We need to block here until the notifications are sent successfully
			for running {
				// Check the notification store if this notification is already processed
				// If it is already processed, just return
				// If it is not processed, send notifications and remove from the store
				notifications, err := n.store.GetNotifications(ctx, msg.CollectionID)
				if err != nil {
					log.Error("Failed to get notifications", zap.Error(err))
					triggerMsg.ResultChan <- err
					continue
				}
				if len(notifications) == 0 {
					log.Info("No pending notifications found")
					triggerMsg.ResultChan <- nil
					break
				}
				log.Info("Got notifications from notification store", zap.Any("notifications", notifications))
				err = n.notifer.Notify(ctx, notifications)
				if err != nil {
					log.Error("Failed to send pending notifications", zap.Error(err))
				} else {
					n.store.RemoveNotifications(ctx, notifications)
					log.Info("Rmove notifications from notification store", zap.Any("notifications", notifications))
					triggerMsg.ResultChan <- nil
					break
				}
			}
		case <-n.doneChannel:
			log.Info("Stopping notification processor")
			return nil
		}
	}
}

func (n *SimpleNotificationProcessor) Trigger(ctx context.Context, triggerMsg TriggerMessage) {
	log.Info("Triggering notification", zap.Any("msg", triggerMsg.Msg))
	if len(n.channel) == triggerChannelSize {
		log.Error("Notification channel is full, dropping notification", zap.Any("msg", triggerMsg.Msg))
		triggerMsg.ResultChan <- nil
		return
	}
	n.channel <- triggerMsg
}

func (n *SimpleNotificationProcessor) sendPendingNotifications(ctx context.Context) error {
	notificationMap, err := n.store.GetAllPendingNotifications(ctx)
	if err != nil {
		log.Error("Failed to get all pending notifications", zap.Error(err))
		return err
	}
	for collectionID, notifications := range notificationMap {
		log.Info("Sending pending notifications", zap.Any("collectionID", collectionID), zap.Any("notifications", notifications))
		for {
			err = n.notifer.Notify(ctx, notifications)
			if err != nil {
				log.Error("Failed to send pending notifications", zap.Error(err))
			} else {
				n.store.RemoveNotifications(ctx, notifications)
				break
			}
		}
	}
	return nil
}