From b15c5b92e2ef96810a120e2ffea3388cd96197d2 Mon Sep 17 00:00:00 2001 From: Daniel Chalef <131175+danielchalef@users.noreply.github.com> Date: Sun, 12 Nov 2023 08:54:00 -0800 Subject: [PATCH] feat: opentelemetry tracing (#271) * otel implementation * error if collector endpoint not configured * refactor http calls; add telemetry; refactor tests * reverse out openai timeouts --- .cursorignore | 1 + cmd/zep/run.go | 58 ++++++++++++++++++++++++-- config.yaml | 4 ++ config/models.go | 5 +++ go.mod | 29 +++++++++++-- go.sum | 76 +++++++++++++++++++++++++++++++--- pkg/llms/embeddings_local.go | 8 ++-- pkg/llms/llm_anthropic.go | 17 +++++--- pkg/llms/llm_anthropic_test.go | 9 +++- pkg/llms/llm_base.go | 72 +++++++++++++++++++++++++++++++- pkg/llms/llm_openai.go | 28 ++++++++----- pkg/llms/llm_openai_test.go | 17 ++++++-- pkg/server/routes.go | 26 ++++++++---- pkg/store/postgres/schema.go | 3 ++ pkg/tasks/http.go | 35 ++++++++++++++++ pkg/tasks/ner.go | 65 ++++++++++++++++------------- pkg/tasks/router.go | 5 +++ pkg/tasks/sql_queue.go | 8 +++- 18 files changed, 386 insertions(+), 80 deletions(-) create mode 100644 .cursorignore create mode 100644 pkg/tasks/http.go diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 00000000..fb7f0c4f --- /dev/null +++ b/.cursorignore @@ -0,0 +1 @@ +test_data \ No newline at end of file diff --git a/cmd/zep/run.go b/cmd/zep/run.go index 0db92035..d4cd591d 100644 --- a/cmd/zep/run.go +++ b/cmd/zep/run.go @@ -22,12 +22,19 @@ import ( "github.com/getzep/zep/pkg/llms" "github.com/getzep/zep/pkg/models" "github.com/getzep/zep/pkg/server" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) const ( - ErrStoreTypeNotSet = "store.type must be set" - ErrPostgresDSNNotSet = "store.postgres.dsn must be set" - StoreTypePostgres = "postgres" + 
ErrStoreTypeNotSet = "store.type must be set" + ErrPostgresDSNNotSet = "store.postgres.dsn must be set" + ErrOtelEnabledButExporterEmpty = "OpenTelemtry is enabled but OTEL_EXPORTER_OTLP_ENDPOINT is not set" + StoreTypePostgres = "postgres" ) // run is the entrypoint for the zep server @@ -44,6 +51,16 @@ func run() { config.SetLogLevel(cfg) appState := NewAppState(cfg) + if cfg.OpenTelemetry.Enabled { + cleanup := initTracer() + defer func() { + err := cleanup(context.Background()) + if err != nil { + log.Errorf("Failed to cleanup tracer: %v", err) + } + }() + } + srv := server.Create(appState) log.Infof("Listening on: %s", srv.Addr) @@ -52,7 +69,7 @@ func run() { } err = srv.ListenAndServe() if err != nil { - log.Fatal(err) + log.Panic(err) } } @@ -226,3 +243,36 @@ func dumpConfigToJSON(cfg *config.Config) string { return string(b) } + +func initTracer() func(context.Context) error { + if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" { + log.Fatal(ErrOtelEnabledButExporterEmpty) + } + exporter, err := otlptrace.New( + context.Background(), + otlptracehttp.NewClient(), + ) + if err != nil { + log.Fatal(err) + } + + resources, err := resource.New(context.Background(), + resource.WithFromEnv(), + resource.WithProcess(), + resource.WithOS(), + resource.WithContainer(), + resource.WithHost(), + ) + if err != nil { + log.Fatal(err) + } + + otel.SetTracerProvider( + sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(resources), + ), + ) + return exporter.Shutdown +} diff --git a/config.yaml b/config.yaml index 255b554b..1f5b2d0d 100644 --- a/config.yaml +++ b/config.yaml @@ -76,6 +76,10 @@ data: purge_every: 60 log: level: "info" +opentelemetry: + enabled: false + endpoint: + attributes: {} # Custom Prompts Configuration # Allows customization of extractor prompts. 
custom_prompts: diff --git a/config/models.go b/config/models.go index b58b1466..0a1c41d0 100644 --- a/config/models.go +++ b/config/models.go @@ -11,6 +11,7 @@ type Config struct { Server ServerConfig `mapstructure:"server"` Log LogConfig `mapstructure:"log"` Auth AuthConfig `mapstructure:"auth"` + OpenTelemetry OpenTelemetryConfig `mapstructure:"opentelemetry"` DataConfig DataConfig `mapstructure:"data"` Development bool `mapstructure:"development"` CustomPrompts CustomPromptsConfig `mapstructure:"custom_prompts"` @@ -66,6 +67,10 @@ type LogConfig struct { Level string `mapstructure:"level"` } +type OpenTelemetryConfig struct { + Enabled bool `mapstructure:"enabled"` +} + type AuthConfig struct { Secret string `mapstructure:"secret"` Required bool `mapstructure:"required"` diff --git a/go.mod b/go.mod index 8e4c275b..87972bcd 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ toolchain go1.21.3 require ( dario.cat/mergo v1.0.0 - github.com/avast/retry-go/v4 v4.5.0 github.com/brianvoe/gofakeit/v6 v6.23.2 github.com/chi-middleware/logrus-logger v0.2.0 github.com/go-chi/chi/v5 v5.0.10 @@ -40,11 +39,20 @@ require ( github.com/invopop/jsonschema v0.12.0 github.com/jackc/pgx/v5 v5.4.3 github.com/ma-hartma/watermill-logrus-adapter v0.0.0-20220319171828-0856b297f1c2 - github.com/sony/gobreaker v0.5.0 + github.com/riandyrn/otelchi v0.5.1 github.com/tmc/langchaingo v0.0.0-20230929160525-e16b77704b8d github.com/uptrace/bun/dbfixture v1.1.16 github.com/uptrace/bun/extra/bundebug v1.1.16 + github.com/uptrace/bun/extra/bunotel v1.1.16 github.com/viterin/vek v0.4.2 + github.com/voi-oss/watermill-opentelemetry v0.1.3 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.0 + go.opentelemetry.io/otel v1.20.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 + go.opentelemetry.io/otel/sdk v1.20.0 + 
go.opentelemetry.io/otel/trace v1.20.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -54,13 +62,17 @@ require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/chewxy/math32 v1.10.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/dlclark/regexp2 v1.10.0 // indirect github.com/fatih/color v1.15.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/spec v0.20.9 // indirect @@ -68,6 +80,8 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/goccy/go-json v0.10.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -99,6 +113,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect + github.com/sony/gobreaker v0.5.0 // indirect github.com/spf13/afero v1.10.0 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -106,16 +121,24 @@ require ( github.com/subosito/gotenv v1.6.0 // indirect github.com/sv-tools/openapi v0.2.2 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect + 
github.com/uptrace/opentelemetry-go-extra/otelsql v0.2.2 // indirect github.com/viterin/partial v1.1.0 // indirect github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + go.opentelemetry.io/contrib v1.0.0 // indirect + go.opentelemetry.io/otel/metric v1.20.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect + golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/tools v0.14.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/grpc v1.58.2 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect mellium.im/sasl v0.3.1 // indirect diff --git a/go.sum b/go.sum index f0df3def..c5af8452 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0 h1:wswlLYY0Jc0tloj3lty4Y+VTEA8A github.com/ThreeDotsLabs/watermill-sql/v2 v2.0.0/go.mod h1:83l/4sKaLHwoHJlrAsDLaXcHN+QOHHntAAyabNmiuO4= github.com/alecthomas/chroma v0.10.0 h1:7XDcGkCQopCNKjZHfYrNLraA+M7e0fMiJ/Mfikbfjek= github.com/alecthomas/chroma v0.10.0/go.mod h1:jtJATyUxlIORhUOFNA9NZDWGAQ8wpxQQqNSB4rjA/1s= -github.com/avast/retry-go/v4 v4.5.0 h1:QoRAZZ90cj5oni2Lsgl2GW8mNTnUCnmpx/iKpwVisHg= -github.com/avast/retry-go/v4 v4.5.0/go.mod h1:7hLEXp0oku2Nir2xBAsg0PTphp9z71bN5Aq1fboC3+I= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/brianvoe/gofakeit/v6 v6.23.2 
h1:lVde18uhad5wII/f5RMVFLtdQNE0HaGFuBUXmYKk8i8= @@ -63,6 +61,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chewxy/math32 v1.10.1 h1:LFpeY0SLJXeaiej/eIp2L40VYfscTvKh/FSEZ68uMkU= github.com/chewxy/math32 v1.10.1/go.mod h1:dOB2rcuFrCn6UHrze36WSLVPKtzPMRAQvBvUwkSsLqs= @@ -96,6 +96,9 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= +github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -105,6 +108,7 @@ github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9 github.com/getzep/sprig/v3 v3.0.0-20230930153539-1d7fce7d845e h1:QTkS0+mTOqmEkQYr9SN8Myk9mj4/u6xm5rd8SouCDKo= 
github.com/getzep/sprig/v3 v3.0.0-20230930153539-1d7fce7d845e/go.mod h1:t6K8Y1yWCIYJgReS39eaJ7C2RfUH8d05eMdXlK+tJp8= github.com/go-chi/chi/v5 v5.0.1/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/chi/v5 v5.0.8/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-chi/jwtauth/v5 v5.1.1 h1:Pjixqu5YkjE9sCLpzE01L0Q4sQzJIPdo7uz9r8ftp/c= @@ -112,6 +116,14 @@ github.com/go-chi/jwtauth/v5 v5.1.1/go.mod h1:CYP1WSbzD4MPuKCr537EM3kfFhSQgpUEtM github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= @@ -146,6 +158,8 @@ 
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= +github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -170,6 +184,9 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -181,8 +198,10 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -205,6 +224,8 @@ github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -326,6 +347,8 @@ github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYde github.com/pmezard/go-difflib 
v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/riandyrn/otelchi v0.5.1 h1:0/45omeqpP7f/cvdL16GddQBfAEmZvUyl2QzLSE6uYo= +github.com/riandyrn/otelchi v0.5.1/go.mod h1:ZxVxNEl+jQ9uHseRYIxKWRb3OY8YXFEu+EkNiiSNUEA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= @@ -389,6 +412,10 @@ github.com/uptrace/bun/driver/pgdriver v1.1.16 h1:b/NiSXk6Ldw7KLfMLbOqIkm4odHd7Q github.com/uptrace/bun/driver/pgdriver v1.1.16/go.mod h1:Rmfbc+7lx1z/umjMyAxkOHK81LgnGj71XC5YpA6k1vU= github.com/uptrace/bun/extra/bundebug v1.1.16 h1:SgicRQGtnjhrIhlYOxdkOm1Em4s6HykmT3JblHnoTBM= github.com/uptrace/bun/extra/bundebug v1.1.16/go.mod h1:SkiOkfUirBiO1Htc4s5bQKEq+JSeU1TkBVpMsPz2ePM= +github.com/uptrace/bun/extra/bunotel v1.1.16 h1:qkLTaTZK3FZk3b2P/stO/krS7KX9Fq5wSOj7Hlb2HG8= +github.com/uptrace/bun/extra/bunotel v1.1.16/go.mod h1:JwEH0kdXFnzYuK8D6eXUrf9HKsYy5wmB+lqQ/+dvH4E= +github.com/uptrace/opentelemetry-go-extra/otelsql v0.2.2 h1:USRngIQppxeyb39XzkVHXwQesKK0+JSwnHE/1c7fgic= +github.com/uptrace/opentelemetry-go-extra/otelsql v0.2.2/go.mod h1:1frv9RN1rlTq0jzCq+mVuEQisubZCQ4OU6S/8CaHzGY= github.com/viterin/partial v1.1.0 h1:iH1l1xqBlapXsYzADS1dcbizg3iQUKTU1rbwkHv/80E= github.com/viterin/partial v1.1.0/go.mod h1:oKGAo7/wylWkJTLrWX8n+f4aDPtQMQ6VG4dd2qur5QA= github.com/viterin/vek v0.4.2 h1:Vyv04UjQT6gcjEFX82AS9ocgNbAJqsHviheIBdPlv5U= @@ -402,6 +429,8 @@ github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vb github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= 
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/voi-oss/watermill-opentelemetry v0.1.3 h1:AvVx249n1sG5ytwJ73qhTsti7Y+8J5F5/UOtyrtYjS4= +github.com/voi-oss/watermill-opentelemetry v0.1.3/go.mod h1:/CQsSCe3Ki3UKXth6B6UlLj4zvf3i2b3t4dJJ0+HEdA= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -415,6 +444,29 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.opentelemetry.io/contrib v1.0.0 h1:khwDCxdSspjOLmFnvMuSHd/5rPzbTx0+l6aURwtQdfE= +go.opentelemetry.io/contrib v1.0.0/go.mod h1:EH4yDYeNoaTqn/8yCWQmfNB78VHfGX2Jt2bvnvzBlGM= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.0 h1:WUMhXWqLmFlznidWF4B9iML8VMdZy4TzJVYzdYTCuaM= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.0/go.mod h1:H1XIOXyXFff1aZa7nQeFHGYMB+gHH1TtZSti37uHX6o= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.0 h1:1eHu3/pUSWaOgltNK3WJFaywKsTIr/PwvHyDmi0lQA0= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.0/go.mod h1:HyABWq60Uy1kjJSa2BVOxUVao8Cdick5AWSKPutqy6U= +go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= +go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc= +go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs= 
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA= +go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM= +go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs= +go.opentelemetry.io/otel/sdk v1.20.0 h1:5Jf6imeFZlZtKv9Qbo6qt2ZkmWtdWx/wzcCbNUlAWGM= +go.opentelemetry.io/otel/sdk v1.20.0/go.mod h1:rmkSx1cZCm/tn16iWDn1GQbLtsW/LvsdEEFzCSRM6V0= +go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk= +go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ= +go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -574,8 +626,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys 
v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -717,6 +769,12 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g= +google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw= +google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod 
h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -733,6 +791,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= +google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -743,6 +803,10 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/llms/embeddings_local.go b/pkg/llms/embeddings_local.go index 373db7ff..32ee2795 100644 --- a/pkg/llms/embeddings_local.go +++ b/pkg/llms/embeddings_local.go @@ -9,8 +9,6 @@ import ( "net/http" "time" - "github.com/hashicorp/go-retryablehttp" - "github.com/getzep/zep/pkg/models" ) @@ -84,12 +82,12 @@ func makeEmbedRequest(ctx context.Context, url string, jsonBody []byte) ([]byte, ctx, cancel := context.WithTimeout(ctx, LocalEmbedderTimeout) defer cancel() - retryableHTTPClient := NewRetryableHTTPClient( + httpClient := NewRetryableHTTPClient( MaxLocalEmbedderRetryAttempts, LocalEmbedderTimeout, ) - req, err := retryablehttp.NewRequestWithContext( + req, err := http.NewRequestWithContext( ctx, http.MethodPost, url, @@ -100,7 +98,7 @@ func makeEmbedRequest(ctx context.Context, url string, jsonBody []byte) ([]byte, } req.Header.Set("Content-Type", "application/json") - resp, err := retryableHTTPClient.Do(req) + resp, err := httpClient.Do(req) if err != nil { log.Error("Error making POST request:", err) return nil, err diff --git a/pkg/llms/llm_anthropic.go b/pkg/llms/llm_anthropic.go index 7d4bc8b3..65c8236b 100644 --- a/pkg/llms/llm_anthropic.go +++ b/pkg/llms/llm_anthropic.go @@ -18,8 +18,12 @@ const AnthropicAPIKeyNotSetError = "ZEP_ANTHROPIC_API_KEY is not set" //nolint:g var _ models.ZepLLM = &ZepAnthropicLLM{} -func NewAnthropicLLM(ctx context.Context, cfg *config.Config) (*ZepAnthropicLLM, error) { - zllm := &ZepAnthropicLLM{} +func NewAnthropicLLM(ctx context.Context, cfg *config.Config) (models.ZepLLM, error) { + zllm := &ZepLLM{ + llm: &ZepAnthropicLLM{ + cfg: cfg, + }, + } err := zllm.Init(ctx, cfg) if err != nil { return nil, err @@ -28,7 +32,8 @@ func NewAnthropicLLM(ctx context.Context, cfg *config.Config) (*ZepAnthropicLLM, } type ZepAnthropicLLM struct { - llm *anthropic.LLM + client *anthropic.LLM + cfg *config.Config } func 
(zllm *ZepAnthropicLLM) Init(_ context.Context, cfg *config.Config) error { @@ -42,7 +47,7 @@ func (zllm *ZepAnthropicLLM) Init(_ context.Context, cfg *config.Config) error { if err != nil { return err } - zllm.llm = llm + zllm.client = llm return nil } @@ -52,7 +57,7 @@ func (zllm *ZepAnthropicLLM) Call(ctx context.Context, options ...llms.CallOption, ) (string, error) { // If the LLM is not initialized, return an error - if zllm.llm == nil { + if zllm.client == nil { return "", NewLLMError(InvalidLLMModelError, nil) } @@ -65,7 +70,7 @@ func (zllm *ZepAnthropicLLM) Call(ctx context.Context, prompt = "Human: " + prompt + "\nAssistant:" - completion, err := zllm.llm.Call(thisCtx, prompt, options...) + completion, err := zllm.client.Call(thisCtx, prompt, options...) if err != nil { return "", err } diff --git a/pkg/llms/llm_anthropic_test.go b/pkg/llms/llm_anthropic_test.go index ba4363f7..2c2f07d8 100644 --- a/pkg/llms/llm_anthropic_test.go +++ b/pkg/llms/llm_anthropic_test.go @@ -21,7 +21,14 @@ func TestZepAnthropicLLM_Init(t *testing.T) { zllm, err := NewAnthropicLLM(context.Background(), cfg) assert.NoError(t, err, "Expected no error from NewAnthropicLLM") - assert.NotNil(t, zllm.llm, "Expected llm to be initialized") + + z, ok := zllm.(*ZepLLM) + assert.True(t, ok, "Expected ZepLLM") + assert.NotNil(t, z.llm, "Expected llm to be initialized") + + a, ok := z.llm.(*ZepAnthropicLLM) + assert.True(t, ok, "Expected ZepOpenAILLM") + assert.NotNil(t, a.client, "Expected client to be initialized") } func TestZepAnthropicLLM_Call(t *testing.T) { diff --git a/pkg/llms/llm_base.go b/pkg/llms/llm_base.go index 398beaa6..f7564927 100644 --- a/pkg/llms/llm_base.go +++ b/pkg/llms/llm_base.go @@ -4,9 +4,17 @@ import ( "context" "fmt" "net/http" + "net/http/httptrace" "time" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + + "go.opentelemetry.io/otel" + 
"github.com/getzep/zep/pkg/models" + "github.com/tmc/langchaingo/llms" + "go.opentelemetry.io/otel/trace" "github.com/hashicorp/go-retryablehttp" @@ -17,6 +25,7 @@ import ( const DefaultTemperature = 0.0 const InvalidLLMModelError = "llm model is not set or is invalid" +const OtelLLMTracerName = "llm" var log = internal.GetLogger() @@ -80,6 +89,56 @@ func NewLLMClient(ctx context.Context, cfg *config.Config) (models.ZepLLM, error } } +var _ models.ZepLLM = &ZepLLM{} + +// ZepLLM is a wrapper around the Zep LLM implementations that implements the +// ZepLLM interface and adds OpenTelemetry tracing +type ZepLLM struct { + llm models.ZepLLM + tracer trace.Tracer +} + +func (zllm *ZepLLM) Call(ctx context.Context, + prompt string, + options ...llms.CallOption, +) (string, error) { + ctx, span := zllm.tracer.Start(ctx, "llm.Call") + defer span.End() + + result, err := zllm.llm.Call(ctx, prompt, options...) + if err != nil { + span.RecordError(err) + return "", err + } + + return result, err +} + +func (zllm *ZepLLM) EmbedTexts(ctx context.Context, texts []string) ([][]float32, error) { + ctx, span := zllm.tracer.Start(ctx, "llm.EmbedTexts") + defer span.End() + + result, err := zllm.llm.EmbedTexts(ctx, texts) + if err != nil { + span.RecordError(err) + return nil, err + } + + return result, err +} + +func (zllm *ZepLLM) GetTokenCount(text string) (int, error) { + return zllm.llm.GetTokenCount(text) +} + +func (zllm *ZepLLM) Init(ctx context.Context, cfg *config.Config) error { + // set up tracing + tracer := otel.Tracer(OtelLLMTracerName) + zllm.tracer = tracer + + return zllm.llm.Init(ctx, cfg) +} + type LLMError struct { message string originalError error @@ -144,7 +203,7 @@ func Float64ToFloat32Matrix(in [][]float64) [][]float32 { return out } -func NewRetryableHTTPClient(retryMax int, timeout time.Duration) *retryablehttp.Client { +func NewRetryableHTTPClient(retryMax int, timeout time.Duration) *http.Client { retryableHTTPClient := retryablehttp.NewClient() 
retryableHTTPClient.RetryMax = retryMax retryableHTTPClient.HTTPClient.Timeout = timeout @@ -152,7 +211,16 @@ func NewRetryableHTTPClient(retryMax int, timeout time.Duration) *retryablehttp. retryableHTTPClient.Backoff = retryablehttp.DefaultBackoff retryableHTTPClient.CheckRetry = retryPolicy - return retryableHTTPClient + httpClient := &http.Client{ + Transport: otelhttp.NewTransport( + retryableHTTPClient.StandardClient().Transport, + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx) + }), + ), + } + + return httpClient } // retryPolicy is a retryablehttp.CheckRetry function. It is used to determine diff --git a/pkg/llms/llm_openai.go b/pkg/llms/llm_openai.go index ee77bfbf..3ecb6bd0 100644 --- a/pkg/llms/llm_openai.go +++ b/pkg/llms/llm_openai.go @@ -20,8 +20,12 @@ const MaxOpenAIAPIRequestAttempts = 5 var _ models.ZepLLM = &ZepOpenAILLM{} -func NewOpenAILLM(ctx context.Context, cfg *config.Config) (*ZepOpenAILLM, error) { - zllm := &ZepOpenAILLM{} +func NewOpenAILLM(ctx context.Context, cfg *config.Config) (models.ZepLLM, error) { + zllm := &ZepLLM{ + llm: &ZepOpenAILLM{ + cfg: cfg, + }, + } err := zllm.Init(ctx, cfg) if err != nil { return nil, err @@ -30,8 +34,9 @@ func NewOpenAILLM(ctx context.Context, cfg *config.Config) (*ZepOpenAILLM, error } type ZepOpenAILLM struct { - llm *openai.Chat - tkm *tiktoken.Tiktoken + client *openai.Chat + cfg *config.Config + tkm *tiktoken.Tiktoken } func (zllm *ZepOpenAILLM) Init(_ context.Context, cfg *config.Config) error { @@ -53,7 +58,7 @@ func (zllm *ZepOpenAILLM) Init(_ context.Context, cfg *config.Config) error { if err != nil { return err } - zllm.llm = llm + zllm.client = llm return nil } @@ -63,7 +68,7 @@ func (zllm *ZepOpenAILLM) Call(ctx context.Context, options ...llms.CallOption, ) (string, error) { // If the LLM is not initialized, return an error - if zllm.llm == nil { + if zllm.client == nil { return "", NewLLMError(InvalidLLMModelError, 
nil) } @@ -76,7 +81,7 @@ func (zllm *ZepOpenAILLM) Call(ctx context.Context, messages := []schema.ChatMessage{schema.SystemChatMessage{Content: prompt}} - completion, err := zllm.llm.Call(thisCtx, messages, options...) + completion, err := zllm.client.Call(thisCtx, messages, options...) if err != nil { return "", err } @@ -86,14 +91,14 @@ func (zllm *ZepOpenAILLM) Call(ctx context.Context, func (zllm *ZepOpenAILLM) EmbedTexts(ctx context.Context, texts []string) ([][]float32, error) { // If the LLM is not initialized, return an error - if zllm.llm == nil { + if zllm.client == nil { return nil, NewLLMError(InvalidLLMModelError, nil) } thisCtx, cancel := context.WithTimeout(ctx, OpenAIAPITimeout) defer cancel() - embeddings, err := zllm.llm.CreateEmbedding(thisCtx, texts) + embeddings, err := zllm.client.CreateEmbedding(thisCtx, texts) if err != nil { return nil, NewLLMError("error while creating embedding", err) } @@ -117,12 +122,13 @@ func (zllm *ZepOpenAILLM) configureClient(cfg *config.Config) ([]openai.Option, log.Fatal("only one of AzureOpenAIEndpoint or OpenAIEndpoint can be set") } - retryableHTTPClient := NewRetryableHTTPClient(MaxOpenAIAPIRequestAttempts, OpenAIAPITimeout) + // Set up the HTTP client and config OpenTelemetry wrapper + httpClient := NewRetryableHTTPClient(MaxOpenAIAPIRequestAttempts, OpenAIAPITimeout) options := make([]openai.Option, 0) options = append( options, - openai.WithHTTPClient(retryableHTTPClient.StandardClient()), + openai.WithHTTPClient(httpClient), openai.WithModel(cfg.LLM.Model), openai.WithToken(apiKey), ) diff --git a/pkg/llms/llm_openai_test.go b/pkg/llms/llm_openai_test.go index 4d4f15d8..7a2fc0c2 100644 --- a/pkg/llms/llm_openai_test.go +++ b/pkg/llms/llm_openai_test.go @@ -19,12 +19,21 @@ func TestZepOpenAILLM_Init(t *testing.T) { }, } - zllm := &ZepOpenAILLM{} + zllm, err := NewOpenAILLM(context.Background(), cfg) + assert.NoError(t, err, "Expected no error from NewOpenAILLM") - err := zllm.Init(context.Background(), 
cfg) + err = zllm.Init(context.Background(), cfg) assert.NoError(t, err, "Expected no error from Init") - assert.NotNil(t, zllm.llm, "Expected llm to be initialized") - assert.NotNil(t, zllm.tkm, "Expected tkm to be initialized") + + z, ok := zllm.(*ZepLLM) + assert.True(t, ok, "Expected ZepLLM") + + assert.NotNil(t, z.llm, "Expected client to be initialized") + + o, ok := z.llm.(*ZepOpenAILLM) + assert.True(t, ok, "Expected ZepOpenAILLM") + assert.NotNil(t, o.client, "Expected tkm to be initialized") + assert.NotNil(t, o.tkm, "Expected tkm to be initialized") } func TestZepOpenAILLM_TestConfigureClient(t *testing.T) { diff --git a/pkg/server/routes.go b/pkg/server/routes.go index 621b5b80..1276c48c 100644 --- a/pkg/server/routes.go +++ b/pkg/server/routes.go @@ -6,6 +6,7 @@ import ( "time" "github.com/getzep/zep/pkg/web" + "github.com/riandyrn/otelchi" "github.com/getzep/zep/internal" @@ -22,6 +23,8 @@ import ( ) const ReadHeaderTimeout = 5 * time.Second +const OtelTracerNamer = "chi-server" +const RouterName = "router" var log = internal.GetLogger() @@ -54,14 +57,21 @@ func setupRouter(appState *models.AppState) *chi.Mux { } router := chi.NewRouter() - router.Use(httpLogger.Logger("router", log)) - router.Use(middleware.RequestSize(maxRequestSize)) - router.Use(middleware.Recoverer) - router.Use(middleware.RequestID) - router.Use(middleware.RealIP) - router.Use(middleware.CleanPath) - router.Use(SendVersion) - router.Use(middleware.Heartbeat("/healthz")) + router.Use( + httpLogger.Logger(RouterName, log), + otelchi.Middleware( + RouterName, + otelchi.WithChiRoutes(router), + otelchi.WithRequestMethodInSpanName(true), + ), + middleware.RequestSize(maxRequestSize), + middleware.Recoverer, + middleware.RequestID, + middleware.RealIP, + middleware.CleanPath, + SendVersion, + middleware.Heartbeat("/healthz"), + ) // Only setup web routes if enabled if appState.Config.Server.WebEnabled { diff --git a/pkg/store/postgres/schema.go b/pkg/store/postgres/schema.go index 
da93e5f1..71e02605 100644 --- a/pkg/store/postgres/schema.go +++ b/pkg/store/postgres/schema.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/uptrace/bun/extra/bunotel" + "github.com/getzep/zep/pkg/store/postgres/migrations" "github.com/Masterminds/semver/v3" @@ -657,6 +659,7 @@ func NewPostgresConn(appState *models.AppState) (*bun.DB, error) { sqldb.SetMaxIdleConns(maxOpenConns) db := bun.NewDB(sqldb, pgdialect.New()) + db.AddQueryHook(bunotel.NewQueryHook(bunotel.WithDBName("zep"))) // Enable pgvector extension err := enablePgVectorExtension(ctx, db) diff --git a/pkg/tasks/http.go b/pkg/tasks/http.go new file mode 100644 index 00000000..84cdb9f8 --- /dev/null +++ b/pkg/tasks/http.go @@ -0,0 +1,35 @@ +package tasks + +import ( + "context" + "net/http" + "net/http/httptrace" + "time" + + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + + "github.com/hashicorp/go-retryablehttp" +) + +// NewRetryableHTTPClient returns a new retryable HTTP client with the given retryMax and timeout. +// The retryable HTTP transport is wrapped in an OpenTelemetry transport. 
+func NewRetryableHTTPClient(retryMax int, timeout time.Duration) *http.Client { + retryableHTTPClient := retryablehttp.NewClient() + retryableHTTPClient.RetryMax = retryMax + retryableHTTPClient.HTTPClient.Timeout = timeout + retryableHTTPClient.Logger = log + retryableHTTPClient.Backoff = retryablehttp.DefaultBackoff + retryableHTTPClient.CheckRetry = retryablehttp.DefaultRetryPolicy + + httpClient := &http.Client{ + Transport: otelhttp.NewTransport( + retryableHTTPClient.StandardClient().Transport, + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx) + }), + ), + } + + return httpClient +} diff --git a/pkg/tasks/ner.go b/pkg/tasks/ner.go index 9913366c..ad2cc8f0 100644 --- a/pkg/tasks/ner.go +++ b/pkg/tasks/ner.go @@ -9,13 +9,15 @@ import ( "net/http" "time" - "github.com/avast/retry-go/v4" "github.com/getzep/zep/internal" "github.com/getzep/zep/pkg/models" ) +const NerRetryMax = 3 +const NerTimeout = 10 * time.Second + func callNERTask( - _ context.Context, + ctx context.Context, appState *models.AppState, texts []models.TextData, ) (models.EntityResponse, error) { @@ -38,38 +40,43 @@ func callNERTask( return models.EntityResponse{}, err } - var resp *http.Response var bodyBytes []byte var response models.EntityResponse - // Retry POST request to entity extractor 3 times with 1 second delay. 
- err = retry.Do( - func() error { - var err error - resp, err = http.Post(url, "application/json", bytes.NewBuffer(jsonBody)) //nolint:gosec - if err != nil { - log.Error("Error making POST request:", err) - return err - } - defer resp.Body.Close() - - bodyBytes, err = io.ReadAll(resp.Body) - if err != nil { - log.Error("Error reading response body:", err) - return err - } - - err = json.Unmarshal(bodyBytes, &response) - if err != nil { - fmt.Println("Error unmarshaling response body:", err) - return err - } - return nil - }, - retry.Attempts(3), - retry.Delay(time.Second), + client := NewRetryableHTTPClient(NerRetryMax, NerTimeout) + + req, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + url, + bytes.NewBuffer(jsonBody), ) + if err != nil { + return models.EntityResponse{}, err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return models.EntityResponse{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + errorString := fmt.Sprintf( + "Error making POST request: %d - %s", + resp.StatusCode, + resp.Status, + ) + return models.EntityResponse{}, fmt.Errorf(errorString) + } + + bodyBytes, err = io.ReadAll(resp.Body) + if err != nil { + return models.EntityResponse{}, err + } + err = json.Unmarshal(bodyBytes, &response) if err != nil { return models.EntityResponse{}, err } diff --git a/pkg/tasks/router.go b/pkg/tasks/router.go index daebe2a3..58dd0356 100644 --- a/pkg/tasks/router.go +++ b/pkg/tasks/router.go @@ -6,6 +6,8 @@ import ( "sync" "time" + wotel "github.com/voi-oss/watermill-opentelemetry/pkg/opentelemetry" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" @@ -54,6 +56,9 @@ func NewTaskRouter(appState *models.AppState, db *sql.DB) (*TaskRouter, error) { } router.AddMiddleware( + // Watermill opentelemetry middleware + wotel.Trace(), + // Throttle limits the 
number of messages processed per second. middleware.NewThrottle(TaskCountThrottle, time.Second).Middleware, diff --git a/pkg/tasks/sql_queue.go b/pkg/tasks/sql_queue.go index 4f37aabb..ec90a307 100644 --- a/pkg/tasks/sql_queue.go +++ b/pkg/tasks/sql_queue.go @@ -4,6 +4,8 @@ import ( "database/sql" "time" + wotel "github.com/voi-oss/watermill-opentelemetry/pkg/opentelemetry" + "github.com/ThreeDotsLabs/watermill" wsql "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" @@ -22,7 +24,7 @@ func (s SQLSchema) SubscribeIsolationLevel() sql.IsolationLevel { } func NewSQLQueuePublisher(db *sql.DB, logger watermill.LoggerAdapter) (message.Publisher, error) { - return wsql.NewPublisher( + p, err := wsql.NewPublisher( db, wsql.PublisherConfig{ SchemaAdapter: SQLSchema{}, @@ -30,6 +32,10 @@ func NewSQLQueuePublisher(db *sql.DB, logger watermill.LoggerAdapter) (message.P }, logger, ) + if err != nil { + return nil, err + } + return wotel.NewPublisherDecorator(p), nil } func NewSQLQueueSubscriber(db *sql.DB, logger watermill.LoggerAdapter) (message.Subscriber, error) {