rsocket-go is an implementation of the RSocket protocol in Go. It is still under development: APIs are unstable and may change at any time until the release of v1.0.0. Please do not use it in a production environment.
- Designed for Go.
- Thin reactive-streams implementation.
- API modeled on the Java SDK.
- Fast CLI (Compatible with https://github.com/rsocket/rsocket-cli).
- Installation:
go get github.com/rsocket/rsocket-go/cmd/rsocket-cli
- Example:
rsocket-cli --request -i hello_world --setup setup_me tcp://127.0.0.1:7878
Start an echo server
package main

import (
	"context"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	err := rsocket.Receive().
		Resume().
		Fragment(1024).
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket {
			// Bind a responder that echoes each request payload back.
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
					return mono.Just(msg)
				}),
			)
		}).
		Transport("tcp://127.0.0.1:7878").
		Serve(context.Background())
	panic(err)
}
Connect to echo server
package main

import (
	"context"
	"log"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
)

func main() {
	// Connect to server
	cli, err := rsocket.Connect().
		Resume().
		Fragment(1024).
		SetupPayload(payload.NewString("Hello", "World")).
		Transport("tcp://127.0.0.1:7878").
		Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Send request
	result, err := cli.RequestResponse(payload.NewString("你好", "世界")).Block(context.Background())
	if err != nil {
		panic(err)
	}
	log.Println("response:", result)
}
NOTICE: more server examples are available here.
For the basic load-balancing feature, see here.
Mono and Flux are the two parts of the Reactor API. They are built on another project of mine, reactor-go.
Mono completes successfully by emitting an element, or with an error.
Here is a tiny example:
package main

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go/scheduler"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	// Create a Mono using Just.
	m := mono.Just(payload.NewString("Hello World!", "text/plain"))
	// Or use Create.
	//m := mono.Create(func(i context.Context, sink mono.Sink) {
	//	sink.Success(payload.NewString("Hello World!", "text/plain"))
	//})
	done := make(chan struct{})
	m.
		DoFinally(func(s rx.SignalType) {
			// Signal main that the Mono has terminated.
			close(done)
		}).
		DoOnSuccess(func(input payload.Payload) {
			// Handle and consume payload.
			// Do something here...
			fmt.Println("bingo:", input)
		}).
		SubscribeOn(scheduler.Elastic()).
		Subscribe(context.Background())
	<-done
}
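The Mono contract described above (exactly one element, or one error) can also be pictured without the library. The following is only a sketch that uses the standard library; `result`, `just`, `fail`, `block`, and `errBoom` are hypothetical names chosen for illustration and are not part of rsocket-go or reactor-go.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error used to show the failure path.
var errBoom = errors.New("boom")

// result is a hypothetical stand-in for the single signal a Mono delivers:
// exactly one value, or exactly one error.
type result struct {
	value string
	err   error
}

// just returns a channel that delivers one successful value and then closes.
func just(v string) <-chan result {
	ch := make(chan result, 1)
	ch <- result{value: v}
	close(ch)
	return ch
}

// fail returns a channel that delivers one error and then closes.
func fail(err error) <-chan result {
	ch := make(chan result, 1)
	ch <- result{err: err}
	close(ch)
	return ch
}

// block waits for the single signal and returns it as a (value, error) pair.
func block(ch <-chan result) (string, error) {
	r := <-ch
	return r.value, r.err
}

func main() {
	v, err := block(just("Hello World!"))
	fmt.Println("value:", v, "error:", err)

	_, err = block(fail(errBoom))
	fmt.Println("error:", err)
}
```

In the real API the same two outcomes surface as the result and error returned by `Block`, as in the client example earlier.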
Flux emits 0 to N elements, and then completes (successfully or with an error).
Here is a tiny example:
package main

import (
	"context"
	"fmt"

	flxx "github.com/jjeffcaii/reactor-go/flux"
	"github.com/rsocket/rsocket-go/extension"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/flux"
)

func main() {
	// Create a Flux and produce 10 elements.
	f := flux.Create(func(ctx context.Context, sink flux.Sink) {
		for i := 0; i < 10; i++ {
			sink.Next(payload.NewString(fmt.Sprintf("Hello@%d", i), extension.TextPlain.String()))
		}
		sink.Complete()
	})
	// Or use Just.
	//f := flux.Just(
	//	payload.NewString("foo", extension.TextPlain.String()),
	//	payload.NewString("bar", extension.TextPlain.String()),
	//	payload.NewString("qux", extension.TextPlain.String()),
	//)
	f.
		DoOnNext(func(elem payload.Payload) {
			// Handle and consume elements
			// Do something here...
			fmt.Println("bingo:", elem)
		}).
		Subscribe(context.Background())
	// Or you can use Raw reactor-go API. :-D
	f2 := flux.Raw(flxx.Range(0, 10).Map(func(i interface{}) interface{} {
		return payload.NewString(fmt.Sprintf("Hello@%d", i.(int)), extension.TextPlain.String())
	}))
	f2.
		DoOnNext(func(input payload.Payload) {
			fmt.Println("bingo:", input)
		}).
		BlockLast(context.Background())
}
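The Flux contract (zero or more elements, followed by a single terminal signal) can be sketched with a plain channel. This is an illustration written against the standard library only; `signal`, `emit`, `collect`, and `errStream` are hypothetical names and do not exist in rsocket-go.

```go
package main

import (
	"errors"
	"fmt"
)

// errStream is a hypothetical error used to show a failing stream.
var errStream = errors.New("stream failed")

// signal is a hypothetical stand-in for what a Flux delivers to a subscriber.
type signal struct {
	elem string // set for a "next" signal
	err  error  // set for a terminal error signal
}

// emit sends each element as a next signal and then terminates.
// A nil err models successful completion (the channel is simply closed);
// a non-nil err is delivered as the last signal before closing.
func emit(elems []string, err error) <-chan signal {
	ch := make(chan signal)
	go func() {
		defer close(ch)
		for _, e := range elems {
			ch <- signal{elem: e}
		}
		if err != nil {
			ch <- signal{err: err}
		}
	}()
	return ch
}

// collect drains the stream and returns the elements seen plus the
// terminal error, if any.
func collect(ch <-chan signal) ([]string, error) {
	var out []string
	for s := range ch {
		if s.err != nil {
			return out, s.err
		}
		out = append(out, s.elem)
	}
	return out, nil
}

func main() {
	elems, err := collect(emit([]string{"foo", "bar", "qux"}, nil))
	fmt.Println(elems, err)

	// An empty stream that fails is also a valid Flux: zero elements, then an error.
	elems, err = collect(emit(nil, errStream))
	fmt.Println(len(elems), err)
}
```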
Flux supports backpressure.
You can call Request on the Subscription, or use LimitRate before subscribing.
package main

import (
	"context"
	"fmt"

	"github.com/rsocket/rsocket-go/extension"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"github.com/rsocket/rsocket-go/rx/flux"
)

func main() {
	// Here is an example which consumes payloads one by one.
	f := flux.Create(func(ctx context.Context, s flux.Sink) {
		for i := 0; i < 5; i++ {
			s.Next(payload.NewString(fmt.Sprintf("Hello@%d", i), extension.TextPlain.String()))
		}
		s.Complete()
	})
	var su rx.Subscription
	f.
		DoOnRequest(func(n int) {
			fmt.Printf("requesting next %d element......\n", n)
		}).
		Subscribe(
			context.Background(),
			rx.OnSubscribe(func(s rx.Subscription) {
				// Keep the subscription and request the first element.
				su = s
				su.Request(1)
			}),
			rx.OnNext(func(elem payload.Payload) {
				// Consume element, do something...
				fmt.Println("bingo:", elem)
				// Request the next one manually.
				su.Request(1)
			}),
		)
}
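The idea behind `Request(1)` is that the producer may only emit as many elements as the consumer has asked for. The sketch below models that demand-driven flow with two channels and the standard library only. `produce` and `consumeOneByOne` are hypothetical helpers, and this is not how rsocket-go implements backpressure internally.

```go
package main

import "fmt"

// produce emits elements only when demand has been signalled on requests.
// Each value n received on requests allows up to n more elements to be sent.
func produce(elems []string, requests <-chan int, out chan<- string) {
	defer close(out)
	i := 0
	for n := range requests {
		for ; n > 0 && i < len(elems); n-- {
			out <- elems[i]
			i++
		}
		if i == len(elems) {
			return // everything has been delivered
		}
	}
}

// consumeOneByOne requests a single element at a time and returns the
// elements in arrival order.
func consumeOneByOne(elems []string) []string {
	requests := make(chan int, 1)
	out := make(chan string)
	go produce(elems, requests, out)

	var got []string
	requests <- 1 // initial demand
	for e := range out {
		got = append(got, e)
		requests <- 1 // ask for the next element only after handling this one
	}
	return got
}

func main() {
	got := consumeOneByOne([]string{"Hello@0", "Hello@1", "Hello@2"})
	fmt.Println(got)
}
```

Because the producer blocks until a new request arrives, a slow consumer is never flooded: at most one element is in flight at any time in this sketch.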
- TCP
- Websocket
- MetadataPush
- RequestFNF
- RequestResponse
- RequestStream
- RequestChannel
- Resume
- Keepalive
- Fragmentation
- Thin Reactor
- Cancel
- Error
- Flow Control: RequestN
- Flow Control: Lease
- Load Balance
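The Fragmentation entry above and the `Fragment(1024)` option in the quick-start examples refer to splitting a large payload into smaller pieces that the receiver reassembles. The sketch below shows that idea only. It assumes the argument is a maximum size in bytes, ignores frame headers and metadata, and uses the hypothetical names `fragment`, `split`, and `join`, none of which are rsocket-go APIs.

```go
package main

import "fmt"

// fragment is a hypothetical, simplified view of one piece of a split payload.
type fragment struct {
	data    []byte
	follows bool // true when more fragments of the same payload come after this one
}

// split cuts data into pieces of at most size bytes.
// It ignores frame headers and metadata, which a real implementation must account for.
func split(data []byte, size int) []fragment {
	if size <= 0 || len(data) <= size {
		return []fragment{{data: data}}
	}
	var out []fragment
	for len(data) > size {
		out = append(out, fragment{data: data[:size], follows: true})
		data = data[size:]
	}
	return append(out, fragment{data: data})
}

// join reassembles the original bytes from fragments received in order.
func join(frags []fragment) []byte {
	var out []byte
	for _, f := range frags {
		out = append(out, f.data...)
	}
	return out
}

func main() {
	frags := split(make([]byte, 2500), 1024)
	for i, f := range frags {
		fmt.Printf("fragment %d: %d bytes, follows=%v\n", i, len(f.data), f.follows)
	}
	fmt.Println("reassembled bytes:", len(join(frags)))
}
```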