Skip to content

Commit

Permalink
Remove the log watcher.
Browse files Browse the repository at this point in the history
  • Loading branch information
jaqx0r committed Jan 12, 2021
1 parent 734d94c commit 9b788b8
Show file tree
Hide file tree
Showing 11 changed files with 12 additions and 687 deletions.
7 changes: 1 addition & 6 deletions cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/google/mtail/internal/metrics"
"github.com/google/mtail/internal/mtail"
"github.com/google/mtail/internal/waker"
"github.com/google/mtail/internal/watcher"
"go.opencensus.io/trace"
)

Expand Down Expand Up @@ -155,10 +154,6 @@ func main() {
cancel()
}()

w, err := watcher.NewLogWatcher(ctx, *pollInterval)
if err != nil {
glog.Exitf("Failure to create log watcher: %s", err)
}
opts := []mtail.Option{
mtail.ProgramPath(*progs),
mtail.LogPathPatterns(logs...),
Expand Down Expand Up @@ -211,7 +206,7 @@ func main() {
if *expiredMetricGcTickInterval > 0 {
store.StartGcLoop(ctx, *expiredMetricGcTickInterval)
}
m, err := mtail.New(ctx, store, w, opts...)
m, err := mtail.New(ctx, store, opts...)
if err != nil {
glog.Error(err)
os.Exit(1)
Expand Down
10 changes: 3 additions & 7 deletions internal/mtail/examples_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/google/mtail/internal/mtail/golden"
"github.com/google/mtail/internal/testutil"
"github.com/google/mtail/internal/waker"
"github.com/google/mtail/internal/watcher"
)

var exampleProgramTests = []struct {
Expand Down Expand Up @@ -155,11 +154,10 @@ func TestExamplePrograms(t *testing.T) {
for _, tc := range exampleProgramTests {
t.Run(fmt.Sprintf("%s on %s", tc.programfile, tc.logfile), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
w := watcher.NewFakeWatcher()
waker, awaken := waker.NewTest(0)
store := metrics.NewStore()
programFile := path.Join("../..", tc.programfile)
mtail, err := mtail.New(ctx, store, w, mtail.ProgramPath(programFile), mtail.LogPathPatterns(tc.logfile), mtail.OneShot, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.OmitDumpMetricStore, mtail.LogPatternPollWaker(waker), mtail.LogstreamPollWaker(waker))
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(tc.logfile), mtail.OneShot, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.OmitDumpMetricStore, mtail.LogPatternPollWaker(waker), mtail.LogstreamPollWaker(waker))
testutil.FatalIfErr(t, err)

awaken()
Expand Down Expand Up @@ -196,9 +194,8 @@ func TestCompileExamplePrograms(t *testing.T) {
name := filepath.Base(tc)
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
w := watcher.NewFakeWatcher()
s := metrics.NewStore()
mtail, err := mtail.New(ctx, s, w, mtail.ProgramPath(tc), mtail.CompileOnly, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.OmitDumpMetricStore)
mtail, err := mtail.New(ctx, s, mtail.ProgramPath(tc), mtail.CompileOnly, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.OmitDumpMetricStore)
testutil.FatalIfErr(t, err)
// Ensure that run shuts down for CompileOnly
testutil.FatalIfErr(t, mtail.Run())
Expand All @@ -216,12 +213,11 @@ func BenchmarkProgram(b *testing.B) {
defer rmLogDir()
logFile := path.Join(logDir, "test.log")
log := testutil.TestOpenFile(b, logFile)
w := watcher.NewFakeWatcher()
waker, awaken := waker.NewTest(0)
store := metrics.NewStore()
programFile := path.Join("../..", bm.programfile)
ctx, cancel := context.WithCancel(context.Background())
mtail, err := mtail.New(ctx, store, w, mtail.ProgramPath(programFile), mtail.LogPathPatterns(log.Name()), mtail.LogstreamPollWaker(waker))
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(log.Name()), mtail.LogstreamPollWaker(waker))
testutil.FatalIfErr(b, err)

var wg sync.WaitGroup
Expand Down
7 changes: 2 additions & 5 deletions internal/mtail/mtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/mtail/internal/tailer"
"github.com/google/mtail/internal/vm"
"github.com/google/mtail/internal/waker"
"github.com/google/mtail/internal/watcher"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -35,10 +34,9 @@ import (
type Server struct {
ctx context.Context
store *metrics.Store // Metrics storage
w watcher.Watcher
wg sync.WaitGroup // wait for main processes to shutdown

t *tailer.Tailer // t tails the watched files and sends lines to the VMs
t *tailer.Tailer // t manages log patterns and log streams, which sends lines to the VMs
l *vm.Loader // l loads programs and manages the VM lifecycle
e *exporter.Exporter // e manages the export of metrics from the store

Expand Down Expand Up @@ -224,11 +222,10 @@ func (m *Server) initHttpServer() error {
// each log file from start to finish.
// TODO(jaq): this doesn't need to be a constructor anymore, it could start and
// block until quit, once TestServer.PollWatched is addressed.
func New(ctx context.Context, store *metrics.Store, w watcher.Watcher, options ...Option) (*Server, error) {
func New(ctx context.Context, store *metrics.Store, options ...Option) (*Server, error) {
m := &Server{
ctx: ctx,
store: store,
w: w,
lines: make(chan *logline.LogLine),
// Using a non-pedantic registry means we can be looser with metrics that
// are not fully specified at startup.
Expand Down
10 changes: 2 additions & 8 deletions internal/mtail/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/google/mtail/internal/metrics/datum"
"github.com/google/mtail/internal/testutil"
"github.com/google/mtail/internal/waker"
"github.com/google/mtail/internal/watcher"
)

const timeoutMultiplier = 3
Expand All @@ -27,7 +26,6 @@ const defaultDoOrTimeoutDeadline = 10 * time.Second
type TestServer struct {
*Server

w *watcher.LogWatcher
waker waker.Waker // for idle logstreams; others are polled explicitly in PollWatched
awaken func()

Expand All @@ -54,15 +52,13 @@ func TestMakeServer(tb testing.TB, pollInterval time.Duration, wakers int, optio
expvar.Get("prog_loads_total").(*expvar.Map).Init()

ctx, cancel := context.WithCancel(context.Background())
w, err := watcher.NewLogWatcher(ctx, pollInterval)
testutil.FatalIfErr(tb, err)
waker, awaken := waker.NewTest(wakers)
options = append(options,
LogstreamPollWaker(waker),
)
m, err := New(ctx, metrics.NewStore(), w, options...)
m, err := New(ctx, metrics.NewStore(), options...)
testutil.FatalIfErr(tb, err)
return &TestServer{Server: m, w: w, waker: waker, awaken: awaken, tb: tb, cancel: cancel}
return &TestServer{Server: m, waker: waker, awaken: awaken, tb: tb, cancel: cancel}
}

// TestStartServer creates a new TestServer and starts it running. It
Expand Down Expand Up @@ -100,8 +96,6 @@ func (ts *TestServer) Start() func() {
// Poll all watched objects for updates.
func (ts *TestServer) PollWatched() {
glog.Info("Testserver starting poll")
glog.Info("TestServer polling watched objects")
ts.w.Poll()
glog.Infof("TestServer polling filesystem patterns")
if err := ts.t.Poll(); err != nil {
glog.Info(err)
Expand Down
7 changes: 2 additions & 5 deletions internal/mtail/unix_socket_export_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,15 @@ import (
"github.com/google/mtail/internal/metrics"
"github.com/google/mtail/internal/mtail"
"github.com/google/mtail/internal/testutil"
"github.com/google/mtail/internal/watcher"
)

// makeServer makes a new Server for use in tests, but does not start
// the server. It returns the server, or any errors the new server creates.
func makeServer(tb testing.TB, pollInterval time.Duration, options ...mtail.Option) (*mtail.Server, error) {
tb.Helper()
ctx := context.Background()
w, err := watcher.NewLogWatcher(ctx, pollInterval)
testutil.FatalIfErr(tb, err)

return mtail.New(ctx, metrics.NewStore(), w, options...)
return mtail.New(ctx, metrics.NewStore(), options...)
}

// startUNIXSocketServer creates a new Server serving through a UNIX
Expand Down Expand Up @@ -128,7 +125,7 @@ func TestBasicUNIXSockets(t *testing.T) {
unixSocket := "/var/run/mtail_test.socket"

if testing.Verbose() {
defer testutil.TestSetFlag(t, "vmodule", "tail=2,log_watcher=2")()
defer testutil.TestSetFlag(t, "vmodule", "tail=2,filestream=2")()
}
logDir, rmLogDir := testutil.TestTempDir(t)
defer rmLogDir()
Expand Down
8 changes: 2 additions & 6 deletions internal/tailer/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
// and extracting new log lines to be passed into the virtual machines.
package tailer

// For regular files, mtail gets notified on modifications (i.e. appends) to
// log files that are being watched, in order to read the new lines. Log files
// can also be rotated, so mtail is also notified of creates in the log file
// directory.

import (
"context"
"errors"
Expand All @@ -33,7 +28,8 @@ var (
logCount = expvar.NewInt("log_count")
)

// Tailer polls the filesystem for log sources that match given `LogPathPatterns` and creates `LogStream`s to tail them.
// Tailer polls the filesystem for log sources that match given
// `LogPathPatterns` and creates `LogStream`s to tail them.
type Tailer struct {
ctx context.Context
wg sync.WaitGroup // Wait for our subroutines to finish
Expand Down
117 changes: 0 additions & 117 deletions internal/watcher/fake_watcher.go

This file was deleted.

Loading

0 comments on commit 9b788b8

Please sign in to comment.