Skip to content

Commit

Permalink
refactor: refine usage collection
Browse files Browse the repository at this point in the history
  • Loading branch information
pinglin committed Jun 8, 2022
1 parent caf148b commit bf0d3db
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 91 deletions.
5 changes: 1 addition & 4 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,4 @@ POSTGRESQL_VERSION=14.1

#----------- Temopral version ------------#
TEMPORAL_VERSION=1.16.2
TEMPORAL_UI_VERSION=0.14.0

#----------- Triton server version ------------#
TRITON_CONDA_ENV_VERSOIN=0.2.2-alpha
TEMPORAL_UI_VERSION=2.0.0
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
.DEFAULT_GOAL:=help

DEV := pipeline_backend
DEP := mgmt_backend connector_backend model_backend triton_conda_env
DEP := mgmt_backend connector_backend model_backend
DB := pg_sql redis
TRITON := triton_server
TRITON := triton_server triton_conda_env
TEMPORAL := temporal

#============================================================================
Expand Down
61 changes: 35 additions & 26 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"

"github.com/go-redis/redis/v9"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
Expand All @@ -34,9 +36,10 @@ import (
"github.com/instill-ai/x/repo"

database "github.com/instill-ai/pipeline-backend/internal/db"
mgmtPB "github.com/instill-ai/protogen-go/vdp/mgmt/v1alpha"
pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1alpha"
usagePB "github.com/instill-ai/protogen-go/vdp/usage/v1alpha"
usageclient "github.com/instill-ai/usage-client/usage"
usageclient "github.com/instill-ai/usage-client/client"
)

func grpcHandlerFunc(grpcServer *grpc.Server, gwHandler http.Handler, CORSOrigins []string) http.Handler {
Expand All @@ -58,16 +61,39 @@ func grpcHandlerFunc(grpcServer *grpc.Server, gwHandler http.Handler, CORSOrigin
)
}

