Clean up logger; make it honor --log-path flag.
Add functional options to NewGossipMemberSet.
travisturner committed Mar 27, 2018
1 parent 5a95a49 commit 9a2aeac
Showing 25 changed files with 410 additions and 356 deletions.
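
The cluster.go diff below replaces the old LogOutput io.Writer field (and the per-call log.New wrappers) with a single exported Logger field that defaults to NopLogger. Neither the Logger interface nor NopLogger is defined in the hunks shown here; as a rough sketch of what they could look like, assuming the interface only needs the Printf method the diff actually calls:

package pilosa

// Logger is assumed to be a minimal logging interface; the code in this
// diff only ever calls Printf on it.
type Logger interface {
        Printf(format string, v ...interface{})
}

// nopLogger discards everything it is given.
type nopLogger struct{}

func (nopLogger) Printf(format string, v ...interface{}) {}

// NopLogger is the default used by NewCluster and NewResizeJob in this diff.
var NopLogger Logger = nopLogger{}
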
96 changes: 41 additions & 55 deletions cluster.go
@@ -20,9 +20,7 @@ import (
"errors"
"fmt"
"hash/fnv"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
@@ -268,8 +266,7 @@ type Cluster struct {
closing chan struct{}
prefect SecurityManager

// The writer for any logging.
LogOutput io.Writer
Logger Logger

//
RemoteClient *http.Client
@@ -289,16 +286,11 @@ func NewCluster() *Cluster {
closing: make(chan struct{}),
joining: make(chan struct{}),

LogOutput: os.Stderr,
prefect: &NopSecurityManager{},
Logger: NopLogger,
prefect: &NopSecurityManager{},
}
}

// logger returns a logger for the cluster.
func (c *Cluster) logger() *log.Logger {
return log.New(c.LogOutput, "", log.LstdFlags)
}

// Coordinator returns the coordinator node.
func (c *Cluster) CoordinatorNode() *Node {
return c.nodeByID(c.Coordinator)
@@ -359,7 +351,7 @@ func (c *Cluster) UpdateCoordinator(n *Node) bool {
// AddNode adds a node to the Cluster and updates and saves the
// new topology.
func (c *Cluster) AddNode(node *Node) error {
c.logger().Printf("add node %s to cluster on %s", node, c.Node)
c.Logger.Printf("add node %s to cluster on %s", node, c.Node)

// If the node being added is the coordinator, set it for this node.
if node.IsCoordinator {
@@ -437,7 +429,7 @@ func (c *Cluster) setState(state string) {
return
}

c.logger().Printf("change cluster state from %s to %s on %s", c.state, state, c.Node.ID)
c.Logger.Printf("change cluster state from %s to %s on %s", c.state, state, c.Node.ID)

var doCleanup bool

@@ -471,7 +463,7 @@ func (c *Cluster) setState(state string) {

// Clean holder.
if err := cleaner.CleanHolder(); err != nil {
c.logger().Printf("holder clean error: err=%s", err)
c.Logger.Printf("holder clean error: err=%s", err)
}
}
}
@@ -487,7 +479,7 @@ func (c *Cluster) SetNodeState(state string) error {
State: state,
}

c.logger().Printf("Sending State %s (%s)", state, c.Coordinator)
c.Logger.Printf("Sending State %s (%s)", state, c.Coordinator)
if err := c.sendTo(c.CoordinatorNode(), ns); err != nil {
return fmt.Errorf("sending node state error: err=%s", err)
}
@@ -509,7 +501,7 @@ func (c *Cluster) ReceiveNodeState(nodeID string, state string) error {
}

c.Topology.nodeStates[nodeID] = state
c.logger().Printf("received state %s (%s)", state, nodeID)
c.Logger.Printf("received state %s (%s)", state, nodeID)

// Set cluster state to NORMAL.
if c.haveTopologyAgreement() && c.allNodesReady() {
@@ -951,9 +943,9 @@ func (c *Cluster) Open() error {
return fmt.Errorf("sending restart NodeJoin: %v", err)
}

c.logger().Printf("wait for joining to complete")
c.Logger.Printf("wait for joining to complete")
<-c.joining
c.logger().Printf("joining has completed")
c.Logger.Printf("joining has completed")
}

return nil
@@ -968,7 +960,7 @@ func (c *Cluster) Close() error {
}

func (c *Cluster) markAsJoined() {
c.logger().Printf("mark node as joined (received coordinator update)")
c.Logger.Printf("mark node as joined (received coordinator update)")
if !c.joined {
c.joined = true
close(c.joining)
@@ -1001,9 +993,9 @@ func (c *Cluster) allNodesReady() bool {
func (c *Cluster) handleNodeAction(nodeAction nodeAction) error {
j, err := c.generateResizeJob(nodeAction)
if err != nil {
c.logger().Printf("generateResizeJob error: err=%s", err)
c.Logger.Printf("generateResizeJob error: err=%s", err)
if err := c.setStateAndBroadcast(ClusterStateNormal); err != nil {
c.logger().Printf("setStateAndBroadcast error: err=%s", err)
c.Logger.Printf("setStateAndBroadcast error: err=%s", err)
}
return err
}
@@ -1017,15 +1009,15 @@ func (c *Cluster) handleNodeAction(nodeAction nodeAction) error {
})

// Wait for the ResizeJob to finish or be aborted.
c.logger().Printf("wait for jobResult")
c.Logger.Printf("wait for jobResult")
jobResult := <-j.result

// Make sure j.Run() didn't return an error.
if eg.Wait() != nil {
return err
}

c.logger().Printf("received jobResult: %s", jobResult)
c.Logger.Printf("received jobResult: %s", jobResult)
switch jobResult {
case ResizeJobStateDone:
if err := c.CompleteCurrentJob(ResizeJobStateDone); err != nil {
@@ -1048,7 +1040,7 @@ func (c *Cluster) handleNodeAction(nodeAction nodeAction) error {
func (c *Cluster) setStateAndBroadcast(state string) error {
c.SetState(state)
// Broadcast cluster status changes to the cluster.
c.logger().Printf("broadcasting ClusterStatus: %s", state)
c.Logger.Printf("broadcasting ClusterStatus: %s", state)
return c.Broadcaster.SendSync(c.Status())
}

@@ -1081,7 +1073,7 @@ func (c *Cluster) listenForJoins() {
case nodeAction := <-c.joiningLeavingNodes:
err := c.handleNodeAction(nodeAction)
if err != nil {
c.logger().Printf("handleNodeAction error: err=%s", err)
c.Logger.Printf("handleNodeAction error: err=%s", err)
continue
}
setNormal = true
@@ -1093,7 +1085,7 @@ func (c *Cluster) listenForJoins() {
if setNormal {
// Put the cluster back to state NORMAL and broadcast.
if err := c.setStateAndBroadcast(ClusterStateNormal); err != nil {
c.logger().Printf("setStateAndBroadcast error: err=%s", err)
c.Logger.Printf("setStateAndBroadcast error: err=%s", err)
}
}

@@ -1104,7 +1096,7 @@ func (c *Cluster) listenForJoins() {
case nodeAction := <-c.joiningLeavingNodes:
err := c.handleNodeAction(nodeAction)
if err != nil {
c.logger().Printf("handleNodeAction error: err=%s", err)
c.Logger.Printf("handleNodeAction error: err=%s", err)
continue
}
setNormal = true
@@ -1117,15 +1109,15 @@
// added/removed. It also saves a reference to the ResizeJob in the `jobs` map
// for future lookup by JobID.
func (c *Cluster) generateResizeJob(nodeAction nodeAction) (*ResizeJob, error) {
c.logger().Printf("generateResizeJob: %v", nodeAction)
c.Logger.Printf("generateResizeJob: %v", nodeAction)
c.mu.Lock()
defer c.mu.Unlock()

j, err := c.generateResizeJobByAction(nodeAction)
if err != nil {
return nil, err
}
c.logger().Printf("generated ResizeJob: %d", j.ID)
c.Logger.Printf("generated ResizeJob: %d", j.ID)

// Save job in jobs map for future reference.
c.jobs[j.ID] = j
@@ -1215,14 +1207,14 @@ func (c *Cluster) CompleteCurrentJob(state string) error {

// FollowResizeInstruction is run by any node that receives a ResizeInstruction.
func (c *Cluster) FollowResizeInstruction(instr *internal.ResizeInstruction) error {
c.logger().Printf("follow resize instruction on %s", c.Node.ID)
c.Logger.Printf("follow resize instruction on %s", c.Node.ID)
// Make sure the cluster status on this node agrees with the Coordinator
// before attempting a resize.
if err := c.MergeClusterStatus(instr.ClusterStatus); err != nil {
return err
}

c.logger().Printf("MergeClusterStatus done, start goroutine")
c.Logger.Printf("MergeClusterStatus done, start goroutine")

// The actual resizing runs in a goroutine because we don't want to block
// the distribution of other ResizeInstructions to the rest of the cluster.
@@ -1242,7 +1234,7 @@ func (c *Cluster) FollowResizeInstruction(instr *internal.ResizeInstruction) err
if err := func() error {

// Sync the schema received in the resize instruction.
c.logger().Printf("Holder ApplySchema")
c.Logger.Printf("Holder ApplySchema")
if err := c.Holder.ApplySchema(instr.Schema); err != nil {
return err
}
@@ -1252,7 +1244,7 @@ func (c *Cluster) FollowResizeInstruction(instr *internal.ResizeInstruction) err

// Request each source file in ResizeSources.
for _, src := range instr.Sources {
c.logger().Printf("get slice %d for index %s from host %s", src.Slice, src.Index, src.Node.URI)
c.Logger.Printf("get slice %d for index %s from host %s", src.Slice, src.Index, src.Node.URI)

srcURI := decodeURI(src.Node.URI)

@@ -1275,7 +1267,7 @@ func (c *Cluster) FollowResizeInstruction(instr *internal.ResizeInstruction) err
}

// Stream slice from remote node.
c.logger().Printf("retrieve slice %d for index %s from host %s", src.Slice, src.Index, src.Node.URI)
c.Logger.Printf("retrieve slice %d for index %s from host %s", src.Slice, src.Index, src.Node.URI)
rd, err := client.RetrieveSliceFromURI(context.Background(), src.Index, src.Frame, src.View, src.Slice, srcURI)
if err != nil {
// For now it is an acceptable error if the fragment is not found
@@ -1309,7 +1301,7 @@ func (c *Cluster) FollowResizeInstruction(instr *internal.ResizeInstruction) err
}

if err := c.sendTo(DecodeNode(instr.Coordinator), complete); err != nil {
c.logger().Printf("sending resizeInstructionComplete error: err=%s", err)
c.Logger.Printf("sending resizeInstructionComplete error: err=%s", err)
}
}()
return nil
@@ -1363,13 +1355,7 @@ type ResizeJob struct {
mu sync.RWMutex
state string

// The writer for any logging.
LogOutput io.Writer
}

// logger returns a logger for the resize job.
func (j *ResizeJob) logger() *log.Logger {
return log.New(j.LogOutput, "", log.LstdFlags)
Logger Logger
}

// NewResizeJob returns a new instance of ResizeJob.
@@ -1397,11 +1383,11 @@ func NewResizeJob(existingNodes []*Node, node *Node, action string) *ResizeJob {
}

return &ResizeJob{
ID: rand.Int63(),
IDs: ids,
action: action,
result: make(chan string),
LogOutput: os.Stderr,
ID: rand.Int63(),
IDs: ids,
action: action,
result: make(chan string),
Logger: NopLogger,
}
}

@@ -1425,18 +1411,18 @@ func (j *ResizeJob) setState(state string) {

// Run distributes ResizeInstructions.
func (j *ResizeJob) Run() error {
j.logger().Printf("run ResizeJob")
j.Logger.Printf("run ResizeJob")
// Set job state to RUNNING.
j.SetState(ResizeJobStateRunning)

// Job can be considered done in the case where it doesn't require any action.
if !j.nodesArePending() {
j.logger().Printf("ResizeJob contains no pending tasks; mark as done")
j.Logger.Printf("ResizeJob contains no pending tasks; mark as done")
j.result <- ResizeJobStateDone
return nil
}

j.logger().Printf("distribute tasks for ResizeJob")
j.Logger.Printf("distribute tasks for ResizeJob")
err := j.distributeResizeInstructions()
if err != nil {
j.result <- ResizeJobStateAborted
@@ -1466,7 +1452,7 @@ func (j *ResizeJob) nodesArePending() bool {
}

func (j *ResizeJob) distributeResizeInstructions() error {
j.logger().Printf("distributeResizeInstructions for job %d", j.ID)
j.Logger.Printf("distributeResizeInstructions for job %d", j.ID)
// Loop through the ResizeInstructions in ResizeJob and send to each host.
for _, instr := range j.Instructions {
// Because the node may not be in the cluster yet, create
@@ -1475,7 +1461,7 @@ func (j *ResizeJob) distributeResizeInstructions() error {
ID: instr.Node.ID,
URI: decodeURI(instr.Node.URI),
}
j.logger().Printf("send resize instructions: %v", instr)
j.Logger.Printf("send resize instructions: %v", instr)
if err := j.Broadcaster.SendTo(node, instr); err != nil {
return err
}
@@ -1681,7 +1667,7 @@ func (c *Cluster) ReceiveEvent(e *NodeEvent) error {

switch e.Event {
case NodeJoin:
c.logger().Printf("received NodeJoin event: %v", e)
c.Logger.Printf("received NodeJoin event: %v", e)
// Ignore the event if this is not the coordinator.
if !c.IsCoordinator() {
return nil
@@ -1701,7 +1687,7 @@ func (c *Cluster) nodeJoin(node *Node) error {
// A host that is not part of the topology can't be added to the STARTING cluster.
if !c.Topology.ContainsID(node.ID) {
err := fmt.Sprintf("host is not in topology: %s", node.ID)
c.logger().Print(err)
c.Logger.Printf("%v", err)
return errors.New(err)
}

@@ -1816,7 +1802,7 @@ func (c *Cluster) nodeLeave(node *Node) error {
}

func (c *Cluster) MergeClusterStatus(cs *internal.ClusterStatus) error {
c.logger().Printf("merge cluster status: %v", cs)
c.Logger.Printf("merge cluster status: %v", cs)
// Ignore status updates from self (coordinator).
if c.IsCoordinator() {
return nil
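
Any value with a matching Printf method can be assigned to the new field, so callers opt in to real output instead of every component building its own log.New wrapper. A minimal usage sketch, assuming the Logger interface above and the github.com/pilosa/pilosa import path; the stdLogger adapter is hypothetical, not part of this commit:

package main

import (
        "log"
        "os"

        "github.com/pilosa/pilosa" // import path assumed
)

// stdLogger forwards Printf to a *log.Logger, roughly what the removed
// logger() helpers constructed inline on every call.
type stdLogger struct{ l *log.Logger }

func (s stdLogger) Printf(format string, v ...interface{}) { s.l.Printf(format, v...) }

func main() {
        c := pilosa.NewCluster() // defaults to NopLogger per the diff above
        c.Logger = stdLogger{l: log.New(os.Stderr, "", log.LstdFlags)}
        _ = c
}
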
2 changes: 1 addition & 1 deletion cmd/server.go
@@ -43,7 +43,7 @@ func NewServeCmd(stdin io.Reader, stdout, stderr io.Writer) *cobra.Command {
Long: `pilosa server runs Pilosa.
It will load existing data from the configured
directory, and start listening client connections
directory and start listening for client connections
on the configured port.`,
RunE: func(cmd *cobra.Command, args []string) error {
logOutput, err := server.GetLogWriter(Server.Config.LogPath, stderr)
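
The serve command now threads the --log-path flag through server.GetLogWriter before building its logger. The function body is not part of the hunks shown above; the following is only a sketch of the behavior the commit title implies ("make it honor --log-path flag"), assuming an empty path means fall back to the supplied writer:

package server

import (
        "io"
        "os"
)

// GetLogWriter is sketched from its call site in cmd/server.go; the real
// implementation in this commit may differ. An empty LogPath is assumed to
// mean "use the fallback writer" (stderr at the call site).
func GetLogWriter(path string, fallback io.Writer) (io.Writer, error) {
        if path == "" {
                return fallback, nil
        }
        f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
        if err != nil {
                return nil, err
        }
        return f, nil
}
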
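
Only 2 of the 25 changed files are expanded above; the gossip changes referenced in the commit message ("Add functional options to NewGossipMemberSet") are among the rest. As a generic sketch of the functional-options pattern that message refers to (the option names, fields, and defaults here are hypothetical, not taken from the commit):

package gossip

import "log"

// GossipMemberSet's real fields are not shown in this view; the logger and
// port fields here are placeholders used to illustrate the pattern.
type GossipMemberSet struct {
        logger *log.Logger
        port   int
}

// Option configures a GossipMemberSet during construction.
type Option func(*GossipMemberSet)

// WithLogger and WithPort are hypothetical option constructors.
func WithLogger(l *log.Logger) Option { return func(g *GossipMemberSet) { g.logger = l } }
func WithPort(p int) Option           { return func(g *GossipMemberSet) { g.port = p } }

// NewGossipMemberSet applies the given options over defaults.
func NewGossipMemberSet(opts ...Option) *GossipMemberSet {
        g := &GossipMemberSet{port: 14000} // default value is made up for the example
        for _, opt := range opts {
                opt(g)
        }
        return g
}
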