Skip to content

Commit

Permalink
segregate serializer interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzoranucci committed Nov 12, 2022
1 parent 9713c18 commit 8f21193
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
3 changes: 2 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ var runCmd = &cobra.Command{
}
handler, err := run.NewEventHandler(
getRedisStateHandler(),
&string2.Serializer{},
&string2.EventSerializer{},
&string2.EventKeySerializer{},
ed,
)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions internal/app/run/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

func NewEventHandler(
stateHandler StateHandler,
rowEventSerializer EventSerializer,
eventSerializer EventSerializer,
eventKeySerializer EventKeySerializer,
eventDispatcher EventDispatcher,
) (*EventHandler, error) {
lastPositionRead, err := stateHandler.GetLastPositionRead()
Expand All @@ -16,7 +17,8 @@ func NewEventHandler(

return &EventHandler{
stateHandler: stateHandler,
rowEventSerializer: rowEventSerializer,
eventSerializer: eventSerializer,
eventKeySerializer: eventKeySerializer,
eventDispatcher: eventDispatcher,
lastPositionRead: lastPositionRead,
}, nil
Expand All @@ -26,7 +28,8 @@ type EventHandler struct {
canal.DummyEventHandler

stateHandler StateHandler
rowEventSerializer EventSerializer
eventSerializer EventSerializer
eventKeySerializer EventKeySerializer
eventDispatcher EventDispatcher
lastPositionRead uint32
}
Expand All @@ -37,10 +40,13 @@ type StateHandler interface {
}

type EventSerializer interface {
SerializeKey(e *canal.RowsEvent) (interface{}, error)
SerializeMessage(e *canal.RowsEvent) (interface{}, error)
}

type EventKeySerializer interface {
SerializeKey(e *canal.RowsEvent) (interface{}, error)
}

type EventDispatcher interface {
Dispatch(routingKey interface{}, message interface{}) error
}
Expand All @@ -50,12 +56,12 @@ func (h *EventHandler) OnRow(e *canal.RowsEvent) error {
return nil
}

k, err := h.rowEventSerializer.SerializeKey(e)
k, err := h.eventKeySerializer.SerializeKey(e)
if err != nil {
return err
}

m, err := h.rowEventSerializer.SerializeMessage(e)
m, err := h.eventSerializer.SerializeMessage(e)
if err != nil {
return err
}
Expand Down
12 changes: 12 additions & 0 deletions internal/pkg/string/event_key_serializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package string

import (
"github.com/go-mysql-org/go-mysql/canal"
)

type EventKeySerializer struct {
}

func (j *EventKeySerializer) SerializeKey(e *canal.RowsEvent) (interface{}, error) {
return "", nil
}
8 changes: 2 additions & 6 deletions internal/pkg/string/event_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ import (
"github.com/go-mysql-org/go-mysql/canal"
)

type Serializer struct {
type EventSerializer struct {
}

func (j *Serializer) SerializeKey(e *canal.RowsEvent) (interface{}, error) {
return "", nil
}

func (j *Serializer) SerializeMessage(e *canal.RowsEvent) (interface{}, error) {
func (j *EventSerializer) SerializeMessage(e *canal.RowsEvent) (interface{}, error) {
return fmt.Sprintf("%v", e.Rows), nil
}

0 comments on commit 8f21193

Please sign in to comment.