func main() {
func startReporter(ctx context.Context, usageServiceClient usagePB.UsageServiceClient, r repository.Repository, mu mgmtPB.UserServiceClient, rc *redis.Client) {
if config.Config.Server.DisableUsage {
return
}

logger, _ := logger.GetZapLogger()
defer logger.Sync() //nolint
grpc_zap.ReplaceGrpcLoggerV2(logger)

if err := config.Init(); err != nil {
version, err := repo.ReadReleaseManifest("release-please/manifest.json")
if err != nil {
logger.Fatal(err.Error())
}

go func() {
time.Sleep(5 * time.Second)

usg := usage.NewUsage(r, mu, rc)
err = usageclient.StartReporter(ctx, usageServiceClient, usagePB.Session_SERVICE_PIPELINE, config.Config.Server.Edition, version, usg.RetrieveUsageData)
if err != nil {
logger.Error(fmt.Sprintf("unable to start reporter: %v\n", err))
}
}()
}

func main() {

if err := config.Init(); err != nil {
log.Fatal(err.Error())
}

logger, _ := logger.GetZapLogger()
defer logger.Sync() //nolint
grpc_zap.ReplaceGrpcLoggerV2(logger)

db := database.GetConnection()
defer database.Close(db)

Expand Down Expand Up @@ -123,6 +149,9 @@ func main() {
modelServiceClient, modelServiceClientConn := external.InitModelServiceClient()
defer modelServiceClientConn.Close()

usageServiceClient, usageServiceClientConn := external.InitUsageServiceClient()
defer usageServiceClientConn.Close()

redisClient := redis.NewClient(&config.Config.Cache.Redis.RedisOptions)
defer redisClient.Close()

Expand Down Expand Up @@ -163,27 +192,7 @@ func main() {
}

// Start usage reporter
if !config.Config.Server.DisableUsage {
version, err := repo.ReadReleaseManifest("release-please/manifest.json")
if err != nil {
logger.Fatal(err.Error())
}

usageServiceClient, usageServiceClientConn := external.InitUsageServiceClient()
defer usageServiceClientConn.Close()

usg := usage.NewUsage(repository, userServiceClient, redisClient)
err = usageclient.StartReporter(
context.Background(),
usageServiceClient,
usagePB.Session_SERVICE_PIPELINE,
config.Config.Server.Edition,
version,
usg.RetrieveUsageData)
if err != nil {
logger.Error(fmt.Sprintf("Unable to start usage reporter: %v\n", err))
}
}
startReporter(ctx, usageServiceClient, repository, userServiceClient, redisClient)

// Start gRPC server
var dialOpts []grpc.DialOption
Expand Down
8 changes: 3 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"flag"
"log"
"os"
"strings"
"time"
Expand All @@ -12,8 +13,6 @@ import (
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"go.temporal.io/sdk/client"

"github.com/instill-ai/pipeline-backend/internal/logger"
)

// Config - Global variable to export
Expand Down Expand Up @@ -41,6 +40,7 @@ type ServerConfig struct {
CORSOrigins []string `koanf:"corsorigins"`
Edition string `koanf:"edition"`
DisableUsage bool `koanf:"disableusage"`
Debug bool `koanf:"debug"`
}

// DatabaseConfig related to database
Expand Down Expand Up @@ -113,8 +113,6 @@ type UsageBackendConfig struct {

// Init - Assign global config to decoded config struct
func Init() error {
logger, _ := logger.GetZapLogger()

k := koanf.New(".")
parser := yaml.Parser()

Expand All @@ -123,7 +121,7 @@ func Init() error {
flag.Parse()

if err := k.Load(file.Provider(*fileRelativePath), parser); err != nil {
logger.Fatal(err.Error())
log.Fatal(err.Error())
}

if err := k.Load(env.ProviderWithValue("CFG_", ".", func(s string, v string) (string, interface{}) {
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ server:
- https://instill.tech
edition: local-ce:dev
disableusage: false
debug: false
database:
username: postgres
password: password
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ services:
CFG_DATABASE_PORT: 5432
CFG_DATABASE_USERNAME: postgres
CFG_DATABASE_PASSWORD: password
CFG_USAGEBACKEND_HOST: usage-backend-cloud
CFG_USAGEBACKEND_HOST: usage_backend_cloud
CFG_USAGEBACKEND_PORT: 8084
ports:
- 8080:8080
Expand Down Expand Up @@ -93,7 +93,7 @@ services:
CFG_CONNECTORBACKEND_PORT: 8082
CFG_MODELBACKEND_HOST: model_backend
CFG_MODELBACKEND_PORT: 8083
CFG_USAGEBACKEND_HOST: usage-backend-cloud
CFG_USAGEBACKEND_HOST: usage_backend_cloud
CFG_USAGEBACKEND_PORT: 8084
ports:
- 8081:8081
Expand Down Expand Up @@ -209,7 +209,7 @@ services:
CFG_DATABASE_PORT: 5432
CFG_DATABASE_USERNAME: postgres
CFG_DATABASE_PASSWORD: password
CFG_TRITONSERVER_GRPCURI: triton-server:8001
CFG_TRITONSERVER_GRPCURI: triton_server:8001
CFG_TRITONSERVER_MODELSTORE: /model-repository
ports:
- 8083:8083
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.3
github.com/iancoleman/strcase v0.2.0
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220606124550-53f79f9c7d74
github.com/instill-ai/usage-client v0.0.0-20220606142220-c17424a565e4
github.com/instill-ai/usage-client v0.0.0-20220607201439-d646c37f5b02
github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb
github.com/knadh/koanf v1.4.0
github.com/mennanov/fieldmask-utils v0.5.0
github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.2
go.temporal.io/sdk v1.13.1
go.uber.org/zap v1.21.0
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
Expand Down Expand Up @@ -69,5 +69,5 @@ require (
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/genproto v0.0.0-20220317150908-0efb43f6373e // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220606124550-53f79f9c7d74 h1:daR/0QKiOy/y1tnrg0Gdgt6Hsf5Im7Zxmu8c4SLwVls=
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220606124550-53f79f9c7d74/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas=
github.com/instill-ai/usage-client v0.0.0-20220606142220-c17424a565e4 h1:yJ34x5zUKawwslRAPug8Ad6rziIvhheITQvz7auNyf8=
github.com/instill-ai/usage-client v0.0.0-20220606142220-c17424a565e4/go.mod h1:ibsksgd0nzO7zHom5qVa9lmb9Y4XuSF8JMjBpkdGW6s=
github.com/instill-ai/usage-client v0.0.0-20220607201439-d646c37f5b02 h1:7dhRYHERy+NbvESpaQ0NPOo3CiiDpvLsARR90Ftkiqw=
github.com/instill-ai/usage-client v0.0.0-20220607201439-d646c37f5b02/go.mod h1:saH0H46iHHMxBx+znN3CoE4IOylbTlpQUPj0Do06yKo=
github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb h1:70AJVfr463jWkgPQ1w281zsQ1LK/tOW5INTNc+yOBsI=
github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb/go.mod h1:uMgeUs+q+Bjr43AsYb3QRsFX2Ebxhr74seM8lNdbdLA=
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
Expand Down Expand Up @@ -969,8 +969,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down Expand Up @@ -1597,8 +1598,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.0.8/go.mod h1:4eOzrI1MUfm6ObJU/UcmbXyiHSs8jSwH95G5P5dxcAg=
gorm.io/driver/postgres v1.3.6 h1:Q0iLoYvWwsJVpYQrSrY5p5P4YzW7fJjFMBG2sa4Bz5U=
gorm.io/driver/postgres v1.3.6/go.mod h1:f02ympjIcgtHEGFMZvdgTxODZ9snAHDb4hXfigBVuNI=
Expand Down
10 changes: 8 additions & 2 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import (
"sync"

"go.uber.org/zap"

"github.com/instill-ai/pipeline-backend/config"
)

var logger *zap.Logger
var once sync.Once

// GetZapLogger returns an instance of zap logger
func GetZapLogger() (*zap.Logger, error) {
var err error
once.Do(func() {
logger, err = zap.NewDevelopment()
// logger, err = zap.NewProduction()
if config.Config.Server.Debug {
logger, err = zap.NewDevelopment()
} else {
logger, err = zap.NewProduction()
}
})

return logger, err
Expand Down
4 changes: 2 additions & 2 deletions internal/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func GetCollectionID(name string) (string, error) {
return colID, nil
}

// GetNameID returns the resource ID given a resource name
func GetNameID(name string) (string, error) {
// GetRscNameID returns the resource ID given a resource name
func GetRscNameID(name string) (string, error) {
id := name[strings.LastIndex(name, "/")+1:]
if id == "" {
return "", status.Errorf(codes.InvalidArgument, "Error when extract resource id from resource name `%s`", name)
Expand Down
12 changes: 6 additions & 6 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (h *handler) GetPipeline(ctx context.Context, req *pipelinePB.GetPipelineRe
return &pipelinePB.GetPipelineResponse{}, err
}

id, err := resource.GetNameID(req.GetName())
id, err := resource.GetRscNameID(req.GetName())
if err != nil {
return &pipelinePB.GetPipelineResponse{}, err
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func (h *handler) ActivatePipeline(ctx context.Context, req *pipelinePB.Activate
return &pipelinePB.ActivatePipelineResponse{}, err
}

id, err := resource.GetNameID(req.GetName())
id, err := resource.GetRscNameID(req.GetName())
if err != nil {
return &pipelinePB.ActivatePipelineResponse{}, err
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (h *handler) DeactivatePipeline(ctx context.Context, req *pipelinePB.Deacti
return &pipelinePB.DeactivatePipelineResponse{}, err
}

id, err := resource.GetNameID(req.GetName())
id, err := resource.GetRscNameID(req.GetName())
if err != nil {
return &pipelinePB.DeactivatePipelineResponse{}, err
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (h *handler) RenamePipeline(ctx context.Context, req *pipelinePB.RenamePipe
return &pipelinePB.RenamePipelineResponse{}, err
}

id, err := resource.GetNameID(req.GetName())
id, err := resource.GetRscNameID(req.GetName())
if err != nil {
return &pipelinePB.RenamePipelineResponse{}, err
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (h *handler) TriggerPipeline(ctx context.Context, req *pipelinePB.TriggerPi
return &pipelinePB.TriggerPipelineResponse{}, err
}

id, err := resource.GetNameID(req.GetName())
id, err := resource.GetRscNameID(req.GetName())
if err != nil {
return &pipelinePB.TriggerPipelineResponse{}, err
}
Expand Down Expand Up @@ -433,7 +433,7 @@ func (h *handler) TriggerPipelineBinaryFileUpload(stream pipelinePB.PipelineServ
return status.Error(codes.InvalidArgument, err.Error())
}

id, err := resource.GetNameID(data.GetName())
id, err := resource.GetRscNameID(data.GetName())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit bf0d3db

Please sign in to comment.