diff --git a/pkg/eventlog/disposable/disposable.go b/pkg/eventlog/disposable/disposable.go new file mode 100644 index 000000000..932fd10e2 --- /dev/null +++ b/pkg/eventlog/disposable/disposable.go @@ -0,0 +1,50 @@ +package disposable + +import ( + "github.com/gomods/athens/pkg/eventlog" +) + +// Log is event log fetched from olympus server +type Log struct { + e eventlog.Eventlog +} + +// NewLog creates log reader from remote log, log gets cleared after each read +func NewLog(log eventlog.Eventlog) (*Log, error) { + return &Log{e: log}, nil +} + +// Read reads all events in event log. +func (c *Log) Read() ([]eventlog.Event, error) { + ee, err := c.e.Read() + if err != nil { + return ee, err + } + + if len(ee) > 0 { + last := ee[len(ee)-1] + return ee, c.e.Clear(last.ID) + } + + return ee, nil +} + +// ReadFrom reads all events from the log starting at event with specified id (excluded). +// If id is not found behaves like Read(). +func (c *Log) ReadFrom(id string) ([]eventlog.Event, error) { + ee, err := c.e.ReadFrom(id) + if err != nil { + return ee, err + } + + if len(ee) > 0 { + return ee, c.e.Clear(id) + } + + return ee, nil +} + +// Append appends Event to event log and returns its ID. +func (c *Log) Append(event eventlog.Event) (string, error) { + return c.e.Append(event) +} diff --git a/pkg/eventlog/event.go b/pkg/eventlog/event.go new file mode 100644 index 000000000..ae7dc61f9 --- /dev/null +++ b/pkg/eventlog/event.go @@ -0,0 +1,17 @@ +package eventlog + +import ( + "time" +) + +// Event is entry of event log specifying demand for a module. +type Event struct { + // ID is identifier, also used as a pointer reference target. + ID string `json:"_id" bson:"_id,omitempty"` + // Time is cache-miss created/handled time. + Time time.Time `json:"time_created" bson:"time_created"` + // Module is module name. + Module string `json:"module" bson:"module"` + // Version is version of a module e.g. "1.10", "1.10-deprecated" + Version string `json:"version" bson:"version"` +} diff --git a/pkg/eventlog/eventlog.go b/pkg/eventlog/eventlog.go new file mode 100644 index 000000000..1dae96d9b --- /dev/null +++ b/pkg/eventlog/eventlog.go @@ -0,0 +1,29 @@ +package eventlog + +// Eventlog is append only log of Events. +type Eventlog interface { + Reader + Appender + Clearer +} + +// Reader is reader of append only event log.s +type Reader interface { + // Read reads all events in event log. + Read() ([]Event, error) + + // ReadFrom reads all events from the log starting at event with specified id (excluded). + // If id is not found behaves like Read(). + ReadFrom(id string) ([]Event, error) +} + +// Appender is writer to append only event log. +type Appender interface { + // Append appends Event to event log and returns its ID. + Append(event Event) (string, error) +} + +// Clearer is interface used to clear state of event log +type Clearer interface { + Clear(id string) error +} diff --git a/pkg/eventlog/mongo/mongo.go b/pkg/eventlog/mongo/mongo.go new file mode 100644 index 000000000..611ab0302 --- /dev/null +++ b/pkg/eventlog/mongo/mongo.go @@ -0,0 +1,84 @@ +package mongo + +import ( + "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + "github.com/gomods/athens/pkg/eventlog" +) + +// Log is event log fetched from backing mongo database +type Log struct { + s *mgo.Session + d string // database + c string // collection + url string +} + +// NewLog creates event log from backing mongo database +func NewLog(url string) (*Log, error) { + return NewLogWithCollection(url, "eventlog") +} + +// NewLogWithCollection creates event log from backing mongo database +func NewLogWithCollection(url, collection string) (*Log, error) { + m := &Log{ + url: url, + c: collection, + d: "athens", + } + return m, m.Connect() +} + +// Connect establishes a session to the mongo cluster. +func (m *Log) Connect() error { + s, err := mgo.Dial(m.url) + if err != nil { + return err + } + m.s = s + + return nil +} + +// Read reads all events in event log. +func (m *Log) Read() ([]eventlog.Event, error) { + var events []eventlog.Event + + c := m.s.DB(m.d).C(m.c) + err := c.Find(nil).All(&events) + + return events, err +} + +// ReadFrom reads all events from the log starting at event with specified id (excluded). +// If id is not found behaves like Read(). +func (m *Log) ReadFrom(id string) ([]eventlog.Event, error) { + var events []eventlog.Event + + c := m.s.DB(m.d).C(m.c) + err := c.Find(bson.M{"_id": bson.M{"$gt": id}}).All(&events) + + return events, err +} + +// Append appends Event to event log and returns its ID. +func (m *Log) Append(event eventlog.Event) (string, error) { + event.ID = bson.NewObjectId().Hex() + c := m.s.DB(m.d).C(m.c) + err := c.Insert(event) + + return event.ID, err +} + +// Clear is a method for clearing entire state of event log +func (m *Log) Clear(id string) error { + c := m.s.DB(m.d).C(m.c) + + if id == "" { + _, err := c.RemoveAll(nil) + return err + } + + _, err := c.RemoveAll(bson.M{"_id": bson.M{"$lte": id}}) + return err +} diff --git a/pkg/eventlog/mongo/mongo_test.go b/pkg/eventlog/mongo/mongo_test.go new file mode 100644 index 000000000..8ea51cdf9 --- /dev/null +++ b/pkg/eventlog/mongo/mongo_test.go @@ -0,0 +1,109 @@ +package mongo + +import ( + "testing" + "time" + + "github.com/gomods/athens/pkg/eventlog" + "github.com/stretchr/testify/suite" +) + +type MongoTests struct { + suite.Suite + log *Log +} + +func TestMongo(t *testing.T) { + suite.Run(t, new(MongoTests)) +} + +func (m *MongoTests) SetupTest() { + store, err := NewLog("mongodb://127.0.0.1:27017") + if err != nil { + panic(err) + } + + store.Connect() + + store.s.DB(store.d).C(store.c).RemoveAll(nil) + m.log = store +} + +func (m *MongoTests) TestRead() { + r := m.Require() + versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} + for _, version := range versions { + _, err := m.log.Append(eventlog.Event{Module: "m1", Version: version, Time: time.Now()}) + r.NoError(err) + } + + retVersions, err := m.log.Read() + r.NoError(err) + r.Equal(versions[0], retVersions[0].Version) + r.Equal(versions[1], retVersions[1].Version) + r.Equal(versions[2], retVersions[2].Version) +} + +func (m *MongoTests) TestReadFrom() { + r := m.Require() + versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} + pointers := make(map[string]string) + for _, version := range versions { + p, _ := m.log.Append(eventlog.Event{Module: "m1", Version: version, Time: time.Now()}) + pointers[version] = p + } + + retVersions, err := m.log.ReadFrom(pointers[versions[0]]) + r.NoError(err) + r.Equal(versions[1], retVersions[0].Version) + r.Equal(versions[2], retVersions[1].Version) + + retVersions, err = m.log.ReadFrom(pointers[versions[1]]) + r.NoError(err) + r.Equal(versions[2], retVersions[0].Version) + + retVersions, err = m.log.ReadFrom(pointers[versions[2]]) + r.NoError(err) + r.Equal(0, len(retVersions)) +} + +func (m *MongoTests) TestClear() { + r := m.Require() + versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} + for _, version := range versions { + m.log.Append(eventlog.Event{Module: "m1", Version: version, Time: time.Now()}) + } + + retVersions, err := m.log.Read() + r.NoError(err) + r.Equal(3, len(retVersions)) + + err = m.log.Clear("") + r.NoError(err) + + retVersions, err = m.log.Read() + r.NoError(err) + r.Equal(0, len(retVersions)) +} + +func (m *MongoTests) TestClearFrom() { + r := m.Require() + versions := []string{"v1.0.0", "v1.1.0", "v1.2.0"} + pointers := make(map[string]string) + for _, version := range versions { + p, _ := m.log.Append(eventlog.Event{Module: "m1", Version: version, Time: time.Now()}) + pointers[version] = p + } + + retVersions, err := m.log.Read() + r.NoError(err) + r.Equal(3, len(retVersions)) + + err = m.log.Clear(pointers[versions[1]]) + r.NoError(err) + + retVersions, err = m.log.Read() + r.NoError(err) + r.Equal(1, len(retVersions)) + r.Equal(versions[2], retVersions[0].Version) +} diff --git a/pkg/eventlog/multireader.go b/pkg/eventlog/multireader.go new file mode 100644 index 000000000..f492fe2a6 --- /dev/null +++ b/pkg/eventlog/multireader.go @@ -0,0 +1,106 @@ +package eventlog + +import "github.com/gomods/athens/pkg/storage" + +type multiReader struct { + logs []SequencedLog + checker storage.Checker +} + +// SequencedLog is collection of event logs with specified starting pointers used by ReadFrom function. +type SequencedLog struct { + Log Eventlog + Index string +} + +// NewMultiReader creates composite reader of specified readers. +// Order of readers matters in a way how Events are deduplicated. +// Initial state: +// - InMemory [A, B] - as im.A, im.B +// R1: [C,D,E] - as r1.C... +// R2: [A,D,F] +// R3: [B, G] +// result [r1.C, r1.D, r1.E, r2.F, r3.G] +// r2.A, r2.D, r3.B - skipped due to deduplication checks +func NewMultiReader(ch storage.Checker, ll ...Eventlog) Reader { + logs := make([]SequencedLog, 0, len(ll)) + for _, l := range ll { + // init to -1, not 0, 0 might mean first item and as this is excluding pointer we might lose it + logs = append(logs, SequencedLog{Log: l}) + } + + return NewMultiReaderFrom(ch, logs...) +} + +// NewMultiReaderFrom creates composite reader of specified readers. +// Order of readers matters in a way how Events are deduplicated. +// Initial state: +// - InMemory [A, B] - as im.A, im.B +// R1: [B,C,E] - as r1.C... - pointer to D +// R2: [A,D,F] - pointer to A +// R3: [B, G] - pointer to B +// result [r1.E, r2.D, r2.F, r3.G] +func NewMultiReaderFrom(ch storage.Checker, l ...SequencedLog) Reader { + return &multiReader{ + logs: l, + checker: ch, + } +} + +func (mr *multiReader) Read() ([]Event, error) { + events := make([]Event, 0) + + for _, r := range mr.logs { + ee, err := r.Log.Read() + if err != nil { + return nil, err + } + + for _, e := range ee { + if exists(e, events, mr.checker) { + continue + } + events = append(events, e) + } + } + + return events, nil +} + +func (mr *multiReader) ReadFrom(index string) ([]Event, error) { + events := make([]Event, 0) + + for _, r := range mr.logs { + var ee []Event + var err error + + if r.Index == "" { + ee, err = r.Log.Read() + } else { + ee, err = r.Log.ReadFrom(r.Index) + } + + if err != nil { + return nil, err + } + + for _, e := range ee { + if exists(e, events, mr.checker) { + continue + } + events = append(events, e) + } + } + + return events, nil +} + +func exists(event Event, log []Event, checker storage.Checker) bool { + for _, e := range log { + if e.Module == event.Module && e.Version == event.Version { + return true + } + } + + return checker.Exists(event.Module, event.Version) +} diff --git a/pkg/eventlog/multireader_test.go b/pkg/eventlog/multireader_test.go new file mode 100644 index 000000000..d751a2954 --- /dev/null +++ b/pkg/eventlog/multireader_test.go @@ -0,0 +1,154 @@ +package eventlog + +import ( + "testing" + + "github.com/globalsign/mgo/bson" + "github.com/stretchr/testify/suite" +) + +type MultiReaderTests struct { + suite.Suite +} + +func (m *MultiReaderTests) TestDedupRead() { + inMemReader1 := &InMemoryReader{[]Event{ + {ID: bson.NewObjectId().Hex(), Module: "c", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "d", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "e", Version: "v1"}, + }} + inMemReader2 := &InMemoryReader{[]Event{ + {ID: bson.NewObjectId().Hex(), Module: "a", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "d", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "f", Version: "v1"}, + }} + inMemReader3 := &InMemoryReader{[]Event{ + {ID: bson.NewObjectId().Hex(), Module: "b", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "e", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "c", Version: "v2"}, + }} + + storageChecker := ModuleStorageChecker{Module: "f"} + + mr := NewMultiReader(storageChecker, inMemReader1, inMemReader2, inMemReader3) + + r := m.Require() + + result, err := mr.Read() + + r.Equal(nil, err) + r.Equal(6, len(result), "Retrieved result %v", result) + + r.Equal("c", result[0].Module) + r.Equal("v1", result[0].Version) + + r.Equal("d", result[1].Module) + r.Equal("v1", result[1].Version) + + r.Equal("e", result[2].Module) + r.Equal("v1", result[2].Version) + + r.Equal("a", result[3].Module) + r.Equal("v1", result[3].Version) + + r.Equal("b", result[4].Module) + r.Equal("v1", result[4].Version) + + r.Equal("c", result[5].Module) + r.Equal("v2", result[5].Version) +} + +func (m *MultiReaderTests) TestDedupReadFrom() { + pointer1 := bson.NewObjectId().Hex() + inMemReader1 := &InMemoryReader{[]Event{ + {ID: pointer1, Module: "c", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "d", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "e", Version: "v1"}, + }} + pointer2 := bson.NewObjectId().Hex() + inMemReader2 := &InMemoryReader{[]Event{ + {ID: bson.NewObjectId().Hex(), Module: "a", Version: "v1"}, + {ID: pointer2, Module: "d", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "f", Version: "v1"}, + }} + pointer3 := bson.NewObjectId().Hex() + inMemReader3 := &InMemoryReader{[]Event{ + {ID: bson.NewObjectId().Hex(), Module: "b", Version: "v1"}, + {ID: pointer3, Module: "e", Version: "v1"}, + {ID: bson.NewObjectId().Hex(), Module: "c", Version: "v2"}, + }} + + storageChecker := ModuleStorageChecker{Module: "f"} + + sequencedLog1 := SequencedLog{Index: pointer1, Log: inMemReader1} + sequencedLog2 := SequencedLog{Index: pointer2, Log: inMemReader2} + sequencedLog3 := SequencedLog{Index: pointer3, Log: inMemReader3} + + mr := NewMultiReaderFrom(storageChecker, sequencedLog1, sequencedLog2, sequencedLog3) + + r := m.Require() + + result, err := mr.ReadFrom("") + + r.Equal(nil, err) + r.Equal(3, len(result), "Retrieved result %v", result) + + r.Equal("d", result[0].Module) + r.Equal("v1", result[0].Version) + + r.Equal("e", result[1].Module) + r.Equal("v1", result[1].Version) + + r.Equal("c", result[2].Module) + r.Equal("v2", result[2].Version) +} + +func TestDiskStorage(t *testing.T) { + suite.Run(t, new(MultiReaderTests)) +} + +type InMemoryReader struct { + mem []Event +} + +// Read reads all events in event log. +func (m *InMemoryReader) Read() ([]Event, error) { + return m.mem, nil +} + +// ReadFrom reads all events from the log starting at event with specified id (excluded). +// If id is not found behaves like Read(). +func (m *InMemoryReader) ReadFrom(id string) ([]Event, error) { + var index int + + for i, e := range m.mem { + if e.ID == id { + index = i + break + } + } + + return m.mem[index+1:], nil +} + +// Append appends Event to event log and returns its ID. +func (m *InMemoryReader) Append(event Event) (string, error) { + event.ID = bson.NewObjectId().Hex() + m.mem = append(m.mem, event) + + return event.ID, nil + +} + +func (m *InMemoryReader) Clear(id string) error { + m.mem = make([]Event, 0) + return nil +} + +type ModuleStorageChecker struct { + Module string +} + +func (s ModuleStorageChecker) Exists(module, version string) bool { + return module == s.Module +} diff --git a/pkg/eventlog/olympus/olympus.go b/pkg/eventlog/olympus/olympus.go new file mode 100644 index 000000000..5ee8f2256 --- /dev/null +++ b/pkg/eventlog/olympus/olympus.go @@ -0,0 +1,38 @@ +package olympus + +import "github.com/gomods/athens/pkg/eventlog" + +// Log represents event log fetched from remote olympus server +type Log struct { + uri string +} + +// NewLog creates log reader from remote olympus log +func NewLog(uri string) eventlog.Eventlog { + return &Log{uri: uri} +} + +// Read reads all events in event log. +func (o *Log) Read() ([]eventlog.Event, error) { + // TODO: implement read from endpoint + return nil, nil +} + +// ReadFrom reads all events from the log starting at event with specified id (excluded). +// If id is not found behaves like Read(). +func (o *Log) ReadFrom(id string) ([]eventlog.Event, error) { + // TODO: implement read from endpoint + return nil, nil +} + +// Append appends Event to event log and returns its ID. +func (o *Log) Append(event eventlog.Event) (string, error) { + // TODO: implement cache miss reporting + return "", nil +} + +// Clear is a method for clearing entire state of event log +func (o *Log) Clear(id string) error { + // Do not implement, we cannot clear remote olympus + return nil +} diff --git a/pkg/repo/goget.go b/pkg/repo/goget.go index 4f1b81a92..8135d0ff6 100644 --- a/pkg/repo/goget.go +++ b/pkg/repo/goget.go @@ -48,7 +48,7 @@ func (g *genericFetcher) Fetch() (string, error) { escapedURI := strings.Replace(g.repoURI, "/", "-", -1) repoDirName := fmt.Sprintf(tmpRepoDir, escapedURI, g.version) - repoRoot, err := setupTmp(repoDirName) + gopath, repoRoot, err := setupTmp(repoDirName) if err != nil { return "", err } @@ -56,7 +56,7 @@ func (g *genericFetcher) Fetch() (string, error) { prepareStructure(repoRoot) - dirName, err := getSources(repoRoot, g.repoURI, g.version) + dirName, err := getSources(gopath, repoRoot, g.repoURI, g.version) return dirName, err } @@ -82,11 +82,12 @@ func isVgoInstalled() bool { return false } -func setupTmp(repoDirName string) (string, error) { - tmpDir := os.TempDir() - path := filepath.Join(tmpDir, repoDirName) +func setupTmp(repoDirName string) (string, string, error) { + gopathDir := os.TempDir() - return path, os.MkdirAll(path, os.ModeDir|os.ModePerm) + path := filepath.Join(gopathDir, "src", repoDirName) + + return gopathDir, path, os.MkdirAll(path, os.ModeDir|os.ModePerm) } // Hacky thing makes vgo not to complain @@ -103,7 +104,7 @@ func prepareStructure(repoRoot string) error { return ioutil.WriteFile(sourcePath, sourceContent, 0666) } -func getSources(repoRoot, repoURI, version string) (string, error) { +func getSources(gopath, repoRoot, repoURI, version string) (string, error) { version = strings.TrimPrefix(version, "@") if !strings.HasPrefix(version, "v") { version = "v" + version @@ -112,7 +113,7 @@ func getSources(repoRoot, repoURI, version string) (string, error) { fullURI := fmt.Sprintf("%s@%s", uri, version) - gopathEnv := fmt.Sprintf("GOPATH=%s", repoRoot) + gopathEnv := fmt.Sprintf("GOPATH=%s", gopath) disableCgo := "CGO_ENABLED=0" cmd := exec.Command("vgo", "get", fullURI) @@ -120,7 +121,7 @@ func getSources(repoRoot, repoURI, version string) (string, error) { cmd.Env = append(cmd.Env, gopathEnv, disableCgo) cmd.Dir = repoRoot - packagePath := filepath.Join(repoRoot, "src", "v", "cache", repoURI, "@v") + packagePath := filepath.Join(gopath, "src", "v", "cache", repoURI, "@v") o, err := cmd.CombinedOutput() if err != nil { diff --git a/pkg/storage/checker.go b/pkg/storage/checker.go new file mode 100644 index 000000000..7463794e3 --- /dev/null +++ b/pkg/storage/checker.go @@ -0,0 +1,8 @@ +package storage + +// Checker is the interface that checks if the version of the module exists +type Checker interface { + // Exists checks whether or not module in specified version is present + // in the backing storage + Exists(module, version string) bool +} diff --git a/pkg/storage/reader.go b/pkg/storage/reader.go index 76176288e..9ebd35e52 100644 --- a/pkg/storage/reader.go +++ b/pkg/storage/reader.go @@ -4,4 +4,5 @@ package storage type Reader struct { Lister Getter + Checker }