Spaces:
Sleeping
Sleeping
File size: 4,230 Bytes
287a0bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
package notification
import (
"context"
"sync/atomic"
"github.com/chroma/chroma-coordinator/internal/common"
"github.com/chroma/chroma-coordinator/internal/model"
"github.com/pingcap/log"
"go.uber.org/zap"
)
// NotificationProcessor is a common.Component that additionally exposes a
// processing loop (Process) and a way to submit work to it (Trigger).
// NOTE(review): common.Component is declared outside this file; presumably it
// supplies the Start/Stop lifecycle methods implemented below — confirm.
type NotificationProcessor interface {
common.Component
// Process runs the processing loop for submitted trigger messages and
// returns an error value when the loop ends.
Process(ctx context.Context) error
// Trigger submits triggerMsg for processing. It returns nothing; the outcome
// is reported on triggerMsg.ResultChan.
Trigger(ctx context.Context, triggerMsg TriggerMessage)
}
// SimpleNotificationProcessor is the NotificationProcessor implementation in
// this file. It reads pending notifications from a NotificationStore, hands
// them to a Notifier, and removes them from the store once they were sent.
type SimpleNotificationProcessor struct {
// ctx is the context given to the constructor; Start passes it to the
// startup flush and to the Process goroutine.
ctx context.Context
// store is queried for pending notifications and asked to remove sent ones.
store NotificationStore
// notifer (sic) delivers batches of notifications via Notify.
notifer Notifier
// channel queues trigger messages from Trigger for the Process loop; it is
// created with a buffer of triggerChannelSize.
channel chan TriggerMessage
// doneChannel is unbuffered; Stop signals on it and Process returns when it
// receives from it.
doneChannel chan bool
// running is set to true by Start and to false by Stop; Process consults it
// before working on a trigger message.
running atomic.Bool
}
// TriggerMessage is the unit of work passed to Trigger and consumed by Process.
type TriggerMessage struct {
// Msg is the notification that caused the trigger; Process uses its
// CollectionID to look up the pending notifications in the store.
Msg model.Notification
// ResultChan receives the outcome of handling this message: nil or an error.
// Sends on it are blocking, so the submitter must receive from it (or give
// it a buffer) to avoid stalling the sender.
ResultChan chan error
}
// triggerChannelSize is the buffer capacity of the trigger channel created by
// NewSimpleNotificationProcessor; Trigger drops messages when the buffer is full.
const triggerChannelSize = 1000
// Compile-time check that *SimpleNotificationProcessor satisfies NotificationProcessor.
var _ NotificationProcessor = &SimpleNotificationProcessor{}
// NewSimpleNotificationProcessor builds a processor bound to ctx that reads
// from store and delivers through notifier. The trigger queue is buffered to
// triggerChannelSize entries and the done channel is unbuffered. The returned
// processor is idle until Start is called.
func NewSimpleNotificationProcessor(ctx context.Context, store NotificationStore, notifier Notifier) *SimpleNotificationProcessor {
	processor := new(SimpleNotificationProcessor)
	processor.ctx = ctx
	processor.store = store
	processor.notifer = notifier
	processor.channel = make(chan TriggerMessage, triggerChannelSize)
	processor.doneChannel = make(chan bool)
	return processor
}
// Start flushes every notification still pending in the store and, if that
// succeeds, marks the processor as running and launches the Process loop in
// its own goroutine using the context given at construction time.
// A failure of the startup flush is logged and returned; in that case the
// processor is not marked running and no goroutine is started.
func (n *SimpleNotificationProcessor) Start() error {
	log.Info("Starting notification processor")

	// Drain the backlog first so that notifications left over from an earlier
	// run are delivered before new triggers are accepted.
	if startupErr := n.sendPendingNotifications(n.ctx); startupErr != nil {
		log.Error("Failed to send pending notifications", zap.Error(startupErr))
		return startupErr
	}

	n.running.Store(true)
	go n.Process(n.ctx)
	return nil
}
// Stop marks the processor as no longer running and signals the Process loop
// to exit. It always returns nil.
//
// The done channel is unbuffered, so the signal is only sent when this call is
// the one that flips the running flag from true to false. Calling Stop on a
// processor that was never (successfully) started, or calling it a second
// time, is therefore a no-op instead of blocking forever on a channel nobody
// receives from.
//
// When the signal is sent, Stop blocks until the Process loop receives it.
func (n *SimpleNotificationProcessor) Stop() error {
	if !n.running.CompareAndSwap(true, false) {
		// Not running: there is no Process loop waiting for the signal.
		return nil
	}
	n.doneChannel <- true
	return nil
}
// Process is the processor's main loop. It handles trigger messages from the
// trigger channel one at a time and returns nil once a value is received on
// the done channel (sent by Stop). ctx is passed through to the store and the
// notifier.
func (n *SimpleNotificationProcessor) Process(ctx context.Context) error {
	log.Info("Waiting for new notifications")
	for {
		select {
		case triggerMsg := <-n.channel:
			n.processTrigger(ctx, triggerMsg)
		case <-n.doneChannel:
			log.Info("Stopping notification processor")
			return nil
		}
	}
}

