File size: 4,230 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package notification

import (
	"context"
	"sync/atomic"

	"github.com/chroma/chroma-coordinator/internal/common"
	"github.com/chroma/chroma-coordinator/internal/model"
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

type NotificationProcessor interface {
	common.Component
	Process(ctx context.Context) error
	Trigger(ctx context.Context, triggerMsg TriggerMessage)
}

type SimpleNotificationProcessor struct {
	ctx         context.Context
	store       NotificationStore
	notifer     Notifier
	channel     chan TriggerMessage
	doneChannel chan bool
	running     atomic.Bool
}

type TriggerMessage struct {
	Msg        model.Notification
	ResultChan chan error
}

const triggerChannelSize = 1000

var _ NotificationProcessor = &SimpleNotificationProcessor{}

func NewSimpleNotificationProcessor(ctx context.Context, store NotificationStore, notifier Notifier) *SimpleNotificationProcessor {
	return &SimpleNotificationProcessor{
		ctx:         ctx,
		store:       store,
		notifer:     notifier,
		channel:     make(chan TriggerMessage, triggerChannelSize),
		doneChannel: make(chan bool),
	}
}

func (n *SimpleNotificationProcessor) Start() error {
	// During startup, first sending all pending notifications in the store to the notification topic
	log.Info("Starting notification processor")
	err := n.sendPendingNotifications(n.ctx)
	if err != nil {
		log.Error("Failed to send pending notifications", zap.Error(err))
		return err
	}
	n.running.Store(true)
	go n.Process(n.ctx)
	return nil
}

func (n *SimpleNotificationProcessor) Stop() error {
	n.running.Store(false)
	n.doneChannel <- true
	return nil
}

func (n *SimpleNotificationProcessor) Process(ctx context.Context) error {
	log.Info("Waiting for new notifications")
	for {
		select {
		case triggerMsg := <-n.channel:
			msg := triggerMsg.Msg
			log.Info("Received notification", zap.Any("msg", msg))
			running := n.running.Load()
			log.Info("Notification processor is running", zap.Bool("running", running))
			// We need to block here until the notifications are sent successfully
			for running {
				// Check the notification store if this notification is already processed
				// If it is already processed, just return
				// If it is not processed, send notifications and remove from the store
				notifications, err := n.store.GetNotifications(ctx, msg.CollectionID)
				if err != nil {
					log.Error("Failed to get notifications", zap.Error(err))
					triggerMsg.ResultChan <- err
					continue
				}
				if len(notifications) == 0 {
					log.Info("No pending notifications found")
					triggerMsg.ResultChan <- nil
					break
				}
				log.Info("Got notifications from notification store", zap.Any("notifications", notifications))
				err = n.notifer.Notify(ctx, notifications)
				if err != nil {
					log.Error("Failed to send pending notifications", zap.Error(err))
				} else {
					n.store.RemoveNotifications(ctx, notifications)
					log.Info("Rmove notifications from notification store", zap.Any("notifications", notifications))
					triggerMsg.ResultChan <- nil
					break
				}
			}
		case <-n.doneChannel:
			log.Info("Stopping notification processor")
			return nil
		}
	}
}

func (n *SimpleNotificationProcessor) Trigger(ctx context.Context, triggerMsg TriggerMessage) {
	log.Info("Triggering notification", zap.Any("msg", triggerMsg.Msg))
	if len(n.channel) == triggerChannelSize {
		log.Error("Notification channel is full, dropping notification", zap.Any("msg", triggerMsg.Msg))
		triggerMsg.ResultChan <- nil
		return
	}
	n.channel <- triggerMsg
}

func (n *SimpleNotificationProcessor) sendPendingNotifications(ctx context.Context) error {
	notificationMap, err := n.store.GetAllPendingNotifications(ctx)
	if err != nil {
		log.Error("Failed to get all pending notifications", zap.Error(err))
		return err
	}
	for collectionID, notifications := range notificationMap {
		log.Info("Sending pending notifications", zap.Any("collectionID", collectionID), zap.Any("notifications", notifications))
		for {
			err = n.notifer.Notify(ctx, notifications)
			if err != nil {
				log.Error("Failed to send pending notifications", zap.Error(err))
			} else {
				n.store.RemoveNotifications(ctx, notifications)
				break
			}
		}
	}
	return nil
}