Skip to content

Commit

Permalink
add SqlBuilderFunc type
Browse files Browse the repository at this point in the history
  • Loading branch information
eidng8 committed Jan 15, 2025
1 parent 63723fa commit 01b830b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
40 changes: 36 additions & 4 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/eidng8/go-utils"
)

type SqlBuilderFunc func(params []any) (string, []any)

// CachedWriter is an interface for writing data to a database in a batched
// manner. This allows for better performance when writing a large number of
// records to the database, or write operation is too frequent.
Expand Down Expand Up @@ -48,12 +50,26 @@ type DelegateCachedWriter interface {
SetQueryBuilder(func([]any) (string, []any))
}

// LoggedWriter is an interface for a CachedWriter that allows the user to set
// the log to write console logs.
type LoggedWriter interface {
// SetLogger sets the log to write console logs.
SetLogger(log utils.TaggedLogger)
}

// SplitLoggedWriter is an interface for a CachedWriter that allows the user to
// use separate logs for writing console and db logs.
type SplitLoggedWriter interface {
LoggedWriter
// SetFailedLog sets the log to write records with db failure.
SetFailedLog(log io.Writer)
}

// NewMemCachedWriter creates a new MemCachedWriter with the given database
// connection and query builder function. It sets the default values of
// retries to 3, and interval to 1.
func NewMemCachedWriter(
db *sql.DB, sqlBuilder func(params []any) (string, []any),
db *sql.DB, sqlBuilder SqlBuilderFunc,
) *MemCachedWriter {
return &MemCachedWriter{
db: db,
Expand All @@ -63,9 +79,9 @@ func NewMemCachedWriter(
}
}

// MemCachedWriter is a DelegateCachedWriter that writes string data to a
// database in a batched manner. It uses a query builder function to build the
// query and arguments for writing data to the DB.
// MemCachedWriter is a DelegateCachedWriter and SplitLoggedWriter that writes
// string data to a database in a batched manner. It uses a query builder
// function to build the query and arguments for writing data to the DB.
type MemCachedWriter struct {
db *sql.DB
dataCache []any
Expand All @@ -78,24 +94,32 @@ type MemCachedWriter struct {
builder func([]any) (string, []any)
}

// SetLogger sets the log to write console logs.
func (w *MemCachedWriter) SetLogger(log utils.TaggedLogger) {
w.logger = log
}

// SetQueryBuilder sets the function that will be used to build the query
// and arguments for writing data to the DB.
func (w *MemCachedWriter) SetQueryBuilder(
fn func([]any) (string, []any),
) {
w.builder = fn
}

// SetDB sets the database connection for the writer.
func (w *MemCachedWriter) SetDB(db *sql.DB) {
w.db = db
}

// SetRetries sets the number of times the writer will attempt to write data
// to the DB before giving up. Defaults to 3.
func (w *MemCachedWriter) SetRetries(numRetries int) {
w.maxRetries = numRetries
}

// SetInterval sets the interval (second) at which the writer will attempt
// to write data to the DB. Defaults to 1 second.
func (w *MemCachedWriter) SetInterval(duration time.Duration) {
w.interval = duration
}
Expand All @@ -104,24 +128,29 @@ func (w *MemCachedWriter) SetFailedLog(log io.Writer) {
w.failedLog = log
}

// Pause temporarily stops the writer from writing to the DB. Data can still
// be added to the cache while the writer is paused.
func (w *MemCachedWriter) Pause() {
w.cacheMu.Lock()
defer w.cacheMu.Unlock()
atomic.StoreInt32(&w.paused, 1)
}

// Resume restarts the writer after a pause.
func (w *MemCachedWriter) Resume() {
w.cacheMu.Lock()
defer w.cacheMu.Unlock()
atomic.StoreInt32(&w.paused, 0)
}

// Push adds a record to the cache
func (w *MemCachedWriter) Push(data any) {
w.cacheMu.Lock()
defer w.cacheMu.Unlock()
w.dataCache = append(w.dataCache, data)
}

// Write sends all cached data to the DB in a multi-value insert
func (w *MemCachedWriter) Write() {
if 1 == atomic.LoadInt32(&w.paused) {
return
Expand Down Expand Up @@ -164,6 +193,9 @@ func (w *MemCachedWriter) Write() {
}
}

// Start begins the writer and run until the given channel is signaled.
// The writer will attempt to write data to the DB at the interval set by
// SetInterval.
func (w *MemCachedWriter) Start(stopChan <-chan struct{}) {
ticker := time.NewTicker(w.interval)
go func() {
Expand Down
2 changes: 1 addition & 1 deletion writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func Test_Write_retries_and_log_io_error(t *testing.T) {
writer.logFailed([]any{"test", "val"})
}

func bt(t *testing.T) func(params []any) (string, []any) {
func bt(t *testing.T) SqlBuilderFunc {
return func(params []any) (string, []any) {
require.Equal(t, []any{"http://localhost/up?a=1&b=2"}, params)
return "insert into test values(?)", []any{"test value"}
Expand Down

0 comments on commit 01b830b

Please sign in to comment.