// processTrigger handles a single trigger message: it loads the pending
// notifications for the message's collection, sends them through the notifier
// (retrying while sending fails) and removes them from the store afterwards.
//
// At most one value is sent on triggerMsg.ResultChan per trigger:
//   - the store error, if the pending notifications cannot be loaded; the
//     trigger is then abandoned and the notifications stay in the store, where
//     a later trigger for the same collection or the next Start picks them up;
//   - nil, if nothing is pending or the notifications were sent.
//
// Sending more than once (as a retry-after-report would) can block forever
// against a caller that receives a single result.
//
// The running flag is re-read on every iteration so that Stop can interrupt
// the retry loop. If the processor is not running, or is stopped before the
// notifications could be sent, the function returns without sending a result.
func (n *SimpleNotificationProcessor) processTrigger(ctx context.Context, triggerMsg TriggerMessage) {
	msg := triggerMsg.Msg
	log.Info("Received notification", zap.Any("msg", msg))
	log.Info("Notification processor is running", zap.Bool("running", n.running.Load()))

	// Keep trying until the notifications are sent or the processor is stopped.
	for n.running.Load() {
		// The store is the source of truth: an empty result means the
		// notification was already processed.
		notifications, err := n.store.GetNotifications(ctx, msg.CollectionID)
		if err != nil {
			log.Error("Failed to get notifications", zap.Error(err))
			triggerMsg.ResultChan <- err
			return
		}
		if len(notifications) == 0 {
			log.Info("No pending notifications found")
			triggerMsg.ResultChan <- nil
			return
		}
		log.Info("Got notifications from notification store", zap.Any("notifications", notifications))

		if err := n.notifer.Notify(ctx, notifications); err != nil {
			log.Error("Failed to send pending notifications", zap.Error(err))
			continue
		}

		// NOTE(review): anything RemoveNotifications returns is discarded here;
		// confirm whether it can fail and should be reported.
		n.store.RemoveNotifications(ctx, notifications)
		log.Info("Rmove notifications from notification store", zap.Any("notifications", notifications))
		triggerMsg.ResultChan <- nil
		return
	}
}
// Trigger queues triggerMsg for the Process loop without blocking on the
// queue. If the trigger channel's buffer is full the message is dropped, an
// error is logged, and nil is sent on triggerMsg.ResultChan so the submitter
// is not left waiting for a result.
//
// The enqueue is a single non-blocking send. Checking len(channel) first and
// sending afterwards would let concurrent callers pass the check together and
// then block on the send.
//
// ctx is currently unused.
func (n *SimpleNotificationProcessor) Trigger(ctx context.Context, triggerMsg TriggerMessage) {
	log.Info("Triggering notification", zap.Any("msg", triggerMsg.Msg))
	select {
	case n.channel <- triggerMsg:
		// Queued; Process reports the outcome on ResultChan.
	default:
		log.Error("Notification channel is full, dropping notification", zap.Any("msg", triggerMsg.Msg))
		triggerMsg.ResultChan <- nil
	}
}
// sendPendingNotifications loads every pending notification from the store,
// grouped by collection, and sends each group through the notifier. A group is
// retried until it is sent and is then removed from the store.
//
// It returns the store error if the pending notifications cannot be loaded,
// and ctx.Err() if a send fails after ctx has been cancelled or has expired.
// Without that check a cancelled context would leave this loop spinning
// forever. It returns nil once every group has been sent.
func (n *SimpleNotificationProcessor) sendPendingNotifications(ctx context.Context) error {
	notificationMap, err := n.store.GetAllPendingNotifications(ctx)
	if err != nil {
		log.Error("Failed to get all pending notifications", zap.Error(err))
		return err
	}

	for collectionID, notifications := range notificationMap {
		log.Info("Sending pending notifications", zap.Any("collectionID", collectionID), zap.Any("notifications", notifications))
		for {
			sendErr := n.notifer.Notify(ctx, notifications)
			if sendErr == nil {
				// NOTE(review): anything RemoveNotifications returns is
				// discarded here; confirm whether it can fail.
				n.store.RemoveNotifications(ctx, notifications)
				break
			}
			log.Error("Failed to send pending notifications", zap.Error(sendErr))

			// Only give up when the context is done; otherwise keep retrying.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
		}
	}
	return nil
}
|