Skip to content

Commit

Permalink
0.1.3 release
Browse files Browse the repository at this point in the history
New features:
- Disaster recovery: befw will set latest working rules if something
goes wrong
- Consisntency check: if somehow ipset or rules content is changed, befw
will restore it
  • Loading branch information
annmuor committed Oct 11, 2019
1 parent a4964a6 commit e961bd7
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 50 deletions.
14 changes: 2 additions & 12 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,2 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
*.conf
befw-*
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
### 0.1.3
- befw now checks if its rules is consistent
- befw now can recover firewall access if consul is dead with a hard-coded ( TOTO: configured ) networks

### 0.1.2
- fix 0.0.0.0/0 centos7 ipset bug
- befw-sync now wipes old records
- documentation fixes
- befw-sync timeouts & races fixed

### 0.1.1
- Uses short hostnames instead of FQDN
- Additional sleep(s) if errors repeat
- Fix ipset refresh
- Fix static ipset aliases

### 0.1.0
- a huge documentation update
- befw-firewalld now supports configuration file
Expand All @@ -23,4 +39,4 @@
### 0.0.2
- Alias (befw/$alias$/*) support
### 0.0.1
- Initial version
- Initial version
10 changes: 9 additions & 1 deletion src/befw/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
type config struct {
ConsulAddr string
ConsulDC string
NodeName string
NodeDC string
ConsulToken string
ServicesDir string
IPSetDir string
Expand All @@ -44,7 +46,7 @@ type serviceClient struct {
}

type service struct {
ServiceName string `json:"Name"`
ServiceName string `json:"name"`
ServiceProtocol befwServiceProto `json:"protocol"`
ServicePort uint16 `json:"port"`
ServicePorts []port `json:"ports"`
Expand Down Expand Up @@ -116,6 +118,12 @@ func createConfig(configFile string) *config {
if v, ok := kv["rules"]; ok {
ret.RulesPath = v
}
if v, ok := kv["nodename"]; ok {
ret.NodeName = v
}
if v, ok := kv["nodedc"]; ok {
ret.NodeDC = v
}
if _, ok := kv["fail"]; ok {
LogError("[Config] you must edit your Config file before proceed")
}
Expand Down
2 changes: 1 addition & 1 deletion src/befw/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func cutIPSet(ipsetName string) string {
if len(ipsetName) > 31 { // max size of links
parts := strings.Split(ipsetName, "_")
leftLength := 31 - len(parts[len(parts)-1]) // we can't reduce last part
maxPartLen := int((leftLength/(len(parts)-1) - 1))
maxPartLen := int(leftLength/(len(parts)-1) - 1)
for i := 0; i < len(parts)-1; i++ {
if len(parts[i]) > maxPartLen {
parts[i] = string([]byte(parts[i])[0:maxPartLen]) // trim to size
Expand Down
17 changes: 12 additions & 5 deletions src/befw/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,18 @@ func newState(configFile string) *state {
if self, e := state.consulClient.Agent().Self(); e != nil {
LogError("Can't connect to consul cluster. Error:", e.Error())
} else {
state.nodeDC = self["Config"]["Datacenter"].(string)
state.nodeName = self["Config"]["NodeName"].(string)
if cfg.NodeDC != "" {
state.nodeDC = cfg.NodeDC
} else {
state.nodeDC = self["Config"]["Datacenter"].(string)
}
if cfg.NodeName != "" {
state.nodeName = cfg.NodeName
} else {
state.nodeName = self["Config"]["NodeName"].(string)
}
state.nodeDC = strings.ToLower(state.nodeDC)
state.nodeName = strings.ToLower(strings.Split(state.nodeName, ".")[0])
}
state.NodeServices = make([]service, 0)
return state
Expand Down Expand Up @@ -335,9 +345,6 @@ func (state *state) generateKVPaths(newServiceName string) []string {
fmt.Sprintf("befw/%s/%s/", state.nodeDC, newServiceName),
fmt.Sprintf("befw/%s/", newServiceName),
}
if idx := strings.Index(state.nodeName, "."); idx > 0 {
ret = append(ret, fmt.Sprintf("befw/%s/%s/%s/", state.nodeDC, state.nodeName[:idx], newServiceName))
}
return ret
}

Expand Down
4 changes: 2 additions & 2 deletions src/befw/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestGenerateKVPaths(t *testing.T) {
if len(p1) != 3 {
t.Error("Len != 3 in p1")
}
if len(p2) != 4 {
t.Error("Len != 4 in p2")
if len(p2) != 3 {
t.Error("Len != 3 in p2")
}
}
6 changes: 6 additions & 0 deletions src/puppetdbsync/puppetdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ func (conf *syncConfig) newSyncData(message string) *syncData {
return nil
}
elems := strings.Split(message, "@")
for i := 0; i < len(elems); i++ {
elems[i] = strings.ToLower(elems[i]) // tolower
}
switch len(elems) {
case 2:
ret.service = elems[0]
Expand All @@ -173,6 +176,9 @@ func (conf *syncConfig) newSyncData(message string) *syncData {
default:
return nil
}
if ret.node != "" {
ret.node = strings.Split(ret.node, ".")[0] // remove ..xxx
}
if conf.validate(ret) {
return ret
}
Expand Down
5 changes: 5 additions & 0 deletions src/puppetdbsync/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func keepLock(config *syncConfig) {
for {
canRunMutex.Lock()
canRun = config.manageSessionLock()
if lastState != canRun {
config.lastCounter = 999
befw.LogInfo("[Syncer] We got lock - refreshing puppetdb")
}
lastState = canRun // state changed
canRunMutex.Unlock()
select {
case <-exitChan:
Expand Down
101 changes: 73 additions & 28 deletions src/puppetdbsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ func (conf *syncConfig) makeHotCache() {
return
} else {
for _, node := range nodes {
conf.cache.nodes[dc+"@"+node.Node] = nil
nodeName := strings.Split(node.Node, ".")[0]
conf.cache.nodes[dc+"@"+nodeName] = nil
}
}
}
Expand All @@ -154,9 +155,7 @@ func (conf *syncConfig) writeSyncData(data *syncData) {
defer conf.servicesWG.Done()
path := fmt.Sprintf("%s/%s", data.service, data.value)
if data.node != "" {
//
nodeShortHostName := data.node[:strings.Index(data.node, ".")]
path = fmt.Sprintf("%s/%s", nodeShortHostName, path)
path = fmt.Sprintf("%s/%s", data.node, path)
}
if data.dc != "" {
path = fmt.Sprintf("%s/%s", data.dc, path)
Expand All @@ -178,45 +177,91 @@ func (conf *syncConfig) writeSyncData(data *syncData) {
}

func (conf *syncConfig) manageSession() {
nodeName := fmt.Sprintf("%s.%s", conf.consulDC, conf.nodeName)
// 1. register main node
conf.consulClient.Catalog().Register(&api.CatalogRegistration{
Node: nodeName,
Address: conf.nodeAddr,
Datacenter: conf.consulDC,
SkipNodeUpdate: true,
}, nil)
if conf.sessionID == "" {
if sess, _, e := conf.consulClient.Session().CreateNoChecks(
&api.SessionEntry{
Node: nodeName,
Name: "befw-sync",
TTL: "30s",
}, nil); e == nil {
conf.sessionID = sess
//nodeName := fmt.Sprintf("%s.%s", conf.consulDC, conf.nodeName)
errcount := 0
for errcount < 10 {
//// 1. register main node
if _, e := conf.consulClient.Catalog().Register(&api.CatalogRegistration{
Node: conf.nodeName,
Address: conf.nodeAddr,
Datacenter: conf.consulDC,
}, nil); e != nil {
befw.LogWarning("[Syncer] can't register a node!")
}
} else {
if se, _, e := conf.consulClient.Session().Info(conf.sessionID, nil); e != nil || se == nil {
befw.LogDebug("[Syncer] Can't find session:", conf.sessionID)
conf.sessionID = ""
conf.manageSession() // a bit recursive
befw.LogDebug("[Syncer] starting session creation")
if conf.sessionID == "" {
if sess, _, e := conf.consulClient.Session().CreateNoChecks(
&api.SessionEntry{
Node: conf.nodeName,
Name: "befw-sync",
TTL: "40s",
}, &api.WriteOptions{Datacenter: conf.consulDC}); e == nil {
conf.sessionID = sess
} else {
befw.LogWarning("[Syncer] Can't create session: ", e.Error())
errcount++
continue
}
} else {
conf.consulClient.Session().Renew(conf.sessionID, nil)
if se, _, e := conf.consulClient.Session().Info(conf.sessionID, nil); e != nil {
conf.sessionID = ""
befw.LogDebug("[Syncer] error while getting session: ", conf.sessionID, ", ", e.Error())
errcount++
continue
} else if se == nil {
conf.sessionID = ""
befw.LogDebug("[Syncer] Can't find session:", conf.sessionID)
errcount++
continue
}
if se, _, e := conf.consulClient.Session().Renew(conf.sessionID, nil); e != nil {
conf.sessionID = ""
befw.LogDebug("[Syncer] error while renewning session: ", conf.sessionID, ", ", e.Error())
errcount++
continue
} else if se == nil {
conf.sessionID = ""
befw.LogDebug("[Syncer] Can't find session:", conf.sessionID)
errcount++
continue
}
}
break
}
befw.LogDebug("[Syncer] got session ", conf.sessionID)
}

func (conf *syncConfig) getSessionHolder(session string) string {
if se, _, e := conf.consulClient.Session().Info(session, nil); e == nil && se != nil {
return fmt.Sprintf("%s@%s", se.Name, se.Node)
}
return ""
}

var lastState bool

func (conf *syncConfig) manageSessionLock() bool {
conf.manageSession()
if conf.sessionID != "" {
if v, _, e := conf.consulClient.KV().Acquire(
&api.KVPair{Key: "befw/.lock",
Value: []byte("ok"),
Value: []byte(conf.nodeName),
Session: conf.sessionID,
}, nil); e != nil {
}, &api.WriteOptions{Datacenter: conf.consulDC}); e != nil {
befw.LogWarning("[Syncer] Can't create lock:", e.Error())
return false
} else {
if !v {
if kv, _, e := conf.consulClient.KV().Get("befw/.lock", nil); e == nil {
if kv.Session != "" {
if si := conf.getSessionHolder(kv.Session); si != "" {
befw.LogInfo("[Syncer] key is locked by ", si)
}
}
}
} else {
befw.LogInfo("[Syncer] Lock acquired by me")
}
return v
}
}
Expand Down

0 comments on commit e961bd7

Please sign in to comment.