Skip to content

Commit

Permalink
update client revision to 54460 (#37)
Browse files Browse the repository at this point in the history
* update client revision to 54460

* add tests

* fix tests

* fix  tests

* update golangci

* update linter
  • Loading branch information
vahid-sohrabloo authored Sep 5, 2022
1 parent 8c46d7a commit fd6ce3e
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 39 deletions.
21 changes: 4 additions & 17 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,15 @@ linters:
disable-all: true
enable:
# - bodyclose
- deadcode
- depguard
- dogsled
- dupl
- errcheck
- exportloopref
- exhaustive
- funlen
- gochecknoinits
- goconst
# - gocritic go 1.18
# - gocritic
- gocyclo
- gofmt
- goimports
Expand All @@ -71,19 +69,15 @@ linters:
- lll
- misspell
- nakedret
- noctx
# - noctx
- nolintlint
- rowserrcheck
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
# - unparam
- unused
- varcheck
- whitespace
# - goerr113

# don't enable:
# - asciicheck
Expand All @@ -92,6 +86,7 @@ linters:
# - gocognit
# - godot
# - godox
# - goerr113
# - interfacer
# - maligned
# - nestif
Expand Down Expand Up @@ -131,11 +126,3 @@ issues:

run:
skip-dirs:


# golangci.com configuration
# https://github.com/golangci/golangci/wiki/Configuration
service:
golangci-lint-version: 1.23.x # use the fixed version to not introduce new linters unexpectedly
prepare:
- echo "here I can run custom commands, but no preparation needed for this repo"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ endif

# Dependency versions
GOTESTSUM_VERSION = 1.8.1
GOLANGCI_VERSION = 1.47.0
GOLANGCI_VERSION = 1.49.0

GOLANG_VERSION = 1.14

Expand Down
64 changes: 47 additions & 17 deletions chconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net"
"strconv"
"time"

"github.com/vahid-sohrabloo/chconn/v2/column"
Expand Down Expand Up @@ -71,7 +72,7 @@ const (
dbmsVersionMajor = 1
dbmsVersionMinor = 0
dbmsVersionPatch = 0
dbmsVersionRevision = 54459
dbmsVersionRevision = 54460
)

type queryProcessingStage uint64
Expand Down Expand Up @@ -167,7 +168,7 @@ type conn struct {
}

// Connect establishes a connection to a ClickHouse server using the environment and connString (in URL or DSN format)
// to provide configuration. See documenting for ParseConfig for details. ctx can be used to cancel a connect attempt.
// to provide configuration. See documentation for ParseConfig for details. ctx can be used to cancel a connect attempt.
func Connect(ctx context.Context, connString string) (Conn, error) {
config, err := ParseConfig(connString)
if err != nil {
Expand All @@ -184,19 +185,13 @@ func Connect(ctx context.Context, connString string) (Conn, error) {
// authentication error will terminate the chain of attempts (like libpq:
// https://www.postgresql.org/docs/12/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. Otherwise,
// if all attempts fail the last error is returned.
func ConnectConfig(ctx context.Context, config *Config) (c Conn, err error) {
func ConnectConfig(octx context.Context, config *Config) (c Conn, err error) {
// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
// zero values.
if !config.createdByParseConfig {
panic("config must be created by ParseConfig")
}

if config.ConnectTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, config.ConnectTimeout)
defer cancel()
}

// Simplify usage by treating primary config and fallbacks the same.
fallbackConfigs := []*FallbackConfig{
{
Expand All @@ -206,7 +201,7 @@ func ConnectConfig(ctx context.Context, config *Config) (c Conn, err error) {
},
}
fallbackConfigs = append(fallbackConfigs, config.Fallbacks...)

ctx := octx
fallbackConfigs, err = expandWithIPs(ctx, config.LookupFunc, fallbackConfigs)
if err != nil {
return nil, &connectError{config: config, msg: "hostname resolving error", err: err}
Expand All @@ -216,12 +211,30 @@ func ConnectConfig(ctx context.Context, config *Config) (c Conn, err error) {
return nil, &connectError{config: config, msg: "hostname resolving error", err: ErrIPNotFound}
}

foundBestServer := false
var fallbackConfig *FallbackConfig
for _, fc := range fallbackConfigs {
// ConnectTimeout restricts the whole connection process.
if config.ConnectTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout)
defer cancel()
} else {
ctx = octx
}
c, err = connect(ctx, config, fc)
if err == nil {
foundBestServer = true
break
} else if err, ok := err.(*ChError); ok {
return nil, &connectError{config: config, msg: "server error", err: err}
} else if chErr, ok := err.(*ChError); ok {
return nil, &connectError{config: config, msg: "server error", err: chErr}
}
}

if !foundBestServer && fallbackConfig != nil {
c, err = connect(ctx, config, fallbackConfig)
if cherr, ok := err.(*ChError); ok {
err = &connectError{config: config, msg: "server error", err: cherr}
}
}

Expand Down Expand Up @@ -250,11 +263,24 @@ func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*Fallba
}

for _, ip := range ips {
configs = append(configs, &FallbackConfig{
Host: ip,
Port: fb.Port,
TLSConfig: fb.TLSConfig,
})
splitIP, splitPort, err := net.SplitHostPort(ip)
if err == nil {
port, err := strconv.ParseUint(splitPort, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)
}
configs = append(configs, &FallbackConfig{
Host: splitIP,
Port: uint16(port),
TLSConfig: fb.TLSConfig,
})
} else {
configs = append(configs, &FallbackConfig{
Host: ip,
Port: fb.Port,
TLSConfig: fb.TLSConfig,
})
}
}
}

Expand All @@ -271,6 +297,10 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port)
c.conn, err = config.DialFunc(ctx, network, address)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
err = &errTimeout{err: err}
}
return nil, &connectError{config: config, msg: "dial error", err: err}
}

