Skip to content

Commit

Permalink
add oracle connect and data sync rpc service
Browse files Browse the repository at this point in the history
  • Loading branch information
carusyte committed Aug 23, 2017
1 parent 78cf6d0 commit a198b99
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
26 changes: 26 additions & 0 deletions db/gorp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package db

import (
"gopkg.in/gorp.v2"
"github.com/carusyte/stock/util"
"database/sql"
)

var (
Ora *gorp.DbMap
)

func init() {
db, err := sql.Open("ora", "rima/[email protected]:1521/hundsun")
util.CheckErr(err, "sql.Open failed,")

db.SetMaxOpenConns(64)
db.SetMaxIdleConns(64)

// construct a gorp DbMap
dbmap := &gorp.DbMap{Db: db, Dialect: gorp.OracleDialect{}}

util.CheckErr(db.Ping(), "Failed to ping db,")

Ora = dbmap
}
7 changes: 5 additions & 2 deletions rima/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ func (t *Arith) Divide(args *Args, quo *Quotient) error {

func main() {
gio.Init() // If the command line invokes the mapper or reducer, execute it and exit.
scorer := new(rsec.IndcScorer)
rpc.Register(scorer)

//Register RPC services
rpc.Register(new(rsec.IndcScorer))
rpc.Register(new(rsec.DataSync))

rpc.HandleHTTP()
l, e := net.Listen("tcp", ":45321")
if e != nil {
Expand Down
81 changes: 81 additions & 0 deletions rsec/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package rsec

import (
"github.com/carusyte/stock/model"
"fmt"
"github.com/carusyte/stock/util"
"github.com/carusyte/rima/db"
"strings"
"log"
"time"
"github.com/pkg/errors"
)

type DataSync struct{}

func (d *DataSync) SyncKdjFd(req *[]*model.KDJfdView, rep *bool) error {
st := time.Now()
fdvs := *req
log.Printf("DataSync.SyncKdjFd called, input size: %d", len(fdvs))
defer func() {
log.Printf("DataSync.SyncKdjFd finished, input size: %d, time: %.2f", len(fdvs), time.Since(st).Seconds())
}()
tran, e := db.Ora.Begin()
if e != nil {
return errors.Wrap(e, "failed to begin new transaction")
}
if len(fdvs) > 0 {
valueStrings := make([]string, 0, len(fdvs))
valueArgs := make([]interface{}, 0, len(fdvs)*10)
dt, tm := util.TimeStr()
for _, f := range fdvs {
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
valueArgs = append(valueArgs, f.Indc)
valueArgs = append(valueArgs, f.Fid)
valueArgs = append(valueArgs, f.Cytp)
valueArgs = append(valueArgs, f.Bysl)
valueArgs = append(valueArgs, f.SmpNum)
valueArgs = append(valueArgs, f.FdNum)
valueArgs = append(valueArgs, f.Weight)
valueArgs = append(valueArgs, f.Remarks)
valueArgs = append(valueArgs, dt)
valueArgs = append(valueArgs, tm)
}
stmt := fmt.Sprintf("INSERT INTO indc_feat (indc,fid,cytp,bysl,smp_num,fd_num,weight,remarks,"+
"udate,utime) VALUES %s on duplicate key update fid=values(fid),fd_num=values(fd_num),weight=values"+
"(weight),remarks=values(remarks),udate=values(udate),utime=values(utime)",
strings.Join(valueStrings, ","))
_, err := tran.Exec(stmt, valueArgs...)
if err != nil {
tran.Rollback()
return errors.Wrap(err, "failed to bulk insert indc_feat")
}

for _, f := range fdvs {
valueStrings = make([]string, 0, f.SmpNum)
valueArgs = make([]interface{}, 0, f.SmpNum*7)
for i := 0; i < f.SmpNum; i++ {
valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?)")
valueArgs = append(valueArgs, f.Fid)
valueArgs = append(valueArgs, i)
valueArgs = append(valueArgs, f.K[i])
valueArgs = append(valueArgs, f.D[i])
valueArgs = append(valueArgs, f.J[i])
valueArgs = append(valueArgs, dt)
valueArgs = append(valueArgs, tm)
}
stmt = fmt.Sprintf("INSERT INTO kdj_feat_dat (fid,seq,k,d,j,"+
"udate,utime) VALUES %s on duplicate key update k=values(k),d=values(d),"+
"j=values(j),udate=values(udate),utime=values(utime)",
strings.Join(valueStrings, ","))
_, err = tran.Exec(stmt, valueArgs...)
if err != nil {
tran.Rollback()
return errors.Wrap(err, "failed to bulk insert kdj_feat_dat")
}
}
tran.Commit()
}
*rep = true
return nil
}

0 comments on commit a198b99

Please sign in to comment.