diff --git a/config/config.go b/config/config.go
index 8bcc7e62b2b..9909f3ffc58 100644
--- a/config/config.go
+++ b/config/config.go
@@ -85,10 +85,10 @@ type PoliciesConfig struct {
 	// If you set this value to `true`, then the id parameter in a stored policy (or imported policy using the Dashboard API), will be used instead of the internal ID.
 	//
 	// This option should only be used when moving an installation to a new database.
-	AllowExplicitPolicyID bool `json:"allow_explicit_policy_id"`
+	AllowExplicitPolicyID bool `json:"allow_explicit_policy_id"`
 	// This option is used for storing a policies if `policies.policy_source` is set to `file`.
 	// it should be some existing file path on hard drive
-	PolicyPath string `json:"policy_path"`
+	PolicyPath string `json:"policy_path"`
 }
 
 type DBAppConfOptionsConfig struct {
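The `policy_path` option in the hunk above is only consulted when `policies.policy_source` is set to `file`. A minimal sketch of how the two options fit together, assuming the `Policies` field on `config.Config` exposes the fields named by the struct tags shown here (the path value is purely illustrative):

package main

import (
	"fmt"

	"github.com/TykTechnologies/tyk/config"
)

func main() {
	var conf config.Config

	// Load policies from a file on disk instead of a policy service.
	conf.Policies.PolicySource = "file" // `policies.policy_source`
	// Must be an existing path on disk, per the comment in PoliciesConfig.
	conf.Policies.PolicyPath = "/opt/tyk-gateway/policies" // `policies.policy_path` (illustrative path)

	fmt.Printf("policies will be loaded from %s\n", conf.Policies.PolicyPath)
}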
diff --git a/gateway/host_checker.go b/gateway/host_checker.go
index 996aa73f28b..099c4c6f767 100644
--- a/gateway/host_checker.go
+++ b/gateway/host_checker.go
@@ -14,7 +14,6 @@ import (
 
 	"github.com/Jeffail/tunny"
 	proxyproto "github.com/pires/go-proxyproto"
-	cache "github.com/pmylund/go-cache"
 
 	"github.com/TykTechnologies/tyk/apidef"
 )
@@ -45,6 +44,11 @@ type HostHealthReport struct {
 	IsTCPError bool
 }
 
+type HostSample struct {
+	count        int
+	reachedLimit bool
+}
+
 type HostUptimeChecker struct {
 	cb                 HostCheckCallBacks
 	workerPoolSize     int
@@ -54,11 +58,11 @@ type HostUptimeChecker struct {
 	unHealthyList map[string]bool
 	pool          *tunny.WorkPool
 
-	errorChan   chan HostHealthReport
-	okChan      chan HostHealthReport
-	sampleCache *cache.Cache
-	stopLoop    bool
-	muStopLoop  sync.RWMutex
+	errorChan  chan HostHealthReport
+	okChan     chan HostHealthReport
+	samples    *sync.Map
+	stopLoop   bool
+	muStopLoop sync.RWMutex
 
 	resetListMu sync.Mutex
 	doResetList bool
@@ -146,24 +150,25 @@ func (h *HostUptimeChecker) HostReporter(ctx context.Context) {
 			}
 			return
 		case okHost := <-h.okChan:
-			// Clear host from unhealthylist if it exists
-			if h.unHealthyList[okHost.CheckURL] {
-				newVal := 1
-				if count, found := h.sampleCache.Get(okHost.CheckURL); found {
-					newVal = count.(int) - 1
-				}
-
-				if newVal <= 0 {
-					// Reset the count
-					h.sampleCache.Delete(okHost.CheckURL)
-					log.Warning("[HOST CHECKER] [HOST UP]: ", okHost.CheckURL)
-					if h.cb.Up != nil {
+			// check if the host URL is in the samples map
+			if hostSample, found := h.samples.Load(okHost.CheckURL); found {
+				sample := hostSample.(HostSample)
+				// if it previously reached h.sampleTriggerLimit, start decreasing the count value
+				if sample.reachedLimit {
+					newCount := sample.count - 1
+
+					if newCount <= 0 {
+						// the count is back to zero, which means the host is fully up again
+
+						h.samples.Delete(okHost.CheckURL)
+						log.Warning("[HOST CHECKER] [HOST UP]: ", okHost.CheckURL)
 						go h.cb.Up(ctx, okHost)
+					} else {
+						// otherwise we are one step closer; just store the updated count
+						sample.count = newCount
+						log.Warning("[HOST CHECKER] [HOST UP BUT NOT REACHED LIMIT]: ", okHost.CheckURL)
+						h.samples.Store(okHost.CheckURL, sample)
 					}
-					delete(h.unHealthyList, okHost.CheckURL)
-				} else {
-					log.Warning("[HOST CHECKER] [HOST UP BUT NOT REACHED LIMIT]: ", okHost.CheckURL)
-					h.sampleCache.Set(okHost.CheckURL, newVal, cache.DefaultExpiration)
 				}
 			}
 			if h.cb.Ping != nil {
@@ -171,23 +176,36 @@ func (h *HostUptimeChecker) HostReporter(ctx context.Context) {
 			}
 
 		case failedHost := <-h.errorChan:
-			newVal := 1
-			if count, found := h.sampleCache.Get(failedHost.CheckURL); found {
-				newVal = count.(int) + 1
+			sample := HostSample{
+				count: 1,
+			}
+
+			// if a host fails, check whether it has already failed before
+			if hostSample, found := h.samples.Load(failedHost.CheckURL); found {
+				sample = hostSample.(HostSample)
+				// add this failure to the count
+				sample.count = sample.count + 1
 			}
 
-			if newVal >= h.sampleTriggerLimit {
+			if sample.count >= h.sampleTriggerLimit {
+				// once the count reaches h.sampleTriggerLimit the host is considered down: update the reachedLimit flag and store it in the samples map
 				log.Warning("[HOST CHECKER] [HOST DOWN]: ", failedHost.CheckURL)
-				// track it
-				h.unHealthyList[failedHost.CheckURL] = true
-				// Call the custom callback hook
-				if h.cb.Fail != nil {
-					go h.cb.Fail(ctx, failedHost)
+
+				// if this is the first time the limit has been reached, persist the reachedLimit flag together with the new count
+				if !sample.reachedLimit {
+					sample.reachedLimit = true
+					h.samples.Store(failedHost.CheckURL, sample)
 				}
+
+				// call the failure callback to keep the Redis key and the host checker manager updated
+				go h.cb.Fail(ctx, failedHost)
+			} else {
+				// if it failed but has not reached h.sampleTriggerLimit yet, just store the updated counter in the map
 				log.Warning("[HOST CHECKER] [HOST DOWN BUT NOT REACHED LIMIT]: ", failedHost.CheckURL)
-				h.sampleCache.Set(failedHost.CheckURL, newVal, cache.DefaultExpiration)
+				h.samples.Store(failedHost.CheckURL, sample)
 			}
+
 			if h.cb.Ping != nil {
 				go h.cb.Ping(ctx, failedHost)
 			}
@@ -324,7 +342,7 @@ type HostCheckCallBacks struct {
 }
 
 func (h *HostUptimeChecker) Init(workers, triggerLimit, timeout int, hostList map[string]HostData, cb HostCheckCallBacks) {
-	h.sampleCache = cache.New(30*time.Second, 30*time.Second)
+	h.samples = new(sync.Map)
 	h.errorChan = make(chan HostHealthReport)
 	h.okChan = make(chan HostHealthReport)
 	h.HostList = hostList
@@ -377,6 +395,9 @@ func (h *HostUptimeChecker) Start(ctx context.Context) {
 func (h *HostUptimeChecker) Stop() {
 	if !h.getStopLoop() {
 		h.setStopLoop(true)
+		h.muStopLoop.Lock()
+		h.samples = new(sync.Map)
+		h.muStopLoop.Unlock()
 		log.Info("[HOST CHECKER] Stopping poller")
 		h.pool.Close()
 	}
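Taken together, the okChan and errorChan branches above implement a per-host hysteresis counter: failures increment a sample until it reaches `sampleTriggerLimit` (host reported down), after which successes decrement it back to zero (host reported up and the entry removed). A self-contained sketch of that bookkeeping, where `HostSample` mirrors the diff and the `sampler`, `recordFailure` and `recordSuccess` names are illustrative only, not part of the patch:

package main

import (
	"fmt"
	"sync"
)

// HostSample mirrors the struct added to host_checker.go.
type HostSample struct {
	count        int
	reachedLimit bool
}

// sampler stands in for the checker's samples map and sampleTriggerLimit field.
type sampler struct {
	samples            sync.Map
	sampleTriggerLimit int
}

// recordFailure follows the errorChan branch: failures are counted per host and,
// once the count reaches the trigger limit, the reachedLimit flag is persisted.
// The return value says whether the "host down" callback would fire.
func (s *sampler) recordFailure(url string) bool {
	sample := HostSample{count: 1}
	if v, found := s.samples.Load(url); found {
		sample = v.(HostSample)
		sample.count = sample.count + 1
	}
	if sample.count >= s.sampleTriggerLimit {
		// the flag (and count) is only stored the first time the limit is reached
		if !sample.reachedLimit {
			sample.reachedLimit = true
			s.samples.Store(url, sample)
		}
		return true
	}
	// below the limit: just persist the incremented counter
	s.samples.Store(url, sample)
	return false
}

// recordSuccess follows the okChan branch: once a host has reached the limit,
// each success counts the sample back down, and the entry is deleted when it
// hits zero. The return value says whether the "host up" callback would fire.
func (s *sampler) recordSuccess(url string) bool {
	v, found := s.samples.Load(url)
	if !found {
		return false
	}
	sample := v.(HostSample)
	if !sample.reachedLimit {
		// failures below the limit are not decremented by successes
		return false
	}
	sample.count--
	if sample.count <= 0 {
		s.samples.Delete(url)
		return true
	}
	s.samples.Store(url, sample)
	return false
}

func main() {
	s := &sampler{sampleTriggerLimit: 2}
	fmt.Println(s.recordFailure("127.0.0.1:9999")) // false: one failure, below the limit
	fmt.Println(s.recordFailure("127.0.0.1:9999")) // true:  limit reached, host reported down
	fmt.Println(s.recordSuccess("127.0.0.1:9999")) // false: counting back down (2 -> 1)
	fmt.Println(s.recordSuccess("127.0.0.1:9999")) // true:  count hit zero, host reported up
}

One behavioural difference worth noting in review: the old go-cache store expired entries after 30 seconds, whereas entries in the sync.Map only disappear when the count returns to zero or when Stop() replaces the map.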
log.Warning("[HOST CHECKER] [HOST DOWN BUT NOT REACHED LIMIT]: ", failedHost.CheckURL) - h.sampleCache.Set(failedHost.CheckURL, newVal, cache.DefaultExpiration) + h.samples.Store(failedHost.CheckURL, sample) } + if h.cb.Ping != nil { go h.cb.Ping(ctx, failedHost) } @@ -324,7 +342,7 @@ type HostCheckCallBacks struct { } func (h *HostUptimeChecker) Init(workers, triggerLimit, timeout int, hostList map[string]HostData, cb HostCheckCallBacks) { - h.sampleCache = cache.New(30*time.Second, 30*time.Second) + h.samples = new(sync.Map) h.errorChan = make(chan HostHealthReport) h.okChan = make(chan HostHealthReport) h.HostList = hostList @@ -377,6 +395,9 @@ func (h *HostUptimeChecker) Start(ctx context.Context) { func (h *HostUptimeChecker) Stop() { if !h.getStopLoop() { h.setStopLoop(true) + h.muStopLoop.Lock() + h.samples = new(sync.Map) + h.muStopLoop.Unlock() log.Info("[HOST CHECKER] Stopping poller") h.pool.Close() } diff --git a/gateway/host_checker_test.go b/gateway/host_checker_test.go index 96ff272ed08..8d2fb18ed97 100644 --- a/gateway/host_checker_test.go +++ b/gateway/host_checker_test.go @@ -67,7 +67,6 @@ func (w *testEventHandler) HandleEvent(em config.EventMessage) { } //// ToDo check why it blocks - func TestHostChecker(t *testing.T) { ts := StartTest(func(globalConf *config.Config) { globalConf.UptimeTests.PollerGroup = uuid.NewV4().String() @@ -523,6 +522,8 @@ func TestProxyWhenHostIsDown(t *testing.T) { func TestChecker_triggerSampleLimit(t *testing.T) { ts := StartTest(nil) defer ts.Close() + ts.Gw.setTestMode(false) + defer ts.Gw.setTestMode(true) l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -534,7 +535,7 @@ func TestChecker_triggerSampleLimit(t *testing.T) { var wg sync.WaitGroup wg.Add(5) - ts.Gw.setTestMode(false) + //ts.Gw.setTestMode(false) var ( limit = 4 @@ -568,3 +569,214 @@ func TestChecker_triggerSampleLimit(t *testing.T) { assert.Equal(t, limit, ping.Load().(int), "ping count is wrong") assert.Equal(t, 1, failed.Load().(int), "expected host down to be fired once") } + +func TestChecker_HostReporter_up_then_down(t *testing.T) { + ts := StartTest(nil) + defer ts.Close() + ts.Gw.setTestMode(false) + defer ts.Gw.setTestMode(true) + + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + data := HostData{ + CheckURL: l.Addr().String(), + Protocol: "tcp", + EnableProxyProtocol: true, + Commands: []apidef.CheckCommand{ + { + Name: "send", Message: "ping", + }, { + Name: "expect", Message: "pong", + }, + }, + } + defer l.Close() + + changeResponse := make(chan bool) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func(ls net.Listener, change chan bool) { + ls = &proxyproto.Listener{Listener: ls} + accept := false + for { + select { + case <-change: + accept = true + default: + s, err := ls.Accept() + if err != nil { + return + } + buf := make([]byte, 4) + _, err = s.Read(buf) + if err != nil { + return + } + if !accept { + s.Write([]byte("pong")) + } else { + s.Write([]byte("unknown")) + } + } + + } + }(l, changeResponse) + + var ( + limit = 2 + ping atomic.Value + failed atomic.Value + ) + failed.Store(0) + ping.Store(0) + + hs := &HostUptimeChecker{Gw: ts.Gw} + hs.Init(1, limit, 1, map[string]HostData{ + l.Addr().String(): data, + }, + HostCheckCallBacks{ + Fail: func(_ context.Context, _ HostHealthReport) { + failed.Store(failed.Load().(int) + 1) + }, + Up: func(_ context.Context, _ HostHealthReport) { + }, + Ping: func(_ context.Context, _ HostHealthReport) { + ping.Store(ping.Load().(int) + 1) 
+
+func TestChecker_HostReporter_down_then_up(t *testing.T) {
+	ts := StartTest(nil)
+	defer ts.Close()
+	ts.Gw.setTestMode(false)
+	defer ts.Gw.setTestMode(true)
+
+	l, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	data := HostData{
+		CheckURL:            l.Addr().String(),
+		Protocol:            "tcp",
+		EnableProxyProtocol: true,
+		Commands: []apidef.CheckCommand{
+			{
+				Name: "send", Message: "ping",
+			}, {
+				Name: "expect", Message: "pong",
+			},
+		},
+	}
+	defer l.Close()
+
+	changeResponse := make(chan bool)
+
+	go func(ls net.Listener, change chan bool) {
+		ls = &proxyproto.Listener{Listener: ls}
+		accept := false
+		for {
+			select {
+			case <-change:
+				accept = true
+			default:
+				s, err := ls.Accept()
+				if err != nil {
+					return
+				}
+				buf := make([]byte, 4)
+				_, err = s.Read(buf)
+				if err != nil {
+					return
+				}
+				if accept {
+					s.Write([]byte("pong"))
+				} else {
+					s.Write([]byte("unknown"))
+				}
+			}
+
+		}
+	}(l, changeResponse)
+
+	var (
+		limit  = 2
+		up     atomic.Value
+		failed atomic.Value
+	)
+	failed.Store(0)
+	up.Store(0)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	hs := &HostUptimeChecker{Gw: ts.Gw}
+	hs.Init(1, limit, 1, map[string]HostData{
+		l.Addr().String(): data,
+	}, HostCheckCallBacks{
+		Fail: func(_ context.Context, _ HostHealthReport) {
+			failed.Store(failed.Load().(int) + 1)
+		},
+		Up: func(_ context.Context, _ HostHealthReport) {
+			up.Store(up.Load().(int) + 1)
+		},
+		Ping: func(_ context.Context, _ HostHealthReport) {
+		},
+	},
+	)
+
+	go hs.Start(ctx)
+	defer hs.Stop()
+
+	for {
+		val := failed.Load()
+		if val != nil && val.(int) == 1 {
+			break
+		}
+	}
+
+	changeResponse <- true
+
+	for {
+		val := up.Load()
+		if val != nil && val == 1 {
+			break
+		}
+	}
+
+	_, found := hs.samples.Load(data.CheckURL)
+	assert.Equal(t, false, found, "the host url should not be in samples")
+	assert.Equal(t, 2, failed.Load().(int), "expected host down to be fired twice")
+	assert.Equal(t, 1, up.Load().(int), "expected host up to be fired once")
+
+}
diff --git a/gateway/server.go b/gateway/server.go
index 365277f0fd7..e030916c62e 100644
--- a/gateway/server.go
+++ b/gateway/server.go
@@ -583,7 +583,7 @@ func (gw *Gateway) loadControlAPIEndpoints(muxer *mux.Router) {
 	r.HandleFunc("/apis/{apiID}", gw.apiHandler).Methods("GET", "POST", "PUT", "DELETE")
 	r.HandleFunc("/health", gw.healthCheckhandler).Methods("GET")
 	r.HandleFunc("/policies", gw.polHandler).Methods("GET", "POST", "PUT", "DELETE")
-	r.HandleFunc("/policies/{polID}",gw.polHandler).Methods("GET", "POST", "PUT", "DELETE")
+	r.HandleFunc("/policies/{polID}", gw.polHandler).Methods("GET", "POST", "PUT", "DELETE")
 	r.HandleFunc("/oauth/clients/create", gw.createOauthClient).Methods("POST")
 	r.HandleFunc("/oauth/clients/{apiID}/{keyName:[^/]*}", gw.oAuthClientHandler).Methods("PUT")
 	r.HandleFunc("/oauth/clients/{apiID}/{keyName:[^/]*}/rotate", gw.rotateOauthClientHandler).Methods("PUT")