File size: 4,234 Bytes
7def60a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package model

import (
	"sync"
	"time"

	process "github.com/mudler/go-processmanager"
	"github.com/rs/zerolog/log"
)

// All gRPC clients created by ModelLoader should have an associated, injected
// watchdog that keeps track of the state of each backend (busy or idle) and of
// how long it has been in that state.
// If a backend is busy for too long, the watchdog kills the process and forces
// a reload of the model; if it is idle for too long, the model is shut down.
// The watchdog runs as a separate goroutine (see Run), and the gRPC client
// reports status updates to it by calling Mark and UnMark.

// WatchDog tracks, per backend address, when a backend last became busy or
// idle, and asks its ProcessManager to shut down the model behind an address
// once it has been busy longer than timeout or idle longer than idletimeout.
// The maps are read and written only while the embedded mutex is held.
type WatchDog struct {
	sync.Mutex

	timetable map[string]time.Time // address -> time it was last Mark'ed busy
	idleTime  map[string]time.Time // address -> time it was last UnMark'ed

	timeout     time.Duration // busy period after which the model is shut down
	idletimeout time.Duration // idle period after which the model is shut down

	addressMap      map[string]*process.Process // address -> backend process
	addressModelMap map[string]string           // address -> model name
	pm              ProcessManager              // performs the actual shutdown
	stop            chan bool                   // receiving from it ends Run

	busyCheck bool // run checkBusy on every tick of Run
	idleCheck bool // run checkIdle on every tick of Run
}

// ProcessManager is the dependency the WatchDog uses to stop a model once its
// backend has been busy or idle for too long. ShutdownModel receives the model
// name recorded for the backend's address and reports any failure as an error.
type ProcessManager interface {
	ShutdownModel(name string) error
}

// NewWatchDog returns a WatchDog that uses pm to shut down models.
//
// timeoutBusy is how long a backend may stay busy (Mark'ed) and timeoutIdle how
// long it may stay idle (UnMark'ed) before its model is shut down; busy and
// idle enable the corresponding checks performed by Run.
//
// The stop channel is created here with a one-slot buffer. If it were left nil,
// Run could never observe a stop and a send on it would block forever. The
// buffer lets a stop request be recorded even while Run is in the middle of a
// check (or has already returned) without blocking the caller.
func NewWatchDog(pm ProcessManager, timeoutBusy, timeoutIdle time.Duration, busy, idle bool) *WatchDog {
	return &WatchDog{
		timeout:         timeoutBusy,
		idletimeout:     timeoutIdle,
		pm:              pm,
		timetable:       make(map[string]time.Time),
		idleTime:        make(map[string]time.Time),
		addressMap:      make(map[string]*process.Process),
		busyCheck:       busy,
		idleCheck:       idle,
		addressModelMap: make(map[string]string),
		stop:            make(chan bool, 1),
	}
}

// Shutdown asks Run to return. It never blocks the caller.
//
// The request is made with a non-blocking send on wd.stop:
//   - if the channel is buffered, a request made while Run is busy checking (or
//     after it returned) stays in the buffer until Run reads it, and repeated
//     calls are harmless;
//   - if the channel is unbuffered, the request is delivered only when Run is
//     currently waiting in its select;
//   - if the channel is nil, the call is a no-op.
func (wd *WatchDog) Shutdown() {
	// The mutex is deliberately not taken. wd.stop is not one of the fields it
	// guards, and holding the lock while sending could deadlock against Run,
	// which acquires the same lock inside checkBusy/checkIdle.
	select {
	case wd.stop <- true:
	default:
	}
}

// AddAddressModelMap records that the backend at address serves model. The
// busy and idle checks use this mapping to know which model name to pass to
// ProcessManager.ShutdownModel.
func (wd *WatchDog) AddAddressModelMap(address, model string) {
	wd.Lock()
	defer wd.Unlock()

	wd.addressModelMap[address] = model
}

// Add registers proc as the process behind the backend at address. The entry
// is removed again when the watchdog shuts down the model at that address.
func (wd *WatchDog) Add(address string, proc *process.Process) {
	wd.Lock()
	defer wd.Unlock()

	wd.addressMap[address] = proc
}

// Mark records that the backend at address has just become busy. It stamps
// the start of the busy period (consulted by checkBusy) and clears the idle
// timestamp (consulted by checkIdle) for that address.
func (wd *WatchDog) Mark(address string) {
	wd.Lock()
	defer wd.Unlock()

	startedAt := time.Now()
	wd.timetable[address] = startedAt
	delete(wd.idleTime, address)
}

