Skip to content

Commit

Permalink
Merge pull request #2 from klauspost/log-timeout-caller
Browse files Browse the repository at this point in the history
Log long running notifier.
  • Loading branch information
klauspost authored Aug 22, 2016
2 parents 0c3f52e + 49a1ef8 commit d40c155
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 42 deletions.
124 changes: 83 additions & 41 deletions shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,14 @@ var (
// When you have performed your shutdown actions close the channel you are given.
type Notifier chan chan struct{}

// Internal notifier
type iNotifier struct {
n Notifier
calledFrom string
}
type fnNotify struct {
client Notifier
internal Notifier
internal iNotifier
cancel chan struct{}
}

Expand All @@ -66,7 +71,7 @@ type logWrapper struct {
}

func (l logWrapper) Printf(format string, v ...interface{}) {
l.w(format, v)
l.w(format, v...)
}

// SetLogPrinter will use the specified function to write logging information.
Expand All @@ -77,7 +82,7 @@ func SetLogPrinter(fn func(format string, v ...interface{})) {

//TODO(klauspost): These should be added to a struct for easier testing.
var sqM sync.Mutex // Mutex for below
var shutdownQueue [4][]Notifier
var shutdownQueue [4][]iNotifier
var shutdownFnQueue [4][]fnNotify
var shutdownFinished = make(chan struct{}, 0) // Closed when shutdown has finished

Expand Down Expand Up @@ -107,16 +112,19 @@ func SetTimeoutN(s Stage, d time.Duration) {
// Cancel a Notifier.
// This will remove a notifier from the shutdown queue,
// and it will not be signalled when shutdown starts.
// If the shutdown has already started this will not have any effect.
// If the shutdown has already started this will not have any effect,
// but a goroutine will wait for the notifier to be triggered.
func (s *Notifier) Cancel() {
srM.RLock()
if shutdownRequested {
srM.RUnlock()
// Wait until we get the notification and close it:
select {
case v := <-*s:
close(v)
}
go func() {
select {
case v := <-*s:
close(v)
}
}()
return
}
srM.RUnlock()
Expand All @@ -126,7 +134,7 @@ func (s *Notifier) Cancel() {
a = *s
for n, sdq := range shutdownQueue {
for i, qi := range sdq {
b = qi
b = qi.n
if a == b {
shutdownQueue[n] = append(shutdownQueue[n][:i], shutdownQueue[n][i+1:]...)
}
Expand All @@ -136,8 +144,8 @@ func (s *Notifier) Cancel() {
if a == b {
// Find the matching internal and remove that.
for i := range shutdownQueue[n] {
b = shutdownQueue[n][i]
if fn.internal == b {
b = shutdownQueue[n][i].n
if fn.internal.n == b {
shutdownQueue[n] = append(shutdownQueue[n][:i], shutdownQueue[n][i+1:]...)
}
}
Expand All @@ -161,7 +169,7 @@ func (s *Notifier) CancelWait() {
a = *s
for n, sdq := range shutdownQueue {
for i, qi := range sdq {
b = qi
b = qi.n
if a == b {
shutdownQueue[n] = append(shutdownQueue[n][:i], shutdownQueue[n][i+1:]...)
}
Expand All @@ -171,11 +179,11 @@ func (s *Notifier) CancelWait() {
if a == b {
// Find the matching internal and remove that.
for i := range shutdownQueue[n] {
if len(shutdownQueue[n]) < i {
if len(shutdownQueue[n]) <= i {
continue
}
b = shutdownQueue[n][i]
if fn.internal == b {
b = shutdownQueue[n][i].n
if fn.internal.n == b {
shutdownQueue[n] = append(shutdownQueue[n][:i], shutdownQueue[n][i+1:]...)
}
}
Expand Down Expand Up @@ -205,63 +213,64 @@ func (s *Notifier) CancelWait() {
// is signalled, before locks are released.
// This allows to for instance send signals to upstream servers not to send more requests.
func PreShutdown() Notifier {
return onShutdown(0)
return onShutdown(0, 1).n
}

// PreShutdownFn registers a function that will be called as soon as the shutdown
// is signalled, before locks are released.
// This allows to for instance send signals to upstream servers not to send more requests.
func PreShutdownFn(fn func()) Notifier {
return onFunc(0, fn)
return onFunc(0, 1, fn)
}

// First returns a notifier that will be called in the first stage of shutdowns
func First() Notifier {
return onShutdown(1)
return onShutdown(1, 1).n
}

// FirstFn executes a function in the first stage of the shutdown
func FirstFn(fn func()) Notifier {
return onFunc(1, fn)
return onFunc(1, 1, fn)
}

// Second returns a notifier that will be called in the second stage of shutdowns
func Second() Notifier {
return onShutdown(2)
return onShutdown(2, 1).n
}

// SecondFn executes a function in the second stage of the shutdown
func SecondFn(fn func()) Notifier {
return onFunc(2, fn)
return onFunc(2, 1, fn)
}

// Third returns a notifier that will be called in the third stage of shutdowns
func Third() Notifier {
return onShutdown(3)
return onShutdown(3, 1).n
}

// ThirdFn executes a function in the third stage of the shutdown
// The returned Notifier is only really useful for cancelling the shutdown function
func ThirdFn(fn func()) Notifier {
return onFunc(3, fn)
return onFunc(3, 1, fn)
}

// Create a function notifier.
func onFunc(prio int, fn func()) Notifier {
// depth is the call depth of the caller.
func onFunc(prio, depth int, fn func()) Notifier {
f := fnNotify{
internal: onShutdown(prio),
internal: onShutdown(prio, depth+1),
cancel: make(chan struct{}),
client: make(Notifier, 1),
}
go func() {
select {
case <-f.cancel:
return
case c := <-f.internal:
case c := <-f.internal.n:
{
defer func() {
if r := recover(); r != nil {
Logger.Printf("Error: Panic in shutdown function: %v", r)
Logger.Printf("Error: Panic in shutdown function: %v (%v)", r, f.internal.calledFrom)
Logger.Printf("%s", string(debug.Stack()))
}
if c != nil {
Expand All @@ -279,12 +288,18 @@ func onFunc(prio int, fn func()) Notifier {
}

// onShutdown will request a shutdown notifier.
func onShutdown(prio int) Notifier {
// depth is the call depth of the caller.
func onShutdown(prio, depth int) iNotifier {
sqM.Lock()
n := make(Notifier, 1)
shutdownQueue[prio] = append(shutdownQueue[prio], n)
in := iNotifier{n: n}
if LogLockTimeouts {
_, file, line, _ := runtime.Caller(depth + 1)
in.calledFrom = fmt.Sprintf("%s:%d", file, line)
}
shutdownQueue[prio] = append(shutdownQueue[prio], in)
sqM.Unlock()
return n
return in
}

// OnSignal will start the shutdown when any of the given signals arrive
Expand All @@ -297,7 +312,7 @@ func OnSignal(exitCode int, sig ...os.Signal) {
c := make(chan os.Signal, 1)
signal.Notify(c, sig...)
go func() {
for _ = range c {
for range c {
Shutdown()
os.Exit(exitCode)
}
Expand Down Expand Up @@ -345,11 +360,17 @@ func Shutdown() {
Logger.Printf("Shutdown stage %v", stage)
}
wait := make([]chan struct{}, len(queue))

var calledFrom []string
if LogLockTimeouts {
calledFrom = make([]string, len(queue))
}
// Send notification to all waiting
for i := range queue {
for i, n := range queue {
wait[i] = make(chan struct{})
queue[i] <- wait[i]
if LogLockTimeouts {
calledFrom[i] = n.calledFrom
}
queue[i].n <- wait[i]
}

// Send notification to all function notifiers, but don't wait
Expand All @@ -366,17 +387,32 @@ func Shutdown() {

brwait:
for i := range wait {
select {
case <-wait[i]:
case <-timeout:
Logger.Printf("timeout waiting to shutdown, forcing shutdown stage %v", stage)
break brwait
var tick <-chan time.Time
if LogLockTimeouts {
tick = time.Tick(StatusTimer)
}
wloop:
for {
select {
case <-wait[i]:
break wloop
case <-timeout:
if len(calledFrom) > 0 {
Logger.Printf("Timed out notifier created at %s", calledFrom[i])
}
Logger.Printf("Timeout waiting to shutdown, forcing shutdown stage %v.", stage)
break brwait
case <-tick:
if len(calledFrom) > 0 {
Logger.Printf("Stage %d, waiting for notifier (%s)", stage, calledFrom[i])
}
}
}
}
sqM.Lock()
}
// Reset - mainly for tests.
shutdownQueue = [4][]Notifier{}
shutdownQueue = [4][]iNotifier{}
shutdownFnQueue = [4][]fnNotify{}
close(shutdownFinished)
sqM.Unlock()
Expand All @@ -401,9 +437,15 @@ func Wait() {
<-shutdownFinished
}

// LogLockTimeouts can be disabled to disable log timeout warnings.
// LogLockTimeouts enables log timeout warnings
// and notifier status updates.
// Should not be changed once shutdown has started.
var LogLockTimeouts = true

// StatusTimer is the time between logging which notifiers are waiting to finish.
// Should not be changed once shutdown has started.
var StatusTimer = time.Minute

// Lock will signal that you have a function running,
// that you do not want to be interrupted by a shutdown.
//
Expand Down
Loading

0 comments on commit d40c155

Please sign in to comment.