-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgateway.go
159 lines (130 loc) · 3.81 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package orm
import (
"context"
"github.com/phogolabs/log"
"github.com/phogolabs/orm/dialect"
"github.com/phogolabs/orm/dialect/sql"
"github.com/phogolabs/prana"
)
var _ Querier = &Gateway{}
// Gateway is connected to a database and can executes SQL queries against it.
type Gateway struct {
engine *engine
}
// Connect creates a new gateway connecto to the provided URL.
func Connect(url string, opts ...Option) (*Gateway, error) {
driver, source, err := prana.ParseURL(url)
if err != nil {
return nil, err
}
gateway, err := Open(driver, source, opts...)
if err != nil {
return nil, err
}
if err = gateway.Ping(context.TODO()); err != nil {
return nil, err
}
return gateway, nil
}
// Open creates a new gateway connected to the provided source.
func Open(name, source string, opts ...Option) (*Gateway, error) {
driver, err := sql.Open(name, source)
if err != nil {
return nil, err
}
dialect := driver.Dialect()
// setup the provider
provider := &sql.Provider{}
provider.SetDialect(dialect)
gateway := &Gateway{
engine: &engine{
querier: driver,
dialect: dialect,
provider: provider,
},
}
for _, opt := range opts {
if err := opt.Apply(gateway); err != nil {
return nil, err
}
}
return gateway, nil
}
// Ping pins the underlying database
func (g *Gateway) Ping(ctx context.Context) error {
driver := g.engine.querier.(dialect.Driver)
// make a ping request
return driver.Ping(ctx)
}
// Close closes the connection to the database.
func (g *Gateway) Close() error {
driver := g.engine.querier.(dialect.Driver)
// close the connection
return driver.Close()
}
// Dialect returns the driver's dialect
func (g *Gateway) Dialect() string {
return g.engine.dialect
}
// Migrate runs all pending migration
func (g *Gateway) Migrate(storage FileSystem) error {
driver := g.engine.querier.(dialect.Driver)
// run the migration
return driver.Migrate(storage)
}
// Begin begins a transaction and returns an *Tx
func (g *Gateway) Begin(ctx context.Context) (*GatewayTx, error) {
driver := g.engine.querier.(dialect.Driver)
tx, err := driver.Tx(ctx)
if err != nil {
return nil, err
}
gtx := &GatewayTx{
engine: &engine{
querier: tx,
dialect: g.engine.dialect,
provider: g.engine.provider,
},
}
return gtx, nil
}
// RunInTx runs a callback function within a transaction. It commits the
// transaction if succeeds, otherwise rollbacks.
func (g *Gateway) RunInTx(ctx context.Context, fn RunTxFunc) error {
gtx, err := g.Begin(ctx)
if err != nil {
return err
}
if err := fn(gtx); err != nil {
if err := gtx.Rollback(); err != nil {
log.WithError(err).Error("cannot rollback")
}
return err
}
return gtx.Commit()
}
// All executes the query and returns a list of entities.
func (g *Gateway) All(ctx context.Context, q sql.Querier, v interface{}) error {
return g.engine.All(ctx, q, v)
}
// Only returns the only entity in the query, returns an error if not
// exactly one entity was returned.
func (g *Gateway) Only(ctx context.Context, q sql.Querier, v interface{}) error {
return g.engine.Only(ctx, q, v)
}
// First returns the first entity in the query. Returns *NotFoundError
// when no records were found.
func (g *Gateway) First(ctx context.Context, q sql.Querier, v interface{}) error {
return g.engine.First(ctx, q, v)
}
// Query executes a query that returns rows, typically a SELECT in SQL.
// It scans the result into the pointer v. In SQL, you it's usually *sql.Rows.
func (g *Gateway) Query(ctx context.Context, q sql.Querier) (*sql.Rows, error) {
return g.engine.Query(ctx, q)
}
// Exec executes a query that doesn't return rows. For example, in SQL, INSERT
// or UPDATE. It scans the result into the pointer v. In SQL, you it's usually
// sql.Result.
func (g *Gateway) Exec(ctx context.Context, q sql.Querier) (sql.Result, error) {
return g.engine.Exec(ctx, q)
}