Skip to content

Commit

Permalink
add timeline libs (aliyun#23)
Browse files Browse the repository at this point in the history
* add timeline libs

* add timeline and batch writer benchmark test

* ajust priority

* add benchmark result in README, improve timeline future

* update README

* update README
  • Loading branch information
zhoucan1990 authored and danielxiaoran committed Oct 12, 2018
1 parent 930a739 commit 6330304
Show file tree
Hide file tree
Showing 16 changed files with 1,849 additions and 0 deletions.
62 changes: 62 additions & 0 deletions timeline/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# timeline

timeline is a package that provides functionality for easily implementing social scene application such as IM app, feed stream app.

timeline is based on [TableStore](https://cn.aliyun.com/product/ots
) timeline model.

## Installation

```
$ go get github.com/aliyun/aliyun-tablestore-go-sdk
```

## Sample

* im application demo:
```
$ cd timeline/sample/im
$ go run main.go im.go
```

* feed stream demo:
```
$ cd timeline/sample/feed
$ go run main.go feed.go
```

## Benchmark

**VM OS:** Aliyun Linux 17.1 64bit

**VM Configuration:** 4 CPU, 8 GB RAM.

**GO VERSION:** 1.10.3 linux/amd64

**Timeline Message Size:** almost 220 bytes

* on TableStore SSD High-performance instance
```
$ cd timeline
$ go test -bench=. -benchtime=10s -test.cpu=12
goos: linux
goarch: amd64
pkg: github.com/aliyun/aliyun-tablestore-go-sdk/timeline
BenchmarkTmLine_BatchStore_Concurrent-12 10000 1737993 ns/op
BenchmarkTmLine_BatchStore_WriteSpread-12 100 127729883 ns/op
BenchmarkTmLine_BatchStore_WriteSpread_IgnoreMessageLost-12 200 80166859 ns/op
PASS
```

* on TableStore Capacity instance

```
$ cd timeline
$ go test -bench=. -benchtime=10s -test.cpu=12
goos: linux
goarch: amd64
pkg: github.com/aliyun/aliyun-tablestore-go-sdk/timeline
BenchmarkTmLine_BatchStore_Concurrent-12 10000 1791522 ns/op
BenchmarkTmLine_BatchStore_WriteSpread-12 100 124597783 ns/op
BenchmarkTmLine_BatchStore_WriteSpread_IgnoreMessageLost-12 200 83780501 ns/op
```
9 changes: 9 additions & 0 deletions timeline/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package timeline

import "errors"

var (
ErrMisuse = errors.New("misuse")
ErrUnexpected = errors.New("unexpected")
ErrorDone = errors.New("done")
)
117 changes: 117 additions & 0 deletions timeline/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package timeline

import (
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
"reflect"
"strings"
)

var typeOfBytes = reflect.TypeOf([]byte(nil))

var DefaultStreamAdapter = &StreamMessageAdapter{
IdKey: "Id",
ContentKey: "Content",
TimestampKey: "Timestamp",
AttrPrefix: "Attr_",
}

type Message interface{}

type ColumnMap map[string]interface{}

func LoadColumnMap(attrs []*tablestore.AttributeColumn) ColumnMap {
cols := make(map[string]interface{})
for _, attr := range attrs {
cols[attr.ColumnName] = attr.Value
}
return cols
}

type MessageAdapter interface {
Marshal(msg Message) (ColumnMap, error)
Unmarshal(cols ColumnMap) (Message, error)
}

type StreamMessage struct {
Id string
Content interface{}
Timestamp int64
Attr map[string]interface{}
}

type StreamMessageAdapter struct {
IdKey string
ContentKey string
TimestampKey string
AttrPrefix string
}

func (s *StreamMessageAdapter) Marshal(msg Message) (ColumnMap, error) {
sMsg, ok := msg.(*StreamMessage)
if !ok {
return nil, ErrUnexpected
}
cols := make(map[string]interface{})

cols[s.IdKey] = sMsg.Id
if err := checkInterface(sMsg.Content); err != nil {
return nil, err
}
cols[s.ContentKey] = sMsg.Content
cols[s.TimestampKey] = sMsg.Timestamp
for key, val := range sMsg.Attr {
if err := checkInterface(val); err != nil {
return nil, err
}
cols[s.AttrPrefix+key] = val
}
return cols, nil
}

func checkInterface(val interface{}) error {
v := reflect.ValueOf(val)
switch v.Kind() {
case reflect.Bool:
return nil
case reflect.Int64:
return nil
case reflect.String:
return nil
case reflect.Slice:
if v.Type() == typeOfBytes {
return nil
}
return ErrMisuse
default:
return ErrMisuse
}
}

func (s *StreamMessageAdapter) Unmarshal(cols ColumnMap) (Message, error) {
sMsg := new(StreamMessage)
sMsg.Attr = make(map[string]interface{})
for key, val := range cols {
switch key {
case s.IdKey:
if id, ok := val.(string); ok {
sMsg.Id = id
} else {
return nil, ErrUnexpected
}
case s.ContentKey:
sMsg.Content = val
case s.TimestampKey:
if timestamp, ok := val.(int64); ok {
sMsg.Timestamp = timestamp
} else {
return nil, ErrUnexpected
}
default:
if strings.HasPrefix(key, s.AttrPrefix) {
realKey := key[len(s.AttrPrefix):]
sMsg.Attr[realKey] = val
}
}
}
return sMsg, nil
}
60 changes: 60 additions & 0 deletions timeline/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package timeline

import (
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
"github.com/aliyun/aliyun-tablestore-go-sdk/timeline/writer"
)

var (
DefaultFirstPk = "TimelineId"
DefaultSecondPk = "Sequence"

MinTTL = 86400
)

type StoreOption struct {
Endpoint string
Instance string
TableName string
AkId string
AkSecret string

Schema *Schema
TTL int
Throughput *tablestore.ReservedThroughput

TableStoreConfig *tablestore.TableStoreConfig
WriterConfig *writer.Config
}

type Schema struct {
FirstPk string
SecondPk string
}

func (b *StoreOption) prepare() error {
if b.Endpoint == "" || b.Instance == "" || b.TableName == "" ||
b.AkId == "" || b.AkSecret == "" {
return ErrMisuse
}
//fill in default value if empty
if b.Schema == nil {
b.Schema = &Schema{FirstPk: DefaultFirstPk, SecondPk: DefaultSecondPk}
}
if b.Schema.FirstPk == "" {
b.Schema.FirstPk = DefaultFirstPk
}
if b.Schema.SecondPk == "" {
b.Schema.SecondPk = DefaultSecondPk
}
if b.TTL > 0 && b.TTL < MinTTL {
b.TTL = MinTTL
}
if b.TTL == 0 {
b.TTL = -1
}
if b.Throughput == nil {
b.Throughput = new(tablestore.ReservedThroughput)
}
return nil
}
67 changes: 67 additions & 0 deletions timeline/promise/future.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package promise

import (
"errors"
"sync"
)

type Future struct {
done chan struct{}
result interface{}
err error
once sync.Once
}

func NewFuture() *Future {
return &Future{
done: make(chan struct{}),
}
}

func (f *Future) Get() (interface{}, error) {
<-f.done
return f.result, f.err
}

func (f *Future) Set(v interface{}, err error) {
f.once.Do(func() {
defer close(f.done)
f.result = v
f.err = err
})
}

func (f *Future) FanInGet() ([]*FanResult, error) {
ret, err := f.Get()
if err != nil {
return nil, err
}
if fanRet, ok := ret.([]*FanResult); ok {
return fanRet, nil
}
return nil, errors.New("not a fan in future")
}

type FanResult struct {
Result interface{}
Err error
}

func FanIn(futures ...*Future) *Future {
f := NewFuture()
go func() {
fanResults := make([]*FanResult, len(futures))
wg := new(sync.WaitGroup)
wg.Add(len(futures))
for i, f := range futures {
go func(idx int) {
defer wg.Done()
ret, err := f.Get()
fanResults[idx] = &FanResult{Result: ret, Err: err}
}(i)
}
wg.Wait()
f.Set(fanResults, nil)
}()
return f
}
Loading

0 comments on commit 6330304

Please sign in to comment.