From a198b99212804cf028ad8f9106014a1ba49fdb01 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 23 Aug 2017 22:36:41 +0800 Subject: [PATCH] add oracle connect and data sync rpc service --- db/gorp.go | 26 +++++++++++++++++ rima/main.go | 7 +++-- rsec/sync.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 db/gorp.go create mode 100644 rsec/sync.go diff --git a/db/gorp.go b/db/gorp.go new file mode 100644 index 0000000..14c8907 --- /dev/null +++ b/db/gorp.go @@ -0,0 +1,26 @@ +package db + +import ( + "gopkg.in/gorp.v2" + "github.com/carusyte/stock/util" + "database/sql" +) + +var ( + Ora *gorp.DbMap +) + +func init() { + db, err := sql.Open("ora", "rima/rima@10.16.53.30:1521/hundsun") + util.CheckErr(err, "sql.Open failed,") + + db.SetMaxOpenConns(64) + db.SetMaxIdleConns(64) + + // construct a gorp DbMap + dbmap := &gorp.DbMap{Db: db, Dialect: gorp.OracleDialect{}} + + util.CheckErr(db.Ping(), "Failed to ping db,") + + Ora = dbmap +} diff --git a/rima/main.go b/rima/main.go index be25fdd..0662a50 100644 --- a/rima/main.go +++ b/rima/main.go @@ -48,8 +48,11 @@ func (t *Arith) Divide(args *Args, quo *Quotient) error { func main() { gio.Init() // If the command line invokes the mapper or reducer, execute it and exit. - scorer := new(rsec.IndcScorer) - rpc.Register(scorer) + + //Register RPC services + rpc.Register(new(rsec.IndcScorer)) + rpc.Register(new(rsec.DataSync)) + rpc.HandleHTTP() l, e := net.Listen("tcp", ":45321") if e != nil { diff --git a/rsec/sync.go b/rsec/sync.go new file mode 100644 index 0000000..ca0471f --- /dev/null +++ b/rsec/sync.go @@ -0,0 +1,81 @@ +package rsec + +import ( + "github.com/carusyte/stock/model" + "fmt" + "github.com/carusyte/stock/util" + "github.com/carusyte/rima/db" + "strings" + "log" + "time" + "github.com/pkg/errors" +) + +type DataSync struct{} + +func (d *DataSync) SyncKdjFd(req *[]*model.KDJfdView, rep *bool) error { + st := time.Now() + fdvs := *req + log.Printf("DataSync.SyncKdjFd called, input size: %d", len(fdvs)) + defer func() { + log.Printf("DataSync.SyncKdjFd finished, input size: %d, time: %.2f", len(fdvs), time.Since(st).Seconds()) + }() + tran, e := db.Ora.Begin() + if e != nil { + return errors.Wrap(e, "failed to begin new transaction") + } + if len(fdvs) > 0 { + valueStrings := make([]string, 0, len(fdvs)) + valueArgs := make([]interface{}, 0, len(fdvs)*10) + dt, tm := util.TimeStr() + for _, f := range fdvs { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, f.Indc) + valueArgs = append(valueArgs, f.Fid) + valueArgs = append(valueArgs, f.Cytp) + valueArgs = append(valueArgs, f.Bysl) + valueArgs = append(valueArgs, f.SmpNum) + valueArgs = append(valueArgs, f.FdNum) + valueArgs = append(valueArgs, f.Weight) + valueArgs = append(valueArgs, f.Remarks) + valueArgs = append(valueArgs, dt) + valueArgs = append(valueArgs, tm) + } + stmt := fmt.Sprintf("INSERT INTO indc_feat (indc,fid,cytp,bysl,smp_num,fd_num,weight,remarks,"+ + "udate,utime) VALUES %s on duplicate key update fid=values(fid),fd_num=values(fd_num),weight=values"+ + "(weight),remarks=values(remarks),udate=values(udate),utime=values(utime)", + strings.Join(valueStrings, ",")) + _, err := tran.Exec(stmt, valueArgs...) + if err != nil { + tran.Rollback() + return errors.Wrap(err, "failed to bulk insert indc_feat") + } + + for _, f := range fdvs { + valueStrings = make([]string, 0, f.SmpNum) + valueArgs = make([]interface{}, 0, f.SmpNum*7) + for i := 0; i < f.SmpNum; i++ { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, f.Fid) + valueArgs = append(valueArgs, i) + valueArgs = append(valueArgs, f.K[i]) + valueArgs = append(valueArgs, f.D[i]) + valueArgs = append(valueArgs, f.J[i]) + valueArgs = append(valueArgs, dt) + valueArgs = append(valueArgs, tm) + } + stmt = fmt.Sprintf("INSERT INTO kdj_feat_dat (fid,seq,k,d,j,"+ + "udate,utime) VALUES %s on duplicate key update k=values(k),d=values(d),"+ + "j=values(j),udate=values(udate),utime=values(utime)", + strings.Join(valueStrings, ",")) + _, err = tran.Exec(stmt, valueArgs...) + if err != nil { + tran.Rollback() + return errors.Wrap(err, "failed to bulk insert kdj_feat_dat") + } + } + tran.Commit() + } + *rep = true + return nil +}