Skip to content

Commit

Permalink
The mega cruft proxy PR (micro#974)
Browse files Browse the repository at this point in the history
* the mega cruft proxy PR

* Rename broker id

* add protocol=grpc

* fix compilation breaks

* Add the tunnel broker to the network

* fix broker id

* continue to be backwards compatible in the protocol
  • Loading branch information
asim authored Nov 25, 2019
1 parent 2526673 commit 080363e
Show file tree
Hide file tree
Showing 23 changed files with 595 additions and 196 deletions.
11 changes: 4 additions & 7 deletions broker/http_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func newHttpBroker(opts ...Option) Broker {
}

h := &httpBroker{
id: "broker-" + uuid.New().String(),
id: "go.micro.http.broker-" + uuid.New().String(),
address: addr,
opts: options,
r: reg,
Expand Down Expand Up @@ -472,7 +472,7 @@ func (h *httpBroker) Init(opts ...Option) error {
}

if len(h.id) == 0 {
h.id = "broker-" + uuid.New().String()
h.id = "go.micro.http.broker-" + uuid.New().String()
}

// get registry
Expand Down Expand Up @@ -648,9 +648,6 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
return nil, err
}

// create unique id
id := h.id + "." + uuid.New().String()

var secure bool

if h.opts.Secure || h.opts.TLSConfig != nil {
Expand All @@ -659,7 +656,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO

// register service
node := &registry.Node{
Id: id,
Id: h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
Expand All @@ -684,7 +681,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: id,
id: h.id,
topic: topic,
fn: handler,
svc: service,
Expand Down
12 changes: 12 additions & 0 deletions client/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ var (
{
Id: "foo-1.0.0-123",
Address: "localhost:9999",
Metadata: map[string]string{
"protocol": "mucp",
},
},
{
Id: "foo-1.0.0-321",
Address: "localhost:9999",
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
},
Expand All @@ -29,6 +35,9 @@ var (
{
Id: "foo-1.0.1-321",
Address: "localhost:6666",
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
},
Expand All @@ -39,6 +48,9 @@ var (
{
Id: "foo-1.0.3-345",
Address: "localhost:8888",
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
},
Expand Down
45 changes: 39 additions & 6 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
Expand Down Expand Up @@ -70,8 +71,13 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
}, nil
}

// only get the things that are of grpc protocol
selectOptions := append(opts.SelectOptions, selector.WithFilter(
selector.FilterLabel("protocol", "grpc"),
))

// get next nodes from the selector
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
next, err := g.opts.Selector.Select(service, selectOptions...)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
Expand Down Expand Up @@ -510,29 +516,56 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
}

func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
var options client.PublishOptions
for _, o := range opts {
o(&options)
}

md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()

cf, err := g.newGRPCCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}

b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
var body []byte

// passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok {
body = d.Data
} else {
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}

g.once.Do(func() {
g.opts.Broker.Connect()
})

return g.opts.Broker.Publish(p.Topic(), &broker.Message{
topic := p.Topic()

// get proxy topic
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}

// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}

return g.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b,
Body: body,
})
}

Expand Down
3 changes: 3 additions & 0 deletions client/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func TestGRPCClient(t *testing.T) {
{
Id: "test-1",
Address: l.Addr().String(),
Metadata: map[string]string{
"protocol": "grpc",
},
},
},
})
Expand Down
43 changes: 30 additions & 13 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/micro/go-micro/client/pool"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
Expand Down Expand Up @@ -349,8 +350,13 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
}, nil
}

// only get the things that are of mucp protocol
selectOptions := append(opts.SelectOptions, selector.WithFilter(
selector.FilterLabel("protocol", "mucp"),
))

// get next nodes from the selector
next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
next, err := r.opts.Selector.Select(service, selectOptions...)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
Expand Down Expand Up @@ -583,26 +589,37 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
return errors.InternalServerError("go.micro.client", err.Error())
}

// new buffer
b := buf.New(nil)
var body []byte

if err := cf(b).Write(&codec.Message{
Target: topic,
Type: codec.Event,
Header: map[string]string{
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
// passed in raw data
if d, ok := msg.Payload().(*raw.Frame); ok {
body = d.Data
} else {
// new buffer
b := buf.New(nil)

if err := cf(b).Write(&codec.Message{
Target: topic,
Type: codec.Event,
Header: map[string]string{
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}

// set the body
body = b.Bytes()
}

r.once.Do(func() {
r.opts.Broker.Connect()
})

return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),
Body: body,
})
}

Expand Down
3 changes: 3 additions & 0 deletions client/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func TestCallWrapper(t *testing.T) {
{
Id: id,
Address: address,
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
})
Expand Down
5 changes: 5 additions & 0 deletions client/rpc_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
return nil
}

// processing topic publishing
if len(msg.Header["Micro-Topic"]) > 0 {
return nil
}

// no protocol use old codecs
switch msg.Header["Content-Type"] {
case "application/json":
Expand Down
8 changes: 8 additions & 0 deletions network/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/tunnel"
bun "github.com/micro/go-micro/tunnel/broker"
tun "github.com/micro/go-micro/tunnel/transport"
"github.com/micro/go-micro/util/backoff"
"github.com/micro/go-micro/util/log"
Expand Down Expand Up @@ -112,17 +113,24 @@ func newNetwork(opts ...Option) Network {
tun.WithTunnel(options.Tunnel),
)

// create the tunnel broker
tunBroker := bun.NewBroker(
bun.WithTunnel(options.Tunnel),
)

// server is network server
server := server.NewServer(
server.Id(options.Id),
server.Address(peerAddress),
server.Advertise(advertise),
server.Name(options.Name),
server.Transport(tunTransport),
server.Broker(tunBroker),
)

// client is network client
client := client.NewClient(
client.Broker(tunBroker),
client.Transport(tunTransport),
client.Selector(
rtr.NewSelector(
Expand Down
11 changes: 8 additions & 3 deletions proxy/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server"
)
Expand Down Expand Up @@ -62,8 +61,14 @@ func readLoop(r server.Request, s client.Stream) error {
}
}

func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.grpc", "SendRequest is unsupported")
// ProcessMessage acts as a message exchange and forwards messages to ongoing topics
// TODO: should we look at p.Endpoint and only send to the local endpoint? probably
func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error {
// TODO: check that we're not broadcast storming by sending to the same topic
// that we're actually subscribed to

// directly publish to the local client
return p.Client.Publish(ctx, msg)
}

// ServeRequest honours the server.Proxy interface
Expand Down
Loading

0 comments on commit 080363e

Please sign in to comment.