Skip to content

Commit

Permalink
minimize allocations in logger and tunnel code (micro#1323)
Browse files Browse the repository at this point in the history
* logs alloc

Signed-off-by: Vasiliy Tolstov <[email protected]>

* fix allocs

Signed-off-by: Vasiliy Tolstov <[email protected]>

* fix allocs

Signed-off-by: Vasiliy Tolstov <[email protected]>

* tunnel allocs

Signed-off-by: Vasiliy Tolstov <[email protected]>

* try to fix tunnel

Signed-off-by: Vasiliy Tolstov <[email protected]>

* cache cipher for send

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>

* more logger

Signed-off-by: Vasiliy Tolstov <[email protected]>
  • Loading branch information
vtolstov authored Mar 11, 2020
1 parent 4125ae8 commit 7b385bf
Show file tree
Hide file tree
Showing 47 changed files with 917 additions and 382 deletions.
6 changes: 4 additions & 2 deletions agent/input/discord/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/bwmarrin/discordgo"
"github.com/micro/go-micro/v2/agent/input"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
)

type discordConn struct {
Expand Down Expand Up @@ -74,7 +74,9 @@ func (dc *discordConn) Send(e *input.Event) error {
fields := strings.Split(e.To, ":")
_, err := dc.master.session.ChannelMessageSend(fields[0], string(e.Data))
if err != nil {
log.Error("[bot][loop][send]", err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("[bot][loop][send]", err)
}
}
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions agent/input/telegram/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/forestgiant/sliceutil"
"github.com/micro/go-micro/v2/agent/input"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
tgbotapi "gopkg.in/telegram-bot-api.v4"
)

Expand Down Expand Up @@ -104,7 +104,9 @@ func (tc *telegramConn) Send(event *input.Event) error {

if err != nil {
// probably it could be because of nested HTML tags -- telegram doesn't allow nested tags
log.Error("[telegram][Send] error:", err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("[telegram][Send] error:", err)
}
msgConfig.Text = "This bot couldn't send the response (Internal error)"
tc.input.api.Send(msgConfig)
}
Expand Down
10 changes: 7 additions & 3 deletions api/handler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/gorilla/websocket"
"github.com/micro/go-micro/v2/api/handler"
"github.com/micro/go-micro/v2/broker"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
)

const (
Expand Down Expand Up @@ -136,7 +136,9 @@ func (c *conn) writeLoop() {
}()

if err != nil {
log.Error(err.Error())
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err.Error())
}
return
}

Expand Down Expand Up @@ -214,7 +216,9 @@ func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

ws, err := b.u.Upgrade(w, r, nil)
if err != nil {
log.Error(err.Error())
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err.Error())
}
return
}

Expand Down
8 changes: 5 additions & 3 deletions api/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/gorilla/handlers"
"github.com/micro/go-micro/v2/api/server"
"github.com/micro/go-micro/v2/api/server/cors"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
)

type httpServer struct {
Expand Down Expand Up @@ -75,7 +75,9 @@ func (s *httpServer) Start() error {
return err
}

log.Infof("HTTP API Listening on %s", l.Addr().String())
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("HTTP API Listening on %s", l.Addr().String())
}