Expand Down
1 change: 1 addition & 0 deletions internal/helper/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ const (
DbmsMinProtocolWithCustomSerialization = 54454
DbmsMinProtocolWithQuotaKey = 54458
DbmsMinProtocolWithParameters = 54459
DbmsMinProtocolWithServerQueryTimeInProgress = 54460
)
6 changes: 6 additions & 0 deletions progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Progress struct {
TotalRows uint64
WriterRows uint64
WrittenBytes uint64
ElapsedNS uint64
}

func newProgress() *Progress {
Expand All @@ -35,6 +36,11 @@ func (p *Progress) read(ch *conn) (err error) {
return &readError{"progress: read WrittenBytes", err}
}
}
if ch.serverInfo.Revision >= helper.DbmsMinProtocolWithServerQueryTimeInProgress {
if p.ElapsedNS, err = ch.reader.Uvarint(); err != nil {
return &readError{"progress: read ElapsedNS", err}
}
}

return nil
}
23 changes: 20 additions & 3 deletions select_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vahid-sohrabloo/chconn/v2/column"
"github.com/vahid-sohrabloo/chconn/v2/internal/helper"
"github.com/vahid-sohrabloo/chconn/v2/types"
)

Expand Down Expand Up @@ -140,24 +141,27 @@ func TestSelectParameters(t *testing.T) {
c, err := ConnectConfig(context.Background(), config)
require.NoError(t, err)

colA := column.New[uint32]()
colA := column.New[int32]()
colB := column.NewString()
colC := column.NewDate[types.DateTime]()
colD := column.NewMap[string, uint8](column.NewString(), column.New[uint8]())
colE := column.New[uint32]()
res, err := c.SelectWithOption(context.Background(),
"select {a: UInt32}, {b: String}, {c: DateTime}, {d: Map(String, UInt8)}",
"select {a: Int32}, {b: String}, {c: DateTime}, {d: Map(String, UInt8)}, {e: UInt32}",
&QueryOptions{
Parameters: NewParameters(
IntParameter("a", 13),
StringParameter("b", "str'"),
StringParameter("c", "2022-08-04 18:30:53"),
StringParameter("d", `{'a': 1, 'b': 2}`),
UintParameter("e", 14),
),
},
colA,
colB,
colC,
colD,
colE,
)

if err != nil && err.Error() == "parameters are not supported by the server" {
Expand All @@ -173,13 +177,15 @@ func TestSelectParameters(t *testing.T) {
require.Len(t, colB.Data(), 1)
require.Len(t, colC.Data(), 1)
require.Len(t, colD.Data(), 1)
assert.Equal(t, uint32(13), colA.Data()[0])
require.Len(t, colE.Data(), 1)
assert.Equal(t, int32(13), colA.Data()[0])
assert.Equal(t, "str'", colB.Data()[0])
assert.Equal(t, "2022-08-04 18:30:53", colC.Data()[0].Format("2006-01-02 15:04:05"))
assert.Equal(t, map[string]uint8{
"a": 1,
"b": 2,
}, colD.Data()[0])
assert.Equal(t, uint32(14), colE.Data()[0])

c.Close()
}
Expand All @@ -191,6 +197,7 @@ func TestSelectProgressError(t *testing.T) {
name string
wantErr string
numberValid int
minRevision uint64
}{
{
name: "read ReadRows",
Expand All @@ -217,6 +224,12 @@ func TestSelectProgressError(t *testing.T) {
wantErr: "progress: read WrittenBytes (timeout)",
numberValid: startValidReader + 4,
},
{
name: "read ElapsedNS",
wantErr: "progress: read ElapsedNS (timeout)",
numberValid: startValidReader + 5,
minRevision: helper.DbmsMinProtocolWithServerQueryTimeInProgress,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -232,6 +245,10 @@ func TestSelectProgressError(t *testing.T) {

c, err := ConnectConfig(context.Background(), config)
require.NoError(t, err)
if c.ServerInfo().Revision < tt.minRevision {
c.Close()
return
}
colSleep := column.New[uint8]()
colNumber := column.New[uint64]()
res, err := c.SelectWithOption(context.Background(),
Expand Down
1 change: 1 addition & 0 deletions types/Int256.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func Int256FromBigEx(i *big.Int) (Int256, bool) {
}

// Big returns 256-bit value as a *big.Int.
//
//nolint:dupl
func (u Int256) Big() *big.Int {
t := new(big.Int)
Expand Down
3 changes: 2 additions & 1 deletion types/ipv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package types

import "net/netip"

// IPv4 is a compatible type for IPv4 address in clickhouse.
// IPv4 is a compatible type for IPv4 address in clickhouse.
//
// clickhouse use Little endian for IPv4. but golang use big endian
type IPv4 [4]byte

Expand Down
1 change: 1 addition & 0 deletions types/uint256.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func Uint256FromBigEx(i *big.Int) (Uint256, bool) {
}

// Big returns 256-bit value as a *big.Int.
//
//nolint:dupl
func (u Uint256) Big() *big.Int {
t := new(big.Int)
Expand Down

0 comments on commit fd6ce3e

Please sign in to comment.