rsocket-go is an implementation of the RSocket protocol in Go.
🚧🚧🚧 IT IS UNDER ACTIVE DEVELOPMENT, APIs are unstable and maybe change at any time until release of v1.0.0.⚠️ ⚠️ ⚠️ DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!
- Design For Golang.
- Thin reactive-streams implementation.
- Simulate Java SDK API.
- Fast CLI (Compatible with https://github.com/rsocket/rsocket-cli).
- Installation:
go get github.com/rsocket/rsocket-go/cmd/rsocket-cli
- Example:
rsocket-cli --request -i hello_world --setup setup_me tcp://127.0.0.1:7878
- Installation:
Start an echo server
package main
import (
"context"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/mono"
)
func main() {
err := rsocket.Receive().
Resume().
Fragment(1024).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
// bind responder
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
return mono.Just(msg)
}),
), nil
}).
Transport("tcp://127.0.0.1:7878").
Serve(context.Background())
panic(err)
}
Connect to echo server
package main
import (
"context"
"log"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
)
func main() {
// Connect to server
cli, err := rsocket.Connect().
Resume().
Fragment(1024).
SetupPayload(payload.NewString("Hello", "World")).
Transport("tcp://127.0.0.1:7878").
Start(context.Background())
if err != nil {
panic(err)
}
defer cli.Close()
// Send request
result, err := cli.RequestResponse(payload.NewString("你好", "世界")).Block(context.Background())
if err != nil {
panic(err)
}
log.Println("response:", result)
}
NOTICE: more server examples are Here
Basic load balance feature, see here.
Mono
and Flux
are two parts of Reactor API. They are based on my another project reactor-go.
Mono
completes successfully by emitting an element, or with an error.
Here is a tiny example:
package main
import (
"context"
"fmt"
"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/mono"
)
func main() {
// Create a Mono using Just.
m := mono.Just(payload.NewString("Hello World!", "text/plain"))
// More create
//m := mono.Create(func(i context.Context, sink mono.Sink) {
// sink.Success(payload.NewString("Hello World!", "text/plain"))
//})
done := make(chan struct{})
m.
DoFinally(func(s rx.SignalType) {
close(done)
}).
DoOnSuccess(func(input payload.Payload) {
// Handle and consume payload.
// Do something here...
fmt.Println("bingo:", input)
}).
SubscribeOn(scheduler.Elastic()).
Subscribe(context.Background())
<-done
}
Flux
emits 0 to N elements, and then completes (successfully or with an error).
Here is tiny example:
package main
import (
"context"
"fmt"
flxx "github.com/jjeffcaii/reactor-go/flux"
"github.com/rsocket/rsocket-go/extension"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/flux"
)
func main() {
// Create a Flux and produce 10 elements.
f := flux.Create(func(ctx context.Context, sink flux.Sink) {
for i := 0; i < 10; i++ {
sink.Next(payload.NewString(fmt.Sprintf("Hello@%d", i), extension.TextPlain.String()))
}
sink.Complete()
})
// Or use Just.
//f := flux.Just(
// payload.NewString("foo", extension.TextPlain.String()),
// payload.NewString("bar", extension.TextPlain.String()),
// payload.NewString("qux", extension.TextPlain.String()),
//)
f.
DoOnNext(func(elem payload.Payload) {
// Handle and consume elements
// Do something here...
fmt.Println("bingo:", elem)
}).
Subscribe(context.Background())
// Or you can use Raw reactor-go API. :-D
f2 := flux.Raw(flxx.Range(0, 10).Map(func(i interface{}) interface{} {
return payload.NewString(fmt.Sprintf("Hello@%d", i.(int)), extension.TextPlain.String())
}))
f2.
DoOnNext(func(input payload.Payload) {
fmt.Println("bingo:", input)
}).
BlockLast(context.Background())
}
Flux
support backpressure.
You can call func Request
in Subscription
or use LimitRate
before subscribe.
package main
import (
"context"
"fmt"
"github.com/rsocket/rsocket-go/extension"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
)
func main() {
// Here is an example which consume Payload one by one.
f := flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 5; i++ {
s.Next(payload.NewString(fmt.Sprintf("Hello@%d", i), extension.TextPlain.String()))
}
s.Complete()
})
var su rx.Subscription
f.
DoOnRequest(func(n int) {
fmt.Printf("requesting next %d element......\n", n)
}).
Subscribe(
context.Background(),
rx.OnSubscribe(func(s rx.Subscription) {
// Init Request 1 element.
su = s
su.Request(1)
}),
rx.OnNext(func(elem payload.Payload) {
// Consume element, do something...
fmt.Println("bingo:", elem)
// Request for next one manually.
su.Request(1)
}),
)
}
We do not use a specific log implementation. You can register your own log implementation. For example:
package main
import (
"log"
"github.com/rsocket/rsocket-go/logger"
)
func init() {
logger.SetFunc(logger.LevelDebug|logger.LevelInfo|logger.LevelWarn|logger.LevelError, func(template string, args ...interface{}) {
// Implement your own logger here...
log.Printf(template, args...)
})
logger.SetLevel(logger.LevelInfo)
}
- TCP
- Websocket
- MetadataPush
- RequestFNF
- RequestResponse
- RequestStream
- RequestChannel
- Resume
- Keepalive
- Fragmentation
- Thin Reactor
- Cancel
- Error
- Flow Control: RequestN
- Flow Control: Lease
- Load Balance