TT-2479 hostchecker sample backport (TykTechnologies#3672)
* backporting hostchecker sample changes to master
tbuchaillot authored Oct 6, 2021
1 parent bd80c30 commit b839dae
Showing 4 changed files with 271 additions and 38 deletions.
4 changes: 2 additions & 2 deletions config/config.go
@@ -85,10 +85,10 @@ type PoliciesConfig struct {
// If you set this value to `true`, then the id parameter in a stored policy (or a policy imported using the Dashboard API) will be used instead of the internal ID.
//
// This option should only be used when moving an installation to a new database.
AllowExplicitPolicyID bool `json:"allow_explicit_policy_id"`
AllowExplicitPolicyID bool `json:"allow_explicit_policy_id"`
// This option is used to load policies from a file when `policies.policy_source` is set to `file`.
// It should point to an existing file path on the hard drive.
PolicyPath string `json:"policy_path"`
PolicyPath string `json:"policy_path"`
}

type DBAppConfOptionsConfig struct {
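For context, here is a minimal sketch of how the AllowExplicitPolicyID and PolicyPath fields above deserialize from a gateway config fragment. The struct is copied locally so the example is self-contained; the JSON values and the path are illustrative assumptions, not taken from this commit.

package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the PoliciesConfig fields shown in the diff above; the real
// type lives in config/config.go.
type PoliciesConfig struct {
	AllowExplicitPolicyID bool   `json:"allow_explicit_policy_id"`
	PolicyPath            string `json:"policy_path"`
}

func main() {
	// Hypothetical config fragment; the path is an example only.
	raw := []byte(`{
		"allow_explicit_policy_id": true,
		"policy_path": "/opt/tyk-gateway/policies"
	}`)

	var p PoliciesConfig
	if err := json.Unmarshal(raw, &p); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
	// Output: {AllowExplicitPolicyID:true PolicyPath:/opt/tyk-gateway/policies}
}

The sketch only exercises the JSON tags; as the comment above notes, `policy_path` is expected to point at an existing path on disk when `policies.policy_source` is `file`.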
87 changes: 54 additions & 33 deletions gateway/host_checker.go
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/Jeffail/tunny"
proxyproto "github.com/pires/go-proxyproto"
cache "github.com/pmylund/go-cache"

"github.com/TykTechnologies/tyk/apidef"
)
@@ -45,6 +44,11 @@ type HostHealthReport struct {
IsTCPError bool
}

type HostSample struct {
count int
reachedLimit bool
}

type HostUptimeChecker struct {
cb HostCheckCallBacks
workerPoolSize int
@@ -54,11 +58,11 @@ type HostUptimeChecker struct {
unHealthyList map[string]bool
pool *tunny.WorkPool

errorChan chan HostHealthReport
okChan chan HostHealthReport
sampleCache *cache.Cache
stopLoop bool
muStopLoop sync.RWMutex
errorChan chan HostHealthReport
okChan chan HostHealthReport
samples *sync.Map
stopLoop bool
muStopLoop sync.RWMutex

resetListMu sync.Mutex
doResetList bool
@@ -146,48 +150,62 @@ func (h *HostUptimeChecker) HostReporter(ctx context.Context) {
}
return
case okHost := <-h.okChan:
// Clear host from unhealthylist if it exists
if h.unHealthyList[okHost.CheckURL] {
newVal := 1
if count, found := h.sampleCache.Get(okHost.CheckURL); found {
newVal = count.(int) - 1
}

if newVal <= 0 {
// Reset the count
h.sampleCache.Delete(okHost.CheckURL)
log.Warning("[HOST CHECKER] [HOST UP]: ", okHost.CheckURL)
if h.cb.Up != nil {
// check if the host URL is in the sample map
if hostSample, found := h.samples.Load(okHost.CheckURL); found {
sample := hostSample.(HostSample)
// if it has reached the h.sampleTriggerLimit, we start decreasing the count value
if sample.reachedLimit {
newCount := sample.count - 1

if newCount <= 0 {
// if the decremented count reaches zero, the host is fully up.

h.samples.Delete(okHost.CheckURL)
log.Warning("[HOST CHECKER] [HOST UP]: ", okHost.CheckURL)
go h.cb.Up(ctx, okHost)
} else {
// otherwise we are one step closer to recovery; just update the count
sample.count = newCount
log.Warning("[HOST CHECKER] [HOST UP BUT NOT REACHED LIMIT]: ", okHost.CheckURL)
h.samples.Store(okHost.CheckURL, sample)
}
delete(h.unHealthyList, okHost.CheckURL)
} else {
log.Warning("[HOST CHECKER] [HOST UP BUT NOT REACHED LIMIT]: ", okHost.CheckURL)
h.sampleCache.Set(okHost.CheckURL, newVal, cache.DefaultExpiration)
}
}
if h.cb.Ping != nil {
go h.cb.Ping(ctx, okHost)
}

case failedHost := <-h.errorChan:
newVal := 1
if count, found := h.sampleCache.Get(failedHost.CheckURL); found {
newVal = count.(int) + 1
sample := HostSample{
count: 1,
}

// If a host fails, we check if it has failed already
if hostSample, found := h.samples.Load(failedHost.CheckURL); found {
sample = hostSample.(HostSample)
// we add THIS failure to the count
sample.count = sample.count + 1
}

if newVal >= h.sampleTriggerLimit {
if sample.count >= h.sampleTriggerLimit {
// if it has reached the h.sampleTriggerLimit, the host is considered down. We update the reachedLimit flag and store it in the sample map
log.Warning("[HOST CHECKER] [HOST DOWN]: ", failedHost.CheckURL)
// track it
h.unHealthyList[failedHost.CheckURL] = true
// Call the custom callback hook
if h.cb.Fail != nil {
go h.cb.Fail(ctx, failedHost)

// if this is the first time it has reached the h.sampleTriggerLimit, store the reachedLimit flag together with the new count
if !sample.reachedLimit {
sample.reachedLimit = true
h.samples.Store(failedHost.CheckURL, sample)
}

// call the failure callback to keep the Redis key and the host checker manager updated
go h.cb.Fail(ctx, failedHost)

} else {
// if it failed but has not reached the h.sampleTriggerLimit yet, we just store the updated counter in the map.
log.Warning("[HOST CHECKER] [HOST DOWN BUT NOT REACHED LIMIT]: ", failedHost.CheckURL)
h.sampleCache.Set(failedHost.CheckURL, newVal, cache.DefaultExpiration)
h.samples.Store(failedHost.CheckURL, sample)
}

if h.cb.Ping != nil {
go h.cb.Ping(ctx, failedHost)
}
@@ -324,7 +342,7 @@ type HostCheckCallBacks struct {
}

func (h *HostUptimeChecker) Init(workers, triggerLimit, timeout int, hostList map[string]HostData, cb HostCheckCallBacks) {
h.sampleCache = cache.New(30*time.Second, 30*time.Second)
h.samples = new(sync.Map)
h.errorChan = make(chan HostHealthReport)
h.okChan = make(chan HostHealthReport)
h.HostList = hostList
@@ -377,6 +395,9 @@ func (h *HostUptimeChecker) Start(ctx context.Context) {
func (h *HostUptimeChecker) Stop() {
if !h.getStopLoop() {
h.setStopLoop(true)
h.muStopLoop.Lock()
h.samples = new(sync.Map)
h.muStopLoop.Unlock()
log.Info("[HOST CHECKER] Stopping poller")
h.pool.Close()
}
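To make the new sampling flow easier to follow, here is a stripped-down sketch of just the counting rules from HostReporter above: each failed check increments a per-host HostSample stored in a sync.Map until sampleTriggerLimit is reached, and once the limit has been hit, each successful check decrements the count until it drops to zero and the entry is deleted. The sampleTracker type, its Fail/OK methods, and the example address are invented for this illustration; only HostSample and the counting behaviour come from the diff.

package main

import (
	"fmt"
	"sync"
)

// HostSample mirrors the struct added in the diff above.
type HostSample struct {
	count        int
	reachedLimit bool
}

// sampleTracker is a hypothetical helper used only for this sketch; in the
// gateway the equivalent logic lives inline in HostUptimeChecker.HostReporter.
type sampleTracker struct {
	samples *sync.Map
	limit   int
}

// Fail records a failed check and reports true once the host has failed
// `limit` times and should be treated as down.
func (t *sampleTracker) Fail(url string) bool {
	sample := HostSample{count: 1}
	if v, ok := t.samples.Load(url); ok {
		sample = v.(HostSample)
		sample.count++
	}
	if sample.count >= t.limit {
		// Once the limit is hit the stored count is frozen at the limit;
		// further failures keep reporting "down" without growing the count.
		if !sample.reachedLimit {
			sample.reachedLimit = true
			t.samples.Store(url, sample)
		}
		return true
	}
	t.samples.Store(url, sample)
	return false
}

// OK records a successful check and reports true once a previously-down host
// has recovered, i.e. its failure count has been decremented back to zero.
func (t *sampleTracker) OK(url string) bool {
	v, ok := t.samples.Load(url)
	if !ok {
		return false // host was never tracked as failing
	}
	sample := v.(HostSample)
	if !sample.reachedLimit {
		return false // below the trigger limit; nothing to recover from yet
	}
	sample.count--
	if sample.count <= 0 {
		t.samples.Delete(url) // fully recovered
		return true
	}
	t.samples.Store(url, sample)
	return false
}

func main() {
	t := &sampleTracker{samples: new(sync.Map), limit: 2}
	fmt.Println(t.Fail("10.0.0.1:80")) // false: 1 failure, limit not reached
	fmt.Println(t.Fail("10.0.0.1:80")) // true:  limit reached, host reported down
	fmt.Println(t.OK("10.0.0.1:80"))   // false: count decremented to 1
	fmt.Println(t.OK("10.0.0.1:80"))   // true:  count reached 0, host reported up
}

Because the stored count never grows past the limit, a down host recovers after at most sampleTriggerLimit consecutive successful checks, no matter how many failures were observed while it was down.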
216 changes: 214 additions & 2 deletions gateway/host_checker_test.go
@@ -67,7 +67,6 @@ func (w *testEventHandler) HandleEvent(em config.EventMessage) {
}

// TODO: check why it blocks

func TestHostChecker(t *testing.T) {
ts := StartTest(func(globalConf *config.Config) {
globalConf.UptimeTests.PollerGroup = uuid.NewV4().String()
@@ -523,6 +522,8 @@ func TestProxyWhenHostIsDown(t *testing.T) {
func TestChecker_triggerSampleLimit(t *testing.T) {
ts := StartTest(nil)
defer ts.Close()
ts.Gw.setTestMode(false)
defer ts.Gw.setTestMode(true)

l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
@@ -534,7 +535,7 @@ func TestChecker_triggerSampleLimit(t *testing.T) {
var wg sync.WaitGroup
wg.Add(5)

ts.Gw.setTestMode(false)
//ts.Gw.setTestMode(false)

var (
limit = 4
@@ -568,3 +569,214 @@ func TestChecker_triggerSampleLimit(t *testing.T) {
assert.Equal(t, limit, ping.Load().(int), "ping count is wrong")
assert.Equal(t, 1, failed.Load().(int), "expected host down to be fired once")
}

func TestChecker_HostReporter_up_then_down(t *testing.T) {
ts := StartTest(nil)
defer ts.Close()
ts.Gw.setTestMode(false)
defer ts.Gw.setTestMode(true)

l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
data := HostData{
CheckURL: l.Addr().String(),
Protocol: "tcp",
EnableProxyProtocol: true,
Commands: []apidef.CheckCommand{
{
Name: "send", Message: "ping",
}, {
Name: "expect", Message: "pong",
},
},
}
defer l.Close()

changeResponse := make(chan bool)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func(ls net.Listener, change chan bool) {
ls = &proxyproto.Listener{Listener: ls}
accept := false
for {
select {
case <-change:
accept = true
default:
s, err := ls.Accept()
if err != nil {
return
}
buf := make([]byte, 4)
_, err = s.Read(buf)
if err != nil {
return
}
if !accept {
s.Write([]byte("pong"))
} else {
s.Write([]byte("unknown"))
}
}

}
}(l, changeResponse)

var (
limit = 2
ping atomic.Value
failed atomic.Value
)
failed.Store(0)
ping.Store(0)

hs := &HostUptimeChecker{Gw: ts.Gw}
hs.Init(1, limit, 1, map[string]HostData{
l.Addr().String(): data,
},
HostCheckCallBacks{
Fail: func(_ context.Context, _ HostHealthReport) {
failed.Store(failed.Load().(int) + 1)
},
Up: func(_ context.Context, _ HostHealthReport) {
},
Ping: func(_ context.Context, _ HostHealthReport) {
ping.Store(ping.Load().(int) + 1)
},
},
)

go hs.Start(ctx)
defer hs.Stop()

for {
val := ping.Load()
if val != nil && val == 1 {
break
}
}

changeResponse <- true
for {
val := failed.Load()
if val != nil && val.(int) == 1 {
break
}
}

val, found := hs.samples.Load(data.CheckURL)
assert.Equal(t, true, found, "the host url should be in samples")
assert.Equal(t, 1, failed.Load().(int), "expected host down to be fired once")

samples := val.(HostSample)
assert.Equal(t, true, samples.reachedLimit, "the host failures should have reached the error limit")
assert.Equal(t, 2, samples.count, "samples count should be 2")
}

func TestChecker_HostReporter_down_then_up(t *testing.T) {
ts := StartTest(nil)
defer ts.Close()
ts.Gw.setTestMode(false)
defer ts.Gw.setTestMode(true)

l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
data := HostData{
CheckURL: l.Addr().String(),
Protocol: "tcp",
EnableProxyProtocol: true,
Commands: []apidef.CheckCommand{
{
Name: "send", Message: "ping",
}, {
Name: "expect", Message: "pong",
},
},
}
defer l.Close()

changeResponse := make(chan bool)

go func(ls net.Listener, change chan bool) {
ls = &proxyproto.Listener{Listener: ls}
accept := false
for {
select {
case <-change:
accept = true
default:
s, err := ls.Accept()
if err != nil {
return
}
buf := make([]byte, 4)
_, err = s.Read(buf)
if err != nil {
return
}
if accept {
s.Write([]byte("pong"))
} else {
s.Write([]byte("unknown"))
}
}

}
}(l, changeResponse)

var (
limit = 2
up atomic.Value
failed atomic.Value
)
failed.Store(0)
up.Store(0)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hs := &HostUptimeChecker{Gw: ts.Gw}
hs.Init(1, limit, 1, map[string]HostData{
l.Addr().String(): data,
}, HostCheckCallBacks{
Fail: func(_ context.Context, _ HostHealthReport) {
failed.Store(failed.Load().(int) + 1)
},
Up: func(_ context.Context, _ HostHealthReport) {
up.Store(up.Load().(int) + 1)
},
Ping: func(_ context.Context, _ HostHealthReport) {
},
},
)

go hs.Start(ctx)
defer hs.Stop()

for {
val := failed.Load()
if val != nil && val.(int) == 1 {
break
}
}

changeResponse <- true

for {
val := up.Load()
if val != nil && val == 1 {
break
}
}

_, found := hs.samples.Load(data.CheckURL)
assert.Equal(t, false, found, "the host url should not be in samples")
assert.Equal(t, 2, failed.Load().(int), "expected host down to be fired twice")
assert.Equal(t, 1, up.Load().(int), "expected host up to be fired once")

}