Skip to content

Commit

Permalink
Merge pull request dtm-labs#55 from yedf/alpha
Browse files Browse the repository at this point in the history
fix postgres sql
  • Loading branch information
yedf2 authored Nov 12, 2021
2 parents fec4957 + 7d40b37 commit fb40e80
Show file tree
Hide file tree
Showing 21 changed files with 457 additions and 396 deletions.
3 changes: 3 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ func checkConfig() string {
func WaitDBUp() {
sdb, err := dtmimp.StandaloneDB(DtmConfig.DB)
dtmimp.FatalIfError(err)
defer func() {
sdb.Close()
}()
for _, err := dtmimp.DBExec(sdb, "select 1"); err != nil; { // wait for mysql to start
time.Sleep(3 * time.Second)
_, err = dtmimp.DBExec(sdb, "select 1")
Expand Down
4 changes: 4 additions & 0 deletions common/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func TestDb(t *testing.T) {
assert.NotEqual(t, nil, err)
}

func TestWaitDBUp(t *testing.T) {
WaitDBUp()
}

func TestDbAlone(t *testing.T) {
db, err := dtmimp.StandaloneDB(DtmConfig.DB)
assert.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions dtmcli/barrier.postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ create table if not exists dtm_barrier.barrier(
op varchar(45) default '',
barrier_id varchar(45) default '',
reason varchar(45) default '',
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
PRIMARY KEY(id),
CONSTRAINT uniq_barrier unique(gid, branch_id, op, barrier_id)
);
2 changes: 1 addition & 1 deletion dtmcli/dtmimp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func GetDsn(conf map[string]string) string {
dsn := map[string]string{
"mysql": fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
conf["user"], conf["password"], host, conf["port"], conf["database"]),
"postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%s sslmode=disable TimeZone=Asia/Shanghai",
"postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%s sslmode=disable",
host, conf["user"], conf["password"], conf["database"], conf["port"]),
}[driver]
PanicIf(dsn == "", fmt.Errorf("unknow driver: %s", driver))
Expand Down
65 changes: 36 additions & 29 deletions dtmsvr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/yedf/dtm/dtmcli"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

Expand All @@ -13,10 +14,11 @@ func svcSubmit(t *TransGlobal) (interface{}, error) {
err := t.saveNew(db)

if err == errUniqueConflict {
dbt := transFromDb(db, t.Gid)
dbt := transFromDb(db.DB, t.Gid, false)
if dbt.Status == dtmcli.StatusPrepared {
updates := t.setNextCron(cronReset)
db.Must().Model(t).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t)
dbr := db.Must().Model(&TransGlobal{}).Where("gid=? and status=?", t.Gid, dtmcli.StatusPrepared).Select(append(updates, "status")).Updates(t)
checkAffected(dbr)
} else if dbt.Status != dtmcli.StatusSubmitted {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot sumbmit", dbt.Status)}, nil
}
Expand All @@ -28,7 +30,7 @@ func svcPrepare(t *TransGlobal) (interface{}, error) {
t.Status = dtmcli.StatusPrepared
err := t.saveNew(dbGet())
if err == errUniqueConflict {
dbt := transFromDb(dbGet(), t.Gid)
dbt := transFromDb(dbGet().DB, t.Gid, false)
if dbt.Status != dtmcli.StatusPrepared {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot prepare", dbt.Status)}, nil
}
Expand All @@ -38,40 +40,45 @@ func svcPrepare(t *TransGlobal) (interface{}, error) {

func svcAbort(t *TransGlobal) (interface{}, error) {
db := dbGet()
dbt := transFromDb(db, t.Gid)
dbt := transFromDb(db.DB, t.Gid, false)
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != dtmcli.StatusPrepared && dbt.Status != dtmcli.StatusAborting {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: '%s' current status '%s', cannot abort", dbt.TransType, dbt.Status)}, nil
}
dbt.changeStatus(db, dtmcli.StatusAborting)
return dbt.Process(db), nil
}

func svcRegisterBranch(branch *TransBranch, data map[string]string) (interface{}, error) {
db := dbGet()
dbt := transFromDb(db, branch.Gid)
if dbt.Status != dtmcli.StatusPrepared {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}, nil
}
func svcRegisterBranch(branch *TransBranch, data map[string]string) (ret interface{}, rerr error) {
err := dbGet().Transaction(func(db *gorm.DB) error {
dbt := transFromDb(db, branch.Gid, true)
if dbt.Status != dtmcli.StatusPrepared {
ret = map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status: %s cannot register branch", dbt.Status)}
return nil
}

branches := []TransBranch{*branch, *branch}
if dbt.TransType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
branches[i].Op = b
branches[i].URL = data[b]
branches := []TransBranch{*branch, *branch}
if dbt.TransType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
branches[i].Op = b
branches[i].URL = data[b]
}
} else if dbt.TransType == "xa" {
branches[0].Op = dtmcli.BranchRollback
branches[0].URL = data["url"]
branches[1].Op = dtmcli.BranchCommit
branches[1].URL = data["url"]
} else {
rerr = fmt.Errorf("unknow trans type: %s", dbt.TransType)
return nil
}
} else if dbt.TransType == "xa" {
branches[0].Op = dtmcli.BranchRollback
branches[0].URL = data["url"]
branches[1].Op = dtmcli.BranchCommit
branches[1].URL = data["url"]
} else {
return nil, fmt.Errorf("unknow trans type: %s", dbt.TransType)
}

db.Must().Clauses(clause.OnConflict{
DoNothing: true,
}).Create(branches)
global := TransGlobal{Gid: branch.Gid}
global.touch(dbGet(), cronKeep)
return dtmcli.MapSuccess, nil
dbr := db.Clauses(clause.OnConflict{
DoNothing: true,
}).Create(branches)
checkAffected(dbr)
ret = dtmcli.MapSuccess
return nil
})
e2p(err)
return
}
2 changes: 1 addition & 1 deletion dtmsvr/api_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func query(c *gin.Context) (interface{}, error) {
return nil, errors.New("no gid specified")
}
db := dbGet()
trans := transFromDb(db, gid)
trans := transFromDb(db.DB, gid, false)
branches := []TransBranch{}
db.Must().Where("gid", gid).Find(&branches)
return map[string]interface{}{"transaction": trans, "branches": branches}, nil
Expand Down
24 changes: 12 additions & 12 deletions dtmsvr/dtmsvr.postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ CREATE TABLE if not EXISTS dtm.trans_global (
status varchar(45) NOT NULL,
query_prepared varchar(128) NOT NULL,
protocol varchar(45) not null,
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
commit_time timestamp(0) DEFAULT NULL,
finish_time timestamp(0) DEFAULT NULL,
rollback_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
commit_time timestamp(0) with time zone DEFAULT NULL,
finish_time timestamp(0) with time zone DEFAULT NULL,
rollback_time timestamp(0) with time zone DEFAULT NULL,
options varchar(256) DEFAULT '',
custom_data varchar(256) DEFAULT '',
next_cron_interval int default null,
next_cron_time timestamp(0) default null,
next_cron_time timestamp(0) with time zone default null,
owner varchar(128) not null default '',
PRIMARY KEY (id),
CONSTRAINT gid UNIQUE (gid)
Expand All @@ -34,14 +34,14 @@ CREATE TABLE IF NOT EXISTS dtm.trans_branch_op (
gid varchar(128) NOT NULL,
url varchar(128) NOT NULL,
data TEXT,
bin_data BLOB,
bin_data bytea,
branch_id VARCHAR(128) NOT NULL,
op varchar(45) NOT NULL,
status varchar(45) NOT NULL,
finish_time timestamp(0) DEFAULT NULL,
rollback_time timestamp(0) DEFAULT NULL,
create_time timestamp(0) DEFAULT NULL,
update_time timestamp(0) DEFAULT NULL,
finish_time timestamp(0) with time zone DEFAULT NULL,
rollback_time timestamp(0) with time zone DEFAULT NULL,
create_time timestamp(0) with time zone DEFAULT NULL,
update_time timestamp(0) with time zone DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT gid_uniq UNIQUE (gid, branch_id, op)
CONSTRAINT gid_branch_uniq UNIQUE (gid, branch_id, op)
);
Loading

0 comments on commit fb40e80

Please sign in to comment.