Skip to content

Commit

Permalink
added admin RPC server, accessed through 'sky register' and 'sky unre…
Browse files Browse the repository at this point in the history
…gister'
  • Loading branch information
skelterjohn committed Jul 13, 2012
1 parent 4ceb1d5 commit 9eaf354
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 16 deletions.
14 changes: 9 additions & 5 deletions rpc/bsonrpc/bsoncoders.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package bsonrpc

import (
"io"
"errors"
"fmt"
"io"
"launchpad.net/mgo/v2/bson"
)

Expand All @@ -16,13 +16,13 @@ func NewEncoder(w io.Writer) *Encoder {
}

func (e *Encoder) Encode(v interface{}) (err error) {
fmt.Printf("encoding: %+v\n", v)
//fmt.Printf("encoding: %+v\n", v)
buf, err := bson.Marshal(v)
if err != nil {
return
}
_, err = e.w.Write(buf)
fmt.Printf("encoded to: %v\n", buf)
//fmt.Printf("encoded to: %v\n", buf)
return
}

Expand All @@ -37,6 +37,10 @@ func NewDecoder(r io.Reader) *Decoder {
func (d *Decoder) Decode(pv interface{}) (err error) {
var lbuf [4]byte
n, err := d.r.Read(lbuf[:])
if n == 0 {
err = io.EOF
return
}
if n != 4 {
err = errors.New(fmt.Sprintf("Corrupted BSON stream: could only read %d", n))
return
Expand All @@ -57,11 +61,11 @@ func (d *Decoder) Decode(pv interface{}) (err error) {
return
}

fmt.Printf("decoding: %v\n", buf)
//fmt.Printf("decoding: %v\n", buf)

err = bson.Unmarshal(buf, pv)

fmt.Printf("decoded: %+v\n", pv)
//fmt.Printf("decoded: %+v\n", pv)

return
}
8 changes: 1 addition & 7 deletions rpc/bsonrpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package bsonrpc

import (
"io"
"testing"
"net/rpc"
"testing"
)

type duplex struct {
Expand All @@ -28,12 +28,6 @@ func (ts Test) Foo(in TestParam, out *TestParam) (err error) {
return
}

func basicServer(conn io.ReadWriteCloser) {
s := ServeConn(conn)
var ts Test
s.Register(&ts)
}

func TestBasicClientServer(t *testing.T) {
toServer, fromClient := io.Pipe()
toClient, fromServer := io.Pipe()
Expand Down
51 changes: 51 additions & 0 deletions sky/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"github.com/bketelsen/skynet/rpc/bsonrpc"
"github.com/bketelsen/skynet/skylib"
"net"
"net/rpc"
"os"
)

func doRegister(rpcClient *rpc.Client, log skylib.Logger) {
var args skylib.RegisterParams
var reply skylib.RegisterReturns
err := rpcClient.Call("Admin.Register", args, &reply)
if err != nil {
log.Item(err)
}
}

func doUnregister(rpcClient *rpc.Client, log skylib.Logger) {
var args skylib.UnregisterParams
var reply skylib.UnregisterReturns
err := rpcClient.Call("Admin.Unregister", args, &reply)
if err != nil {
log.Item(err)
}
}

func Register(q *skylib.Query) {
doSomething(q, doRegister)
}

func Unregister(q *skylib.Query) {
doSomething(q, doUnregister)
}

func doSomething(q *skylib.Query, do func(*rpc.Client, skylib.Logger)) {

log := skylib.NewConsoleLogger(os.Stderr)
results := *q.FindInstances()
for _, result := range results {
conn, err := net.Dial("tcp", result.Config.AdminAddr.String())
if err != nil {
log.Item(err)
continue
}
rpcClient := bsonrpc.NewClient(conn)
do(rpcClient, log)
conn.Close()
}
}
12 changes: 8 additions & 4 deletions sky/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package main

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"flag"
"github.com/bketelsen/skynet/skylib"
"sync"
"io"
"bytes"
"encoding/json"
"log"
"os"
"strings"
"sync"
)

// Daemon() will run and maintain skynet services.
Expand Down Expand Up @@ -115,7 +115,11 @@ func (s *SkynetDaemon) Stopped(service *skylib.Service) {

func (s *SkynetDaemon) Deploy(servicePath, args string) (uuid string, err error) {
uuid = skylib.UUID()
s.Log.Println("Deploying", servicePath, args)

s.Log.Item(SubserviceDeployment{
ServicePath: servicePath,
Args: args,
})

ss, err := NewSubService(s.Log, servicePath, args)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions sky/logmessages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"fmt"
)

type SubserviceDeployment struct {
ServicePath string
Args string
}

func (sd SubserviceDeployment) String() string {
return fmt.Sprintf("Deployed %s %s", sd.ServicePath, sd.Args)
}
4 changes: 4 additions & 0 deletions sky/sky.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func main() {
Daemon(query, flag.Args()[1:])
case "remote":
Remote(query, flag.Args()[1:])
case "register":
Register(query)
case "unregister":
Unregister(query)
case "cli":
InteractiveShell()

Expand Down
74 changes: 74 additions & 0 deletions skylib/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package skylib

import (
"github.com/bketelsen/skynet/rpc/bsonrpc"
"net"
"net/rpc"
)

type ServiceAdmin struct {
service *Service
rpc *rpc.Server
}

func NewServiceAdmin(service *Service) (sa *ServiceAdmin) {
sa = &ServiceAdmin{
service: service,
rpc: rpc.NewServer(),
}

sa.rpc.Register(&Admin{
service: service,
})

return
}

func (sa *ServiceAdmin) Listen(addr *BindAddr) {
laddr, err := net.ResolveTCPAddr("tcp", addr.String())
if err != nil {
panic(err)
}
listener, err := net.ListenTCP("tcp", laddr)
if err != nil {
panic(err)
}

sa.service.Log.Item(AdminListening{sa.service.Config})

for {
conn, err := listener.AcceptTCP()
if err != nil {
panic(err)
}
go sa.rpc.ServeCodec(bsonrpc.NewServerCodec(conn))
}
}

type Admin struct {
service *Service
}

type RegisterParams struct {
}

type RegisterReturns struct {
}

func (sa *Admin) Register(in RegisterParams, out *RegisterReturns) (err error) {
sa.service.Log.Item("Got RPC admin command Register")
sa.service.Register()
return
}

type UnregisterParams struct {
}

type UnregisterReturns struct {
}

func (sa *Admin) Unregister(in UnregisterParams, out *UnregisterReturns) (err error) {
sa.service.Log.Item("Got RPC admin command Unregister")
sa.service.Unregister()
return
}
5 changes: 5 additions & 0 deletions skylib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func GetServiceConfigFromFlags() *ServiceConfig {

var (
bindPort *int = flagset.Int("port", 9999, "tcp port to listen")
adminPort *int = flagset.Int("adminport", 9998, "tcp port to listen for admin")
bindAddr *string = flagset.String("address", "127.0.0.1", "address to bind")
region *string = flagset.String("region", "unknown", "region service is located in")
doozer *string = flagset.String("doozer", "127.0.0.1:8046", "initial doozer instance to connect to")
Expand All @@ -68,6 +69,10 @@ func GetServiceConfigFromFlags() *ServiceConfig {
IPAddress: *bindAddr,
Port: *bindPort,
},
AdminAddr: &BindAddr{
IPAddress: *bindAddr,
Port: *adminPort,
},
DoozerConfig: &DoozerConfig{
Uri: *doozer,
BootUri: *doozerBoot,
Expand Down
8 changes: 8 additions & 0 deletions skylib/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Service struct {

Log Logger `json:"-"`

Admin *ServiceAdmin `json:"-"`

Delegate ServiceInterface `json:"-"`
methods map[string]reflect.Value `json:"-"`
}
Expand All @@ -45,6 +47,12 @@ func (s *Service) Start(register bool) {

rpcServ.Listen(l)

// the admin server
if s.Config.AdminAddr != nil {
s.Admin = NewServiceAdmin(s)
go s.Admin.Listen(s.Config.AdminAddr)
}

// Watch signals for shutdown
c := make(chan os.Signal, 1)
go watchSignals(c, s)
Expand Down

0 comments on commit 9eaf354

Please sign in to comment.