// UnMark records that the backend at address is no longer busy. It drops the
// busy timestamp (consulted by checkBusy) and stamps the start of the idle
// period (consulted by checkIdle) for that address.
func (wd *WatchDog) UnMark(address string) {
	wd.Lock()
	defer wd.Unlock()
	delete(wd.timetable, address)
	wd.idleTime[address] = time.Now()
}

// Run is the watchdog loop and blocks until it stops, so it is meant to be
// started in its own goroutine.
//
// Every 30 seconds it runs the enabled checks (checkBusy and/or checkIdle). It
// returns when a value is received from wd.stop (see Shutdown), or immediately
// if neither check is enabled.
func (wd *WatchDog) Run() {
	const checkInterval = 30 * time.Second

	log.Info().Msg("[WatchDog] starting watchdog")

	// Nothing to do: bail out now instead of idling for a full interval first.
	if !wd.busyCheck && !wd.idleCheck {
		log.Info().Msg("[WatchDog] No checks enabled, stopping watchdog")
		return
	}

	// A single ticker replaces a fresh time.After timer on every iteration.
	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-wd.stop:
			log.Info().Msg("[WatchDog] Stopping watchdog")
			return
		case <-ticker.C:
			if wd.busyCheck {
				wd.checkBusy()
			}
			if wd.idleCheck {
				wd.checkIdle()
			}
		}
	}
}

// checkIdle shuts down every model whose backend has been idle (UnMark'ed and
// not Mark'ed since) for longer than wd.idletimeout.
//
// For each expired address it looks up the model name recorded by
// AddAddressModelMap and asks wd.pm to shut it down. The address is then
// forgotten (idleTime, addressModelMap and addressMap entries deleted) whether
// or not the shutdown succeeded, so the watchdog does not retry it. Addresses
// with no known model are only dropped from the idle table.
//
// NOTE(review): pm.ShutdownModel runs while wd's mutex is held, so Mark/UnMark
// callers wait for it. An implementation that calls back into this WatchDog
// would deadlock; confirm against the ProcessManager implementation.
func (wd *WatchDog) checkIdle() {
	wd.Lock()
	defer wd.Unlock()
	log.Debug().Msg("[WatchDog] Watchdog checks for idle connections")

	// Deleting the current key while ranging over a map is permitted in Go.
	for address, idleSince := range wd.idleTime {
		log.Debug().Msgf("[WatchDog] %s: idle connection", address)

		if time.Since(idleSince) <= wd.idletimeout {
			continue
		}

		log.Warn().Msgf("[WatchDog] Address %s is idle for too long, killing it", address)
		model, ok := wd.addressModelMap[address]
		if !ok {
			log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
			delete(wd.idleTime, address)
			continue
		}

		if err := wd.pm.ShutdownModel(model); err != nil {
			log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
		} else {
			// Only report success when ShutdownModel actually succeeded.
			log.Debug().Msgf("[WatchDog] model shut down: %s", address)
		}
		delete(wd.idleTime, address)
		delete(wd.addressModelMap, address)
		delete(wd.addressMap, address)
	}
}

// checkBusy shuts down every model whose backend has been busy (Mark'ed
// without a matching UnMark) for longer than wd.timeout.
//
// For each expired address it looks up the model name recorded by
// AddAddressModelMap and asks wd.pm to shut it down. The address is then
// forgotten (timetable, addressModelMap and addressMap entries deleted) whether
// or not the shutdown succeeded, so the watchdog does not retry it. Addresses
// with no known model are only dropped from the busy table.
//
// NOTE(review): pm.ShutdownModel runs while wd's mutex is held, so Mark/UnMark
// callers wait for it. An implementation that calls back into this WatchDog
// would deadlock; confirm against the ProcessManager implementation.
func (wd *WatchDog) checkBusy() {
	wd.Lock()
	defer wd.Unlock()
	log.Debug().Msg("[WatchDog] Watchdog checks for busy connections")

	// Deleting the current key while ranging over a map is permitted in Go.
	for address, busySince := range wd.timetable {
		log.Debug().Msgf("[WatchDog] %s: active connection", address)

		if time.Since(busySince) <= wd.timeout {
			continue
		}

		model, ok := wd.addressModelMap[address]
		if !ok {
			log.Warn().Msgf("[WatchDog] Address %s unresolvable", address)
			delete(wd.timetable, address)
			continue
		}

		log.Warn().Msgf("[WatchDog] Model %s is busy for too long, killing it", model)
		if err := wd.pm.ShutdownModel(model); err != nil {
			log.Error().Err(err).Str("model", model).Msg("[watchdog] error shutting down model")
		} else {
			// Only report success when ShutdownModel actually succeeded.
			log.Debug().Msgf("[WatchDog] model shut down: %s", address)
		}
		delete(wd.timetable, address)
		delete(wd.addressModelMap, address)
		delete(wd.addressMap, address)
	}
}