File size: 4,234 Bytes
7def60a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
package model
import (
"sync"
"time"
process "github.com/mudler/go-processmanager"
"github.com/rs/zerolog/log"
)
// WatchDog tracks, per backend address, whether the backend is currently busy
// or idle and since when. Every GRPC client created by the ModelLoader is
// expected to have a watchdog injected so that it can report those state
// changes through Mark and UnMark.
//
// The Run loop periodically inspects the recorded timestamps. A backend that
// has been busy longer than timeout, or idle longer than idletimeout, has its
// model shut down through the ProcessManager.
//
// The embedded mutex guards all maps below.
type WatchDog struct {
	sync.Mutex

	// timetable holds, for each busy backend address, the time it was marked busy.
	timetable map[string]time.Time

	// idleTime holds, for each idle backend address, the time it was unmarked.
	idleTime map[string]time.Time

	// timeout is the longest a backend may stay busy before it is shut down.
	timeout time.Duration

	// idletimeout is the longest a backend may stay idle before it is shut down.
	idletimeout time.Duration

	// addressMap associates a backend address with the process registered via Add.
	addressMap map[string]*process.Process

	// addressModelMap resolves a backend address to the model name handed to
	// ProcessManager.ShutdownModel.
	addressModelMap map[string]string

	// pm performs the actual model shutdown.
	pm ProcessManager

	// stop carries the signal that makes Run return.
	stop chan bool

	// busyCheck enables the busy-timeout check in Run.
	busyCheck bool

	// idleCheck enables the idle-timeout check in Run.
	idleCheck bool
}
// ProcessManager is the dependency the WatchDog uses to get rid of a backend
// that exceeded its busy or idle timeout.
type ProcessManager interface {
// ShutdownModel is called with the model name the watchdog resolved from the
// backend address. It is presumably expected to stop the backend serving
// that model (the implementation is not visible here). A returned error is
// only logged by the watchdog.
ShutdownModel(modelName string) error
}
// NewWatchDog builds a WatchDog with empty bookkeeping maps.
//
// pm is used to shut down models whose backend exceeded a timeout.
// timeoutBusy and timeoutIdle are the maximum durations a backend may stay
// busy or idle, respectively. busy and idle switch the corresponding checks
// of the Run loop on or off.
func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool) *WatchDog {
	return &WatchDog{
		timeout:         timeoutBusy,
		idletimeout:     timeoutIdle,
		pm:              pm,
		timetable:       make(map[string]time.Time),
		idleTime:        make(map[string]time.Time),
		addressMap:      make(map[string]*process.Process),
		addressModelMap: make(map[string]string),
		busyCheck:       busy,
		idleCheck:       idle,

		// The stop channel must be allocated: a nil channel is never ready, so
		// Run could never observe a stop request and Shutdown would block
		// forever on its send. The buffer of one lets Shutdown deposit the
		// signal without waiting for Run to get back to its select.
		stop: make(chan bool, 1),
	}
}
// Shutdown asks the Run loop to stop by sending on the stop channel.
//
// The call returns once the signal has been accepted by the channel. It
// returns immediately, without signalling, when the stop channel is nil.
func (wd *WatchDog) Shutdown() {
	// Only the read of the channel field happens under the mutex. Holding the
	// mutex across the send would stall Mark, UnMark and Add for as long as
	// the send is pending, and it can deadlock with Run when Run is waiting
	// for the same mutex inside a check instead of receiving from the channel.
	wd.Lock()
	stop := wd.stop
	wd.Unlock()

	if stop == nil {
		// A send on a nil channel blocks forever and a receive from it never
		// completes, so no Run loop could observe the signal anyway.
		return
	}
	stop <- true
}
// AddAddressModelMap records which model is served at the given backend
// address. The periodic checks use this mapping to find the model name to
// shut down. An existing entry for address is overwritten.
func (wd *WatchDog) AddAddressModelMap(address string, model string) {
	wd.Lock()
	defer wd.Unlock()

	wd.addressModelMap[address] = model
}
// Add registers the process p under the given backend address, replacing any
// process previously stored for that address.
func (wd *WatchDog) Add(address string, p *process.Process) {
	wd.Lock()
	defer wd.Unlock()

	wd.addressMap[address] = p
}
// Mark flags the backend at address as busy as of now. Any idle timestamp
// recorded for the same address is dropped, so the backend is tracked in
// exactly one of the two tables.
func (wd *WatchDog) Mark(address string) {
	wd.Lock()
	defer wd.Unlock()

	delete(wd.idleTime, address)
	wd.timetable[address] = time.Now()
}
// UnMark flags the backend at ModelAddress as idle as of now. Its busy
// timestamp, if any, is dropped.
func (wd *WatchDog) UnMark(ModelAddress string) {
	wd.Lock()
	defer wd.Unlock()

	wd.idleTime[ModelAddress] = time.Now()
	delete(wd.timetable, ModelAddress)
}
// Run is the watchdog loop; it blocks until the watchdog stops.
//
// Every 30 seconds it runs the enabled checks (busy first, then idle). The
// loop ends when a value is received on the stop channel, or, after the
// first interval has elapsed, when neither check is enabled.
func (wd *WatchDog) Run() {
	const checkInterval = 30 * time.Second

	log.Info().Msg("[WatchDog] starting watchdog")
	for {
		select {
		case <-wd.stop:
			log.Info().Msg("[WatchDog] Stopping watchdog")
			return
		case <-time.After(checkInterval):
			// Interval elapsed: leave the select and run the checks below.
		}

		if !(wd.busyCheck || wd.idleCheck) {
			log.Info().Msg("[WatchDog] No checks enabled, stopping watchdog")
			return
		}
		if wd.busyCheck {
			wd.checkBusy()
		}
		if wd.idleCheck {
			wd.checkIdle()
		}
	}
}
// checkIdle walks all backends currently recorded as idle and shuts down the
// model of every backend that has been idle for longer than idletimeout.
//
// The mutex is held for the whole pass, including the calls to
// ProcessManager.ShutdownModel. An address without a known model is only
// removed from the idle table.
func (wd *WatchDog) checkIdle() {
	wd.Lock()
	defer wd.Unlock()

	log.Debug().Msg("[WatchDog] Watchdog checks for idle connections")
	for address, since := range wd.idleTime {
		log.Debug().Msgf("[WatchDog] %s: idle connection", address)
		if time.Since(since) <= wd.idletimeout {
			continue
		}

		log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address)
		model, known := wd.addressModelMap[address]
		if !known {
			log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
			delete(wd.idleTime, address)
			continue
		}

		if err := wd.pm.ShutdownModel(model); err != nil {
			log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
		}
		// NOTE(review): the message and the cleanup below also run when
		// ShutdownModel returned an error.
		log.Debug().Msgf("[WatchDog] model shut down: %s", address)
		delete(wd.idleTime, address)
		delete(wd.addressModelMap, address)
		delete(wd.addressMap, address)
	}
}
// checkBusy walks all backends currently recorded as busy and shuts down the
// model of every backend that has been busy for longer than timeout.
//
// The mutex is held for the whole pass, including the calls to
// ProcessManager.ShutdownModel. An address without a known model is only
// removed from the busy table.
func (wd *WatchDog) checkBusy() {
	wd.Lock()
	defer wd.Unlock()

	log.Debug().Msg("[WatchDog] Watchdog checks for busy connections")
	for address, since := range wd.timetable {
		log.Debug().Msgf("[WatchDog] %s: active connection", address)
		if time.Since(since) <= wd.timeout {
			continue
		}

		model, known := wd.addressModelMap[address]
		if !known {
			log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
			delete(wd.timetable, address)
			continue
		}

		log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
		if err := wd.pm.ShutdownModel(model); err != nil {
			log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
		}
		// NOTE(review): the message and the cleanup below also run when
		// ShutdownModel returned an error.
		log.Debug().Msgf("[WatchDog] model shut down: %s", address)
		delete(wd.timetable, address)
		delete(wd.addressModelMap, address)
		delete(wd.addressMap, address)
	}
}
|