From 85c3df5a543bf24fa88e3b3a470070c65eca1d70 Mon Sep 17 00:00:00 2001 From: Manu Gupta Date: Thu, 26 Jul 2018 20:15:23 -0400 Subject: [PATCH] Add tracing functions to s3, Azure, minio, rdbms, mongo (#332) * Add opentracing to context for azurecdn * Add opentracing context for s3 * Add opentracing for minio * Add opentracing functions for mongodb * Add tracing functions for rdbms * Fix span name in aws.s3.upload * add opentracing calls for storage.olympus * Add opentracing functions to storage.gcp --- pkg/storage/azurecdn/blob_store_client.go | 4 ++++ pkg/storage/azurecdn/storage.go | 4 ++++ pkg/storage/gcp/checker.go | 3 +++ pkg/storage/gcp/deleter.go | 3 +++ pkg/storage/gcp/getter.go | 3 +++ pkg/storage/gcp/lister.go | 3 +++ pkg/storage/gcp/saver.go | 5 +++++ pkg/storage/minio/checker.go | 3 +++ pkg/storage/minio/deleter.go | 3 +++ pkg/storage/minio/getter.go | 4 ++++ pkg/storage/minio/lister.go | 4 ++++ pkg/storage/minio/saver.go | 5 ++++- pkg/storage/mongo/checker.go | 3 +++ pkg/storage/mongo/deleter.go | 3 +++ pkg/storage/mongo/getter.go | 4 ++++ pkg/storage/mongo/lister.go | 3 +++ pkg/storage/mongo/saver.go | 5 ++++- pkg/storage/olympus/getter.go | 7 ++++++- pkg/storage/olympus/saver.go | 6 +++++- pkg/storage/rdbms/checker.go | 3 +++ pkg/storage/rdbms/deleter.go | 3 +++ pkg/storage/rdbms/getter.go | 4 ++++ pkg/storage/rdbms/lister.go | 3 +++ pkg/storage/rdbms/saver.go | 5 ++++- pkg/storage/s3/storage.go | 6 ++++++ 25 files changed, 94 insertions(+), 5 deletions(-) diff --git a/pkg/storage/azurecdn/blob_store_client.go b/pkg/storage/azurecdn/blob_store_client.go index 80a94d7a0..c0a60d792 100644 --- a/pkg/storage/azurecdn/blob_store_client.go +++ b/pkg/storage/azurecdn/blob_store_client.go @@ -5,6 +5,8 @@ import ( "io" "net/url" + "github.com/opentracing/opentracing-go" + "github.com/Azure/azure-storage-blob-go/2017-07-29/azblob" ) @@ -26,6 +28,8 @@ func newBlobStoreClient(accountURL *url.URL, accountName, accountKey, containerN } func (c *azureBlobStoreClient) UploadWithContext(ctx context.Context, path, contentType string, content io.Reader) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.azurecdn.UploadWithContext") + defer sp.Finish() blobURL := c.containerURL.NewBlockBlobURL(path) emptyMeta := map[string]string{} emptyBlobAccessCond := azblob.BlobAccessConditions{} diff --git a/pkg/storage/azurecdn/storage.go b/pkg/storage/azurecdn/storage.go index 09e0c1cd1..4f954742d 100644 --- a/pkg/storage/azurecdn/storage.go +++ b/pkg/storage/azurecdn/storage.go @@ -7,6 +7,8 @@ import ( "io" "net/url" + "github.com/opentracing/opentracing-go" + "github.com/gomods/athens/pkg/config/env" moduploader "github.com/gomods/athens/pkg/storage/module" ) @@ -56,6 +58,8 @@ func (s Storage) BaseURL() *url.URL { // Save implements the (github.com/gomods/athens/pkg/storage).Saver interface. func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.azurecdn.Save") + sp.Finish() err := moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.cl.UploadWithContext) // TODO: take out lease on the /list file and add the version to it // diff --git a/pkg/storage/gcp/checker.go b/pkg/storage/gcp/checker.go index c70be0f82..bd697964c 100644 --- a/pkg/storage/gcp/checker.go +++ b/pkg/storage/gcp/checker.go @@ -4,10 +4,13 @@ import ( "context" "github.com/gomods/athens/pkg/config" + opentracing "github.com/opentracing/opentracing-go" ) // Exists implements the (./pkg/storage).Checker interface // returning true if the module at version exists in storage func (s *Storage) Exists(ctx context.Context, module, version string) bool { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Exists") + defer sp.Finish() return s.bucket.Exists(ctx, config.PackageVersionedName(module, version, ".mod")) } diff --git a/pkg/storage/gcp/deleter.go b/pkg/storage/gcp/deleter.go index e437349dc..15e982034 100644 --- a/pkg/storage/gcp/deleter.go +++ b/pkg/storage/gcp/deleter.go @@ -6,12 +6,15 @@ import ( "github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/storage" modupl "github.com/gomods/athens/pkg/storage/module" + opentracing "github.com/opentracing/opentracing-go" ) // Delete implements the (./pkg/storage).Deleter interface and // removes a version of a module from storage. Returning ErrNotFound // if the version does not exist. func (s *Storage) Delete(ctx context.Context, module, version string) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Delete") + defer sp.Finish() if exists := s.bucket.Exists(ctx, config.PackageVersionedName(module, version, "mod")); !exists { return storage.ErrVersionNotFound{Module: module, Version: version} } diff --git a/pkg/storage/gcp/getter.go b/pkg/storage/gcp/getter.go index cc29b493c..9143f9cc5 100644 --- a/pkg/storage/gcp/getter.go +++ b/pkg/storage/gcp/getter.go @@ -7,6 +7,7 @@ import ( "github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Get retrieves a module at a specific version from storage as a (./pkg/storage).Version @@ -14,6 +15,8 @@ import ( // The caller is responsible for calling close on the Zip ReadCloser func (s *Storage) Get(module, version string) (*storage.Version, error) { ctx := context.TODO() + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Get") + defer sp.Finish() if exists := s.Exists(ctx, module, version); !exists { return nil, storage.ErrVersionNotFound{Module: module, Version: version} } diff --git a/pkg/storage/gcp/lister.go b/pkg/storage/gcp/lister.go index df9a5432a..936343645 100644 --- a/pkg/storage/gcp/lister.go +++ b/pkg/storage/gcp/lister.go @@ -5,11 +5,14 @@ import ( "strings" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) // List implements the (./pkg/storage).Lister interface // It returns a list of versions, if any, for a given module func (s *Storage) List(ctx context.Context, module string) ([]string, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.List") + defer sp.Finish() paths, err := s.bucket.List(ctx, module) if err != nil { return nil, err diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 29a5b6ab2..cd4c674ff 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -8,6 +8,7 @@ import ( stg "github.com/gomods/athens/pkg/storage" moduploader "github.com/gomods/athens/pkg/storage/module" + opentracing "github.com/opentracing/opentracing-go" ) // Save uploads the module's .mod, .zip and .info files for a given version @@ -18,6 +19,8 @@ import ( // Uploaded files are publicly accessable in the storage bucket as per // an ACL rule. func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.Save") + defer sp.Finish() if exists := s.Exists(ctx, module, version); exists { return stg.ErrVersionAlreadyExists{Module: module, Version: version} } @@ -30,6 +33,8 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, } func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.gcp.upload") + defer sp.Finish() wc := s.bucket.Write(ctx, path) defer func(wc io.WriteCloser) { if err := wc.Close(); err != nil { diff --git a/pkg/storage/minio/checker.go b/pkg/storage/minio/checker.go index e1f897552..d169416a7 100644 --- a/pkg/storage/minio/checker.go +++ b/pkg/storage/minio/checker.go @@ -5,9 +5,12 @@ import ( "fmt" minio "github.com/minio/minio-go" + opentracing "github.com/opentracing/opentracing-go" ) func (v *storageImpl) Exists(ctx context.Context, module, version string) bool { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.Exists") + defer sp.Finish() versionedPath := v.versionLocation(module, version) modPath := fmt.Sprintf("%s/go.mod", versionedPath) _, err := v.minioClient.StatObject(v.bucketName, modPath, minio.StatObjectOptions{}) diff --git a/pkg/storage/minio/deleter.go b/pkg/storage/minio/deleter.go index 14bcf8cc7..e8db624f8 100644 --- a/pkg/storage/minio/deleter.go +++ b/pkg/storage/minio/deleter.go @@ -5,9 +5,12 @@ import ( "fmt" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) func (v *storageImpl) Delete(ctx context.Context, module, version string) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.Delete") + defer sp.Finish() if !v.Exists(ctx, module, version) { return storage.ErrVersionNotFound{ Module: module, diff --git a/pkg/storage/minio/getter.go b/pkg/storage/minio/getter.go index a7d3ed597..d0c89a71f 100644 --- a/pkg/storage/minio/getter.go +++ b/pkg/storage/minio/getter.go @@ -1,15 +1,19 @@ package minio import ( + "context" "fmt" "io/ioutil" "net/http" "github.com/gomods/athens/pkg/storage" minio "github.com/minio/minio-go" + opentracing "github.com/opentracing/opentracing-go" ) func (v *storageImpl) Get(module, version string) (*storage.Version, error) { + sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.minio.Get") + defer sp.Finish() versionedPath := v.versionLocation(module, version) modPath := fmt.Sprintf("%s/go.mod", versionedPath) modReader, err := v.minioClient.GetObject(v.bucketName, modPath, minio.GetObjectOptions{}) diff --git a/pkg/storage/minio/lister.go b/pkg/storage/minio/lister.go index f6d1b5181..f15128610 100644 --- a/pkg/storage/minio/lister.go +++ b/pkg/storage/minio/lister.go @@ -4,9 +4,13 @@ import ( "context" "sort" "strings" + + opentracing "github.com/opentracing/opentracing-go" ) func (l *storageImpl) List(ctx context.Context, module string) ([]string, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.List") + defer sp.Finish() dict := make(map[string]struct{}) doneCh := make(chan struct{}) diff --git a/pkg/storage/minio/saver.go b/pkg/storage/minio/saver.go index a071d0c89..aee1b77ac 100644 --- a/pkg/storage/minio/saver.go +++ b/pkg/storage/minio/saver.go @@ -6,9 +6,12 @@ import ( "io" minio "github.com/minio/minio-go" + opentracing "github.com/opentracing/opentracing-go" ) -func (s *storageImpl) Save(_ context.Context, module, vsn string, mod []byte, zip io.Reader, info []byte) error { +func (s *storageImpl) Save(ctx context.Context, module, vsn string, mod []byte, zip io.Reader, info []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.minio.Save") + defer sp.Finish() dir := s.versionLocation(module, vsn) modFileName := dir + "/" + "go.mod" zipFileName := dir + "/" + "source.zip" diff --git a/pkg/storage/mongo/checker.go b/pkg/storage/mongo/checker.go index bedafe7d5..be1a76314 100644 --- a/pkg/storage/mongo/checker.go +++ b/pkg/storage/mongo/checker.go @@ -4,10 +4,13 @@ import ( "context" "github.com/globalsign/mgo/bson" + opentracing "github.com/opentracing/opentracing-go" ) // Exists checks for a specific version of a module func (s *ModuleStore) Exists(ctx context.Context, module, vsn string) bool { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.Exists") + defer sp.Finish() c := s.s.DB(s.d).C(s.c) count, err := c.Find(bson.M{"module": module, "version": vsn}).Count() return err == nil && count > 0 diff --git a/pkg/storage/mongo/deleter.go b/pkg/storage/mongo/deleter.go index d997fa047..73fac1fb9 100644 --- a/pkg/storage/mongo/deleter.go +++ b/pkg/storage/mongo/deleter.go @@ -5,10 +5,13 @@ import ( "github.com/globalsign/mgo/bson" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Delete removes a specific version of a module func (s *ModuleStore) Delete(ctx context.Context, module, version string) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.Delete") + defer sp.Finish() if !s.Exists(ctx, module, version) { return storage.ErrVersionNotFound{ Module: module, diff --git a/pkg/storage/mongo/getter.go b/pkg/storage/mongo/getter.go index fe9ba16e1..28d739e2e 100644 --- a/pkg/storage/mongo/getter.go +++ b/pkg/storage/mongo/getter.go @@ -2,15 +2,19 @@ package mongo import ( "bytes" + "context" "io/ioutil" "strings" "github.com/globalsign/mgo/bson" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Get a specific version of a module func (s *ModuleStore) Get(module, vsn string) (*storage.Version, error) { + sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.mongo.Get") + defer sp.Finish() c := s.s.DB(s.d).C(s.c) result := &storage.Module{} err := c.Find(bson.M{"module": module, "version": vsn}).One(result) diff --git a/pkg/storage/mongo/lister.go b/pkg/storage/mongo/lister.go index 7581cd04f..6b2db7ccc 100644 --- a/pkg/storage/mongo/lister.go +++ b/pkg/storage/mongo/lister.go @@ -6,10 +6,13 @@ import ( "github.com/globalsign/mgo/bson" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) // List lists all versions of a module func (s *ModuleStore) List(ctx context.Context, module string) ([]string, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.List") + defer sp.Finish() c := s.s.DB(s.d).C(s.c) result := make([]storage.Module, 0) err := c.Find(bson.M{"module": module}).All(&result) diff --git a/pkg/storage/mongo/saver.go b/pkg/storage/mongo/saver.go index 98e74af74..43f5e1b1a 100644 --- a/pkg/storage/mongo/saver.go +++ b/pkg/storage/mongo/saver.go @@ -6,10 +6,13 @@ import ( "io/ioutil" "github.com/gomods/athens/pkg/storage" + opentracing "github.com/opentracing/opentracing-go" ) // Save stores a module in mongo storage. -func (s *ModuleStore) Save(_ context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { +func (s *ModuleStore) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.mongo.Save") + defer sp.Finish() zipBytes, err := ioutil.ReadAll(zip) if err != nil { return err diff --git a/pkg/storage/olympus/getter.go b/pkg/storage/olympus/getter.go index 21afd3cd9..3c55fc5de 100644 --- a/pkg/storage/olympus/getter.go +++ b/pkg/storage/olympus/getter.go @@ -2,18 +2,23 @@ package olympus import ( "bytes" + "context" "fmt" "io/ioutil" "net/http" + "github.com/opentracing/opentracing-go" + "github.com/gomods/athens/pkg/config" "github.com/gomods/athens/pkg/storage" ) // Get a specific version of a module func (s *ModuleStore) Get(module, vsn string) (*storage.Version, error) { - // TODO: fetch from endpoint + sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.olympus.Get") + defer sp.Finish() + // TODO: fetch from endpoint modURI := fmt.Sprintf("%s/%s", s.url, config.PackageVersionedName(module, vsn, "mod")) zipURI := fmt.Sprintf("%s/%s", s.url, config.PackageVersionedName(module, vsn, "zip")) infoURI := fmt.Sprintf("%s/%s", s.url, config.PackageVersionedName(module, vsn, "info")) diff --git a/pkg/storage/olympus/saver.go b/pkg/storage/olympus/saver.go index b45fac3a7..f4edeff83 100644 --- a/pkg/storage/olympus/saver.go +++ b/pkg/storage/olympus/saver.go @@ -3,11 +3,15 @@ package olympus import ( "context" "io" + + "github.com/opentracing/opentracing-go" ) // Save stores a module in olympus. // This actually does not store anything just reports cache miss -func (s *ModuleStore) Save(_ context.Context, module, version string, _ []byte, _ io.ReadSeeker, _ []byte) error { +func (s *ModuleStore) Save(ctx context.Context, module, version string, _ []byte, _ io.ReadSeeker, _ []byte) error { // dummy implementation so Olympus Store can be used everywhere as Backend iface + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.olympus.Save") + defer sp.Finish() return nil } diff --git a/pkg/storage/rdbms/checker.go b/pkg/storage/rdbms/checker.go index e3f410d84..7608709ca 100644 --- a/pkg/storage/rdbms/checker.go +++ b/pkg/storage/rdbms/checker.go @@ -4,10 +4,13 @@ import ( "context" "github.com/gomods/athens/pkg/storage/rdbms/models" + opentracing "github.com/opentracing/opentracing-go" ) // Exists checks for a specific version of a module func (r *ModuleStore) Exists(ctx context.Context, module, vsn string) bool { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.Exists") + defer sp.Finish() result := models.Module{} query := r.conn.Where("module = ?", module).Where("version = ?", vsn) count, err := query.Count(&result) diff --git a/pkg/storage/rdbms/deleter.go b/pkg/storage/rdbms/deleter.go index fb6c13a28..1a3bad845 100644 --- a/pkg/storage/rdbms/deleter.go +++ b/pkg/storage/rdbms/deleter.go @@ -5,10 +5,13 @@ import ( "github.com/gomods/athens/pkg/storage" "github.com/gomods/athens/pkg/storage/rdbms/models" + opentracing "github.com/opentracing/opentracing-go" ) // Delete removes a specific version of a module. func (r *ModuleStore) Delete(ctx context.Context, module, version string) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.Delete") + defer sp.Finish() if !r.Exists(ctx, module, version) { return storage.ErrVersionNotFound{ Module: module, diff --git a/pkg/storage/rdbms/getter.go b/pkg/storage/rdbms/getter.go index 90649ddbf..6b7cfb003 100644 --- a/pkg/storage/rdbms/getter.go +++ b/pkg/storage/rdbms/getter.go @@ -2,16 +2,20 @@ package rdbms import ( "bytes" + "context" "database/sql" "io/ioutil" "github.com/gomods/athens/pkg/storage" "github.com/gomods/athens/pkg/storage/rdbms/models" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" ) // Get a specific version of a module func (r *ModuleStore) Get(module, vsn string) (*storage.Version, error) { + sp, _ := opentracing.StartSpanFromContext(context.TODO(), "storage.rdbms.Get") + defer sp.Finish() result := models.Module{} query := r.conn.Where("module = ?", module).Where("version = ?", vsn) if err := query.First(&result); err != nil { diff --git a/pkg/storage/rdbms/lister.go b/pkg/storage/rdbms/lister.go index 24ec176f7..d959d2670 100644 --- a/pkg/storage/rdbms/lister.go +++ b/pkg/storage/rdbms/lister.go @@ -4,10 +4,13 @@ import ( "context" "github.com/gomods/athens/pkg/storage/rdbms/models" + opentracing "github.com/opentracing/opentracing-go" ) // List lists all versions of a module func (r *ModuleStore) List(ctx context.Context, module string) ([]string, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.List") + defer sp.Finish() result := make([]models.Module, 0) err := r.conn.Where("module = ?", module).All(&result) if err != nil { diff --git a/pkg/storage/rdbms/saver.go b/pkg/storage/rdbms/saver.go index 395e0c8bc..0c582c08c 100644 --- a/pkg/storage/rdbms/saver.go +++ b/pkg/storage/rdbms/saver.go @@ -6,10 +6,13 @@ import ( "io/ioutil" "github.com/gomods/athens/pkg/storage/rdbms/models" + opentracing "github.com/opentracing/opentracing-go" ) // Save stores a module in rdbms storage. -func (r *ModuleStore) Save(_ context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { +func (r *ModuleStore) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.rdbms.Save") + defer sp.Finish() zipBytes, err := ioutil.ReadAll(zip) if err != nil { return err diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index 5d115924e..83dee6fdb 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -7,6 +7,8 @@ import ( "io" "net/url" + "github.com/opentracing/opentracing-go" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface" @@ -75,6 +77,8 @@ func (s Storage) BaseURL() *url.URL { // Save implements the (github.com/gomods/athens/pkg/storage).Saver interface. func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.s3.Save") + defer sp.Finish() err := moduploader.Upload(ctx, module, version, bytes.NewReader(info), bytes.NewReader(mod), zip, s.upload) // TODO: take out lease on the /list file and add the version to it // @@ -83,6 +87,8 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, } func (s *Storage) upload(ctx context.Context, path, contentType string, stream io.Reader) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "storage.s3.upload") + defer sp.Finish() upParams := &s3manager.UploadInput{ Bucket: &s.bucket, Key: &path,