Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay committed Aug 2, 2024
1 parent 594422a commit c79f94d
Showing 1 changed file with 182 additions and 106 deletions.
288 changes: 182 additions & 106 deletions queue/sqlite/sqlite.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package sqlite

import (
"database/sql"
"errors"
"os"
"strings"
"sync"
Expand All @@ -26,11 +24,11 @@ import (

type SQLiteQueue struct {
Filename string
DB *sqlx.DB
DBG *gorm.DB
Mu *sync.Mutex
snow *snowflake.Node
ticker *time.Ticker
// DB *sqlx.DB
DBG *gorm.DB
Mu *sync.Mutex
snow *snowflake.Node
ticker *time.Ticker
}

var queueDiskSize = promauto.NewGauge(
Expand All @@ -41,14 +39,36 @@ var queueDiskSize = promauto.NewGauge(
)

type Queue struct {
ID int64 `gorm:"primaryKey;autoIncrement:false"`
TenantID int64 `gorm:"not null;index:idx_queue_name,priority:1,unique"`
Name string `gorm:"not null;index:idx_queue_name,priority:2"`
RateLimit int64 `gorm:"not null"`
Paused bool `gorm:"not null"`
MaxRetries int64 `gorm:"not null"`
// Backoff int64 `gorm:"backoff"`
VisibilityTimeout int `gorm:"not null"`
ID int64 `gorm:"primaryKey;autoIncrement:false"`
TenantID int64 `gorm:"not null;index:idx_queue_name,priority:1,unique"`
Name string `gorm:"not null;index:idx_queue_name,priority:2"`
RateLimit int64 `gorm:"not null"`
Paused bool `gorm:"not null"`
MaxRetries int `gorm:"not null"`
VisibilityTimeout int `gorm:"not null"`
}

type Message struct {
// tenant_id, queue_id, deliver_at, delivered_at, tries, max_tries
ID int64 `gorm:"primaryKey;autoIncrement:false"`
TenantID int64 `gorm:"not null;index:idx_message,priority:1;not null"`
QueueID int64 `gorm:"not null;index:idx_message,priority:2;not null"`

DeliverAt int64 `gorm:"not null;index:idx_message,priority:3;not null"`
DeliveredAt int64 `gorm:"not null;index:idx_message,priority:4;not null"`
Tries int `gorm:"not null;index:idx_message,priority:5;not null"`
MaxTries int `gorm:"not null;index:idx_message,priority:6;not null"`
RequeueIn int `gorm:"not null"`

Message string `gorm:"not null"`
}

type KV struct {
TenantID int64 `gorm:"not null;index:idx_kv,priority:1"`
QueueID int64 `gorm:"not null;index:idx_kv,priority:2"`
MessageID int64 `gorm:"not null;index:idx_kv,priority:3"`
K string `gorm:"not null"`
V string `gorm:"not null"`
}

// var queueMessageCount = promauto.NewGaugeVec(
Expand All @@ -70,14 +90,27 @@ func NewSQLiteQueue(cfg config.SQLiteConfig) *SQLiteQueue {
// newDb = true
// }

olddb, err := sqlx.Open("sqlite3", cfg.Path+"?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full")
// olddb, err := sqlx.Open("sqlite3", cfg.Path+"?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full")

db, err := gorm.Open(sqlite.Open(cfg.Path+"?_journal_mode=WAL&_foreign_keys=off&_auto_vacuum=full"), &gorm.Config{})
if err != nil {
log.Fatal().Err(err).Send()
}

db.AutoMigrate(&Queue{})
err = db.AutoMigrate(&Queue{})
if err != nil {
log.Fatal().Err(err).Send()
}

err = db.AutoMigrate(&Message{})
if err != nil {
log.Fatal().Err(err).Send()
}

err = db.AutoMigrate(&KV{})
if err != nil {
log.Fatal().Err(err).Send()
}

// if newDb {
// tx, err := db.Begin()
Expand Down Expand Up @@ -116,11 +149,11 @@ func NewSQLiteQueue(cfg config.SQLiteConfig) *SQLiteQueue {

rc := &SQLiteQueue{
Filename: cfg.Path,
DB: olddb,
DBG: db,
Mu: &sync.Mutex{},
snow: snow,
ticker: time.NewTicker(1 * time.Second),
// DB: olddb,
DBG: db,
Mu: &sync.Mutex{},
snow: snow,
ticker: time.NewTicker(1 * time.Second),
}

go func() {
Expand Down Expand Up @@ -159,42 +192,65 @@ func (q *SQLiteQueue) CreateQueue(tenantId int64, queue string, visibilityTimeou
return res.Error
}

func (q *SQLiteQueue) DeleteQueue(tenantId int64, queue string) error {
func (q *SQLiteQueue) DeleteQueue(tenantId int64, queueName string) error {
// Delete all messages with the queue, and then the queue itself

q.Mu.Lock()
defer q.Mu.Unlock()

tx, err := q.DB.Begin()
queue, err := q.getQueue(tenantId, queueName)
if err != nil {
return err
}

defer tx.Rollback()
q.Mu.Lock()
defer q.Mu.Unlock()

var queueId int64
row := tx.QueryRow("select id from queues where name = ? and tenant_id = ?", strings.ToLower(queue), tenantId)
err = row.Scan(&queueId)
if err != nil {
return err
}
rc := q.DBG.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("tenant_id = ? AND queue_id = ?", tenantId, queue.ID).Delete(&KV{}).Error; err != nil {
return err
}

_, err = tx.Exec("DELETE FROM messages WHERE tenant_id = ? AND queue_id = ?", tenantId, queueId)
if err != nil {
return err
}
if err := tx.Where("tenant_id = ? AND queue_id = ?", tenantId, queue.ID).Delete(&Message{}).Error; err != nil {
return err
}

_, err = tx.Exec("DELETE FROM kv WHERE tenant_id = ? AND queue_id = ?", tenantId, queueId)
if err != nil {
return err
}
if err := tx.Where("tenant_id = ? AND queue_id = ?", tenantId, queue.ID).Delete(&Queue{}).Error; err != nil {
return err
}

_, err = tx.Exec("DELETE FROM queues WHERE tenant_id = ? AND id = ?", tenantId, queueId)
if err != nil {
return err
}
return nil
})

return rc

// tx, err := q.DB.Begin()
// if err != nil {
// return err
// }

// defer tx.Rollback()

// var queueId int64
// row := tx.QueryRow("select id from queues where name = ? and tenant_id = ?", strings.ToLower(queue), tenantId)
// err = row.Scan(&queueId)
// if err != nil {
// return err
// }

// _, err = tx.Exec("DELETE FROM messages WHERE tenant_id = ? AND queue_id = ?", tenantId, queueId)
// if err != nil {
// return err
// }

// _, err = tx.Exec("DELETE FROM kv WHERE tenant_id = ? AND queue_id = ?", tenantId, queueId)
// if err != nil {
// return err
// }

return tx.Commit()
// _, err = tx.Exec("DELETE FROM queues WHERE tenant_id = ? AND id = ?", tenantId, queueId)
// if err != nil {
// return err
// }

// return tx.Commit()
}

func (q *SQLiteQueue) ListQueues(tenantId int64) ([]string, error) {
Expand All @@ -212,28 +268,10 @@ func (q *SQLiteQueue) ListQueues(tenantId int64) ([]string, error) {
return rc, nil
}

func (q *SQLiteQueue) getQueue(tenantId int64, queue string) (*Queue, error) {
func (q *SQLiteQueue) getQueue(tenantId int64, queueName string) (*Queue, error) {
rc := &Queue{}

row := q.DB.QueryRowx(
"select * from queues where name = ? and tenant_id = ?",
strings.TrimSpace(strings.ToLower(queue)), tenantId,
)

if row.Err() != nil {
return nil, row.Err()
}

err := row.StructScan(rc)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, errors.New("Queue does not exist")
} else {
return nil, err
}
}

return rc, nil
res := q.DBG.Where("tenant_id = ? AND name = ?", tenantId, strings.ToLower(queueName)).Select("name").First(rc)
return rc, res.Error
}

func (q *SQLiteQueue) Enqueue(tenantId int64, queueName string, message string, kv map[string]string, delay int) (int64, error) {
Expand All @@ -248,52 +286,90 @@ func (q *SQLiteQueue) Enqueue(tenantId int64, queueName string, message string,
now := time.Now().UTC().Unix()
deliverAt := now + int64(delay)

// messageId, tenantId, queue.ID, deliverAt, 0, 0, queue.MaxRetries, queue.VisibilityTimeout, message)

q.Mu.Lock()
defer q.Mu.Unlock()
tx, err := q.DB.Beginx()
if err != nil {
return 0, err
}

defer tx.Rollback()

// TODO: visibility timeout
_, err = tx.Exec(
`
INSERT INTO messages
(
id,
tenant_id,
queue_id,
deliver_at,
delivered_at,
tries,
max_tries,
requeue_in,
message
)
VALUES
(?,?,?,?,?,?,?,?,?)
`,
messageId, tenantId, queue.ID, deliverAt, 0, 0, queue.MaxRetries, queue.VisibilityTimeout, message)
if err != nil {
return 0, err
}
rc := q.DBG.Transaction(func(tx *gorm.DB) error {
newMessage := &Message{
ID: messageId,
TenantID: tenantId,
QueueID: queue.ID,
DeliverAt: deliverAt,
DeliveredAt: 0,
MaxTries: queue.MaxRetries,
RequeueIn: queue.VisibilityTimeout,
Message: message,
}
if err := tx.Create(newMessage).Error; err != nil {
return err
}

for k, v := range kv {
_, err = tx.Exec("INSERT INTO kv (tenant_id, queue_id,k, v,message_id) VALUES (?,?,?,?,?)",
tenantId, queue.ID, k, v, messageId,
)
if err != nil {
return 0, err
for k, v := range kv {
newKv := &KV{
TenantID: tenantId,
MessageID: messageId,
QueueID: queue.ID,
K: k,
V: v,
}
if err := tx.Create(newKv).Error; err != nil {
return err
}
}
}

err = tx.Commit()
if err != nil {
return 0, err
return nil
})

if rc != nil {
return 0, rc
}

// tx, err := q.DB.Beginx()
// if err != nil {
// return 0, err
// }

// defer tx.Rollback()

// // TODO: visibility timeout
// _, err = tx.Exec(
// `
// INSERT INTO messages
// (
// id,
// tenant_id,
// queue_id,
// deliver_at,
// delivered_at,
// tries,
// max_tries,
// requeue_in,
// message
// )
// VALUES
// (?,?,?,?,?,?,?,?,?)
// `,
// messageId, tenantId, queue.ID, deliverAt, 0, 0, queue.MaxRetries, queue.VisibilityTimeout, message)
// if err != nil {
// return 0, err
// }

// for k, v := range kv {
// _, err = tx.Exec("INSERT INTO kv (tenant_id, queue_id,k, v,message_id) VALUES (?,?,?,?,?)",
// tenantId, queue.ID, k, v, messageId,
// )
// if err != nil {
// return 0, err
// }
// }

// err = tx.Commit()
// if err != nil {
// return 0, err
// }

log.Debug().Int64("message_id", messageId).Msg("Enqueued message")

return messageId, nil
Expand Down

0 comments on commit c79f94d

Please sign in to comment.