s.mtx.Lock()
s.address = l.Addr().String()
Expand All @@ -84,7 +86,7 @@ func (s *httpServer) Start() error {
go func() {
if err := http.Serve(l, s.mux); err != nil {
// temporary fix
//log.Fatal(err)
//logger.Fatal(err)
}
}()

Expand Down
18 changes: 13 additions & 5 deletions broker/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/micro/go-micro/v2/codec/json"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/util/addr"
"github.com/nats-io/nats-server/v2/server"
Expand Down Expand Up @@ -172,7 +172,9 @@ func (n *natsBroker) serve(exit chan bool) error {
for _, node := range service.Nodes {
u, err := url.Parse("nats://" + node.Address)
if err != nil {
log.Info(err)
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Info(err)
}
continue
}
// append to the cluster routes
Expand Down Expand Up @@ -247,7 +249,9 @@ func (n *natsBroker) serve(exit chan bool) error {
select {
case err := <-n.closeCh:
if err != nil {
log.Info(err)
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Info(err)
}
}
case <-exit:
// deregister on exit
Expand Down Expand Up @@ -402,15 +406,19 @@ func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
pub.m = &m
if err != nil {
m.Body = msg.Data
log.Error(err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
if eh != nil {
eh(pub)
}
return
}
if err := handler(pub); err != nil {
pub.err = err
log.Error(err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
if eh != nil {
eh(pub)
}
Expand Down
6 changes: 4 additions & 2 deletions broker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/google/uuid"
"github.com/micro/go-micro/v2/broker"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
maddr "github.com/micro/go-micro/v2/util/addr"
mnet "github.com/micro/go-micro/v2/util/net"
)
Expand Down Expand Up @@ -190,7 +190,9 @@ func (m *memoryEvent) Message() *broker.Message {
case []byte:
msg := &broker.Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
log.Errorf("[memory]: failed to unmarshal: %v\n", err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[memory]: failed to unmarshal: %v\n", err)
}
return nil
}
return msg
Expand Down
14 changes: 10 additions & 4 deletions broker/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/codec/json"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/util/addr"
"github.com/nats-io/nats-server/v2/server"
Expand Down Expand Up @@ -169,7 +169,9 @@ func (n *natsBroker) serve(exit chan bool) error {
for _, node := range service.Nodes {
u, err := url.Parse("nats://" + node.Address)
if err != nil {
log.Error(err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
continue
}
// append to the cluster routes
Expand Down Expand Up @@ -387,15 +389,19 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
pub.m = &m
if err != nil {
m.Body = msg.Data
log.Error(err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
if eh != nil {
eh(pub)
}
return
}
if err := handler(pub); err != nil {
pub.err = err
log.Error(err)
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
if eh != nil {
eh(pub)
}
Expand Down
28 changes: 20 additions & 8 deletions broker/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/micro/go-micro/v2/broker"
pb "github.com/micro/go-micro/v2/broker/service/proto"
"github.com/micro/go-micro/v2/client"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
)

type serviceBroker struct {
Expand Down Expand Up @@ -45,7 +45,9 @@ func (b *serviceBroker) Options() broker.Options {
}

func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
log.Debugf("Publishing to topic %s broker %v", topic, b.Addrs)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Publishing to topic %s broker %v", topic, b.Addrs)
}
_, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{
Topic: topic,
Message: &pb.Message{
Expand All @@ -61,7 +63,9 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...
for _, o := range opts {
o(&options)
}
log.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs)
}
stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{
Topic: topic,
Queue: options.Queue,
Expand All @@ -83,19 +87,27 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...
for {
select {
case <-sub.closed:
log.Debugf("Unsubscribed from topic %s", topic)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Unsubscribed from topic %s", topic)
}
return
default:
// run the subscriber
log.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
// run the subscriber
logger.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue)
}
if err := sub.run(); err != nil {
log.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs)
}
stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{
Topic: topic,
Queue: options.Queue,
}, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour))
if err != nil {
log.Debugf("Failed to resubscribe to topic %s: %v", topic, err)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Failed to resubscribe to topic %s: %v", topic, err)
}
time.Sleep(time.Second)
continue
}
Expand Down
6 changes: 4 additions & 2 deletions broker/service/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package service
import (
"github.com/micro/go-micro/v2/broker"
pb "github.com/micro/go-micro/v2/broker/service/proto"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
)

type serviceSub struct {
Expand Down Expand Up @@ -62,7 +62,9 @@ func (s *serviceSub) run() error {
// TODO: do not fail silently
msg, err := s.stream.Recv()
if err != nil {
log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err)
}

// close the exit channel
close(exit)
Expand Down
24 changes: 12 additions & 12 deletions config/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/micro/go-micro/v2/debug/profile/http"
"github.com/micro/go-micro/v2/debug/profile/pprof"
"github.com/micro/go-micro/v2/debug/trace"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/runtime"
"github.com/micro/go-micro/v2/server"
Expand Down Expand Up @@ -537,13 +537,13 @@ func (c *cmd) Before(ctx *cli.Context) error {
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))

if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {
log.Fatalf("Error configuring registry: %v", err)
logger.Fatalf("Error configuring registry: %v", err)
}

clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))

if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {
log.Fatalf("Error configuring broker: %v", err)
logger.Fatalf("Error configuring broker: %v", err)
}
}

Expand Down Expand Up @@ -590,31 +590,31 @@ func (c *cmd) Before(ctx *cli.Context) error {

if len(ctx.String("broker_address")) > 0 {
if err := (*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...)); err != nil {
log.Fatalf("Error configuring broker: %v", err)
logger.Fatalf("Error configuring broker: %v", err)
}
}

if len(ctx.String("registry_address")) > 0 {
if err := (*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...)); err != nil {
log.Fatalf("Error configuring registry: %v", err)
logger.Fatalf("Error configuring registry: %v", err)
}
}

if len(ctx.String("transport_address")) > 0 {
if err := (*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...)); err != nil {
log.Fatalf("Error configuring transport: %v", err)
logger.Fatalf("Error configuring transport: %v", err)
}
}

if len(ctx.String("store_address")) > 0 {
if err := (*c.opts.Store).Init(store.Nodes(strings.Split(ctx.String("store_address"), ",")...)); err != nil {
log.Fatalf("Error configuring store: %v", err)
logger.Fatalf("Error configuring store: %v", err)
}
}

if len(ctx.String("store_namespace")) > 0 {
if err := (*c.opts.Store).Init(store.Namespace(ctx.String("store_address"))); err != nil {
log.Fatalf("Error configuring store: %v", err)
logger.Fatalf("Error configuring store: %v", err)
}
}

Expand Down Expand Up @@ -648,7 +648,7 @@ func (c *cmd) Before(ctx *cli.Context) error {

if len(ctx.String("runtime_source")) > 0 {
if err := (*c.opts.Runtime).Init(runtime.WithSource(ctx.String("runtime_source"))); err != nil {
log.Fatalf("Error configuring runtime: %v", err)
logger.Fatalf("Error configuring runtime: %v", err)
}
}

Expand Down Expand Up @@ -696,7 +696,7 @@ func (c *cmd) Before(ctx *cli.Context) error {

if len(authOpts) > 0 {
if err := (*c.opts.Auth).Init(authOpts...); err != nil {
log.Fatalf("Error configuring auth: %v", err)
logger.Fatalf("Error configuring auth: %v", err)
}
}

Expand Down Expand Up @@ -729,14 +729,14 @@ func (c *cmd) Before(ctx *cli.Context) error {
// Lets set it up
if len(serverOpts) > 0 {
if err := (*c.opts.Server).Init(serverOpts...); err != nil {
log.Fatalf("Error configuring server: %v", err)
logger.Fatalf("Error configuring server: %v", err)
}
}

// Use an init option?
if len(clientOpts) > 0 {
if err := (*c.opts.Client).Init(clientOpts...); err != nil {
log.Fatalf("Error configuring client: %v", err)
logger.Fatalf("Error configuring client: %v", err)
}
}

Expand Down
Loading

0 comments on commit 7b385bf

Please sign in to comment.