Skip to content

Commit

Permalink
Change Publication to Event
Browse files Browse the repository at this point in the history
  • Loading branch information
asim committed Jul 7, 2019
1 parent 79b03a6 commit 4b4ad68
Show file tree
Hide file tree
Showing 15 changed files with 42 additions and 42 deletions.
2 changes: 1 addition & 1 deletion api/handler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *conn) writeLoop() {
opts = append(opts, broker.Queue(c.queue))
}

subscriber, err := c.b.Subscribe(c.topic, func(p broker.Publication) error {
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
b, err := json.Marshal(p.Message())
if err != nil {
return nil
Expand Down
6 changes: 3 additions & 3 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ type Broker interface {
// Handler is used to process messages via a subscription of a topic.
// The handler is passed a publication interface which contains the
// message and optional Ack method to acknowledge receipt of the message.
type Handler func(Publication) error
type Handler func(Event) error

type Message struct {
Header map[string]string
Body []byte
}

// Publication is given to a subscription handler for processing
type Publication interface {
// Event is given to a subscription handler for processing
type Event interface {
Topic() string
Message() *Message
Ack() error
Expand Down
10 changes: 5 additions & 5 deletions broker/http_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type httpSubscriber struct {
hb *httpBroker
}

type httpPublication struct {
type httpEvent struct {
m *Message
t string
}
Expand Down Expand Up @@ -155,15 +155,15 @@ func newHttpBroker(opts ...Option) Broker {
return h
}

func (h *httpPublication) Ack() error {
func (h *httpEvent) Ack() error {
return nil
}

func (h *httpPublication) Message() *Message {
func (h *httpEvent) Message() *Message {
return h.m
}

func (h *httpPublication) Topic() string {
func (h *httpEvent) Topic() string {
return h.t
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

p := &httpPublication{m: m, t: topic}
p := &httpEvent{m: m, t: topic}
id := req.Form.Get("id")

h.RLock()
Expand Down
10 changes: 5 additions & 5 deletions broker/http_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func sub(be *testing.B, c int) {
done := make(chan bool, c)

for i := 0; i < c; i++ {
sub, err := b.Subscribe(topic, func(p Publication) error {
sub, err := b.Subscribe(topic, func(p Event) error {
done <- true
m := p.Message()

Expand Down Expand Up @@ -107,7 +107,7 @@ func pub(be *testing.B, c int) {

done := make(chan bool, c*4)

sub, err := b.Subscribe(topic, func(p Publication) error {
sub, err := b.Subscribe(topic, func(p Event) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestBroker(t *testing.T) {

done := make(chan bool)

sub, err := b.Subscribe("test", func(p Publication) error {
sub, err := b.Subscribe("test", func(p Event) error {
m := p.Message()

if string(m.Body) != string(msg.Body) {
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestConcurrentSubBroker(t *testing.T) {
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
sub, err := b.Subscribe("test", func(p Publication) error {
sub, err := b.Subscribe("test", func(p Event) error {
defer wg.Done()

m := p.Message()
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestConcurrentPubBroker(t *testing.T) {

var wg sync.WaitGroup

sub, err := b.Subscribe("test", func(p Publication) error {
sub, err := b.Subscribe("test", func(p Event) error {
defer wg.Done()

m := p.Message()
Expand Down
10 changes: 5 additions & 5 deletions broker/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type memoryBroker struct {
Subscribers map[string][]*memorySubscriber
}

type memoryPublication struct {
type memoryEvent struct {
topic string
message *broker.Message
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br
return nil
}

p := &memoryPublication{
p := &memoryEvent{
topic: topic,
message: message,
}
Expand Down Expand Up @@ -142,15 +142,15 @@ func (m *memoryBroker) String() string {
return "memory"
}

func (m *memoryPublication) Topic() string {
func (m *memoryEvent) Topic() string {
return m.topic
}

func (m *memoryPublication) Message() *broker.Message {
func (m *memoryEvent) Message() *broker.Message {
return m.message
}

func (m *memoryPublication) Ack() error {
func (m *memoryEvent) Ack() error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion broker/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestMemoryBroker(t *testing.T) {
topic := "test"
count := 10

fn := func(p broker.Publication) error {
fn := func(p broker.Event) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (g *grpcClient) Options() client.Options {
}

func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return newGRPCPublication(topic, msg, g.opts.ContentType, opts...)
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
}

func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
Expand Down Expand Up @@ -498,7 +498,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
}

b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Payload()); err != nil {
if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}

Expand Down
12 changes: 6 additions & 6 deletions client/grpc/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"github.com/micro/go-micro/client"
)

type grpcPublication struct {
type grpcEvent struct {
topic string
contentType string
payload interface{}
}

func newGRPCPublication(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
var options client.MessageOptions
for _, o := range opts {
o(&options)
Expand All @@ -20,21 +20,21 @@ func newGRPCPublication(topic string, payload interface{}, contentType string, o
contentType = options.ContentType
}

return &grpcPublication{
return &grpcEvent{
payload: payload,
topic: topic,
contentType: contentType,
}
}

func (g *grpcPublication) ContentType() string {
func (g *grpcEvent) ContentType() string {
return g.contentType
}

func (g *grpcPublication) Topic() string {
func (g *grpcEvent) Topic() string {
return g.topic
}

func (g *grpcPublication) Payload() interface{} {
func (g *grpcEvent) Payload() interface{} {
return g.payload
}
2 changes: 1 addition & 1 deletion client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{
Target: topic,
Type: codec.Publication,
Type: codec.Event,
Header: map[string]string{
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
Expand Down
2 changes: 1 addition & 1 deletion codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
Error MessageType = iota
Request
Response
Publication
Event
)

type MessageType int
Expand Down
6 changes: 3 additions & 3 deletions codec/jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (j *jsonCodec) Write(m *codec.Message, b interface{}) error {
return j.c.Write(m, b)
case codec.Response, codec.Error:
return j.s.Write(m, b)
case codec.Publication:
case codec.Event:
data, err := json.Marshal(b)
if err != nil {
return err
Expand All @@ -54,7 +54,7 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
return j.s.ReadHeader(m)
case codec.Response:
return j.c.ReadHeader(m)
case codec.Publication:
case codec.Event:
_, err := io.Copy(j.buf, j.rwc)
return err
default:
Expand All @@ -69,7 +69,7 @@ func (j *jsonCodec) ReadBody(b interface{}) error {
return j.s.ReadBody(b)
case codec.Response:
return j.c.ReadBody(b)
case codec.Publication:
case codec.Event:
if b != nil {
return json.Unmarshal(j.buf.Bytes(), b)
}
Expand Down
6 changes: 3 additions & 3 deletions codec/protorpc/protorpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
}
case codec.Publication:
case codec.Event:
data, err := proto.Marshal(b.(proto.Message))
if err != nil {
return err
Expand Down Expand Up @@ -141,7 +141,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
m.Method = rtmp.GetServiceMethod()
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
m.Error = rtmp.GetError()
case codec.Publication:
case codec.Event:
_, err := io.Copy(c.buf, c.rwc)
return err
default:
Expand All @@ -159,7 +159,7 @@ func (c *protoCodec) ReadBody(b interface{}) error {
if err != nil {
return err
}
case codec.Publication:
case codec.Event:
data = c.buf.Bytes()
default:
return fmt.Errorf("Unrecognised message type: %v", c.mt)
Expand Down
4 changes: 2 additions & 2 deletions server/grpc/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error {
}

func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Publication) error {
return func(p broker.Event) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
Expand Down Expand Up @@ -208,7 +208,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
co := cf(b)
defer co.Close()

if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions server/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func validateSubscriber(sub Subscriber) error {
}

func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Publication) error {
return func(p broker.Event) error {
msg := p.Message()

// get codec
Expand Down Expand Up @@ -214,7 +214,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
co := cf(b)
defer co.Close()

if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions sync/task/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (t *Task) Run(c task.Command) error {
errCh := make(chan error, t.Options.Pool)

// subscribe for distributed work
workFn := func(p broker.Publication) error {
workFn := func(p broker.Event) error {
msg := p.Message()

// get command name
Expand Down Expand Up @@ -110,7 +110,7 @@ func (t *Task) Run(c task.Command) error {
}

// subscribe to all status messages
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error {
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error {
msg := p.Message()

// get command name
Expand Down

0 comments on commit 4b4ad68

Please sign in to comment.