forked from jpillora/overseer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproc_slave.go
164 lines (153 loc) · 4 KB
/
proc_slave.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package overseer
import (
"fmt"
"log"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"
)
// DisabledState is the placeholder State used when overseer is
// disabled and the program function is run manually. Only Enabled
// is set (to false); every other field is left at its zero value.
var DisabledState = State{Enabled: false}
// State describes the running program. The slave process fills one in
// and hands it to the configured program function.
type State struct {
	// Enabled reports whether overseer is active. When it is, this
	// program runs in a child process and overseer performs rolling
	// upgrades.
	Enabled bool
	// ID is a SHA-1 hash of the binary that is currently running.
	ID string
	// StartedAt is the time at which the program was started.
	StartedAt time.Time
	// Listener is a shortcut for the first entry of Listeners.
	Listener net.Listener
	// Listeners holds the sockets acquired by the master process.
	// They are passed to this program in the same order as they
	// appear in Config.Addresses.
	Listeners []net.Listener
	// Address is the first listening address of the program.
	Address string
	// Addresses is the full list of listening addresses.
	Addresses []string
	// GracefulShutdown is signalled once it is time to perform a
	// graceful shutdown.
	GracefulShutdown chan bool
}
// slave is an overseer slave process.
type slave struct {
	// Config is embedded, so its fields are reachable directly on
	// the slave.
	*Config
	// id is read from the envSlaveID environment variable by run and
	// appears in the prefix of every log line.
	id string
	// listeners are the inherited sockets, wrapped by newUpListener.
	listeners []*upListener
	// masterPid is the parent process id recorded by watchParent.
	masterPid int
	// masterProc is the process handle looked up from masterPid.
	masterProc *os.Process
	// state is the State handed to Config.Program.
	state State
}
// run prepares the slave and then executes the configured program.
//
// It reads the slave and binary ids from the environment, fills in
// sp.state, starts the parent watcher, wraps the inherited file
// descriptors, installs the restart-signal handler and finally calls
// Config.Program with sp.state. An error from the parent watcher or
// from the descriptor setup is returned before the program is
// started; otherwise nil is returned once the program returns.
func (sp *slave) run() error {
	sp.id = os.Getenv(envSlaveID)
	sp.debugf("run")

	// describe this process for the program
	st := &sp.state
	st.Enabled = true
	st.ID = os.Getenv(envBinID)
	st.StartedAt = time.Now()
	st.Address = sp.Config.Address
	st.Addresses = sp.Config.Addresses
	st.GracefulShutdown = make(chan bool, 1)

	// fallible setup steps, in order; the first failure aborts the run
	setup := []func() error{
		sp.watchParent,
		sp.initFileDescriptors,
	}
	for _, step := range setup {
		if err := step(); err != nil {
			return err
		}
	}
	sp.watchSignal()

	// hand control to the program
	sp.debugf("start program")
	sp.Config.Program(sp.state)
	return nil
}
// watchParent records the parent (master) process and starts a
// goroutine that terminates this process once the master can no
// longer be signalled.
//
// It returns an error only when the master process cannot be looked
// up.
func (sp *slave) watchParent() error {
	ppid := os.Getppid()
	sp.masterPid = ppid
	master, err := os.FindProcess(ppid)
	if err != nil {
		return fmt.Errorf("master process: %s", err)
	}
	sp.masterProc = master

	go func() {
		// signal 0 is sent purely as a probe; it should not error
		// as long as the master is alive
		probe := syscall.Signal(0)
		for sp.masterProc.Signal(probe) == nil {
			time.Sleep(2 * time.Second)
		}
		// the probe failed: exit with status 1
		os.Exit(1)
	}()
	return nil
}
// initFileDescriptors turns the file descriptors inherited by this
// process into listeners.
//
// The number of descriptors is read from the envNumFDs environment
// variable, and descriptor i is taken from fd 3+i. Each one is
// converted with net.FileListener and wrapped by newUpListener; the
// wrapper is stored in both sp.listeners and sp.state.Listeners.
// sp.state.Listener is set to the first listener, when there is one.
//
// An error is returned when the count is not a non-negative integer
// or when a descriptor cannot be converted into a listener.
func (sp *slave) initFileDescriptors() error {
	//inspect file descriptors
	raw := os.Getenv(envNumFDs)
	numFDs, err := strconv.Atoi(raw)
	//a negative count would make the make() calls below panic
	if err != nil || numFDs < 0 {
		return fmt.Errorf("invalid %s integer: %q", envNumFDs, raw)
	}
	sp.listeners = make([]*upListener, numFDs)
	sp.state.Listeners = make([]net.Listener, numFDs)
	for i := 0; i < numFDs; i++ {
		f := os.NewFile(uintptr(3+i), "")
		l, err := net.FileListener(f)
		//FileListener returns a copy of the listener and closing f
		//does not affect it, so release the original descriptor now
		//instead of leaving it to the garbage collector. The close
		//error is deliberately ignored: the copy is what gets used.
		_ = f.Close()
		if err != nil {
			return fmt.Errorf("failed to inherit file descriptor: %d: %v", i, err)
		}
		u := newUpListener(l)
		sp.listeners[i] = u
		sp.state.Listeners[i] = u
	}
	if len(sp.state.Listeners) > 0 {
		sp.state.Listener = sp.state.Listeners[0]
	}
	return nil
}
// watchSignal installs a one-shot handler for Config.RestartSignal.
//
// When the signal arrives the handler:
//   - stops further notifications,
//   - closes sp.state.GracefulShutdown so the program can observe the
//     shutdown request,
//   - releases every listener with Config.TerminateTimeout and, unless
//     restarts are disabled, sends SIGUSR1 to the master,
//   - starts a timer that exits the process with status 1 after
//     Config.TerminateTimeout.
//
// The function itself returns immediately; all of the above happens
// on a background goroutine.
func (sp *slave) watchSignal() {
	//the channel must be buffered: signal.Notify never blocks while
	//delivering, so with an unbuffered channel a signal that arrives
	//before the goroutine below is receiving would be dropped
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, sp.Config.RestartSignal)
	go func() {
		<-signals
		signal.Stop(signals)
		sp.debugf("graceful shutdown requested")
		//master wants to restart,
		close(sp.state.GracefulShutdown)
		//release any sockets and notify master
		if len(sp.listeners) > 0 {
			//perform graceful shutdown
			for _, l := range sp.listeners {
				l.release(sp.Config.TerminateTimeout)
			}
			//signal release of held sockets, allows master to start
			//a new process before this child has actually exited.
			//early restarts not supported with restarts disabled.
			if !sp.NoRestart {
				if err := sp.masterProc.Signal(SIGUSR1); err != nil {
					//nothing more can be done from here, but do
					//not hide the failure
					sp.warnf("failed to signal master: %s", err)
				}
			}
			//listeners should be waiting on connections to close...
		}
		//start death-timer
		go func() {
			time.Sleep(sp.Config.TerminateTimeout)
			sp.debugf("timeout. forceful shutdown")
			os.Exit(1)
		}()
	}()
}
// debugf logs a formatted message, prefixed with the slave id, but
// only when Config.Debug is set.
func (sp *slave) debugf(f string, args ...interface{}) {
	if !sp.Config.Debug {
		return
	}
	prefix := "[overseer slave#" + sp.id + "] "
	log.Printf(prefix+f, args...)
}
// warnf logs a formatted message, prefixed with the slave id. The
// message is suppressed only when Config.NoWarn is set and
// Config.Debug is not.
func (sp *slave) warnf(f string, args ...interface{}) {
	if !sp.Config.Debug && sp.Config.NoWarn {
		return
	}
	prefix := "[overseer slave#" + sp.id + "] "
	log.Printf(prefix+f, args...)
}