Skip to content

Commit

Permalink
revise and optimise guerrilla-db-redis backend (flashmob#50)
Browse files Browse the repository at this point in the history
* revise and optimise guerrilla-db-redis backend

* travis ci: gofmt - ignore .glide
  • Loading branch information
flashmob authored Jan 22, 2017
1 parent f9e05a9 commit b9befb4
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .travis.gofmt.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

if [[ -n $(find . -path '*/vendor/*' -prune -o -name '*.go' -type f -exec gofmt -l {} \;) ]]; then
if [[ -n $(find . -path '*/vendor/*' -prune -o -path '*.glide/*' -prune -o -name '*.go' -type f -exec gofmt -l {} \;) ]]; then
echo "Go code is not formatted:"
gofmt -d .
exit 1
Expand Down
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ If you want to build on the sample `guerrilla-db-redis` module, setup the follow
in MySQL:

CREATE TABLE IF NOT EXISTS `new_mail` (
`mail_id` int(11) NOT NULL auto_increment,
`mail_id` BIGINT(20) unsigned NOT NULL AUTO_INCREMENT,
`date` datetime NOT NULL,
`from` varchar(128) character set latin1 NOT NULL,
`to` varchar(128) character set latin1 NOT NULL,
Expand All @@ -137,9 +137,8 @@ in MySQL:
`recipient` varchar(128) character set latin1 NOT NULL,
`has_attach` int(11) NOT NULL,
`ip_addr` varchar(15) NOT NULL,
`delivered` bit(1) NOT NULL default b'0',
`attach_info` text NOT NULL,
`dkim_valid` tinyint(4) default NULL,
`return_path` VARCHAR(255) NOT NULL,
`is_tls` BIT(1) DEFAULT b'0' NOT NULL,
PRIMARY KEY (`mail_id`),
KEY `to` (`to`),
KEY `hash` (`hash`),
Expand All @@ -148,7 +147,9 @@ in MySQL:

The above table does not store the body of the email, which makes it quick
to query and join; the body of the email is stored in Redis
for future processing. The `mail` field only contains data if Redis was down at save time.
Otherwise, when the data is in Redis, the `mail` field will be blank and
the `body` field will contain the word 'redis'.

You can implement your own saveMail function to use whatever storage /
backend fits for you. Please share them ^_^, in particular, we would
Expand Down
179 changes: 143 additions & 36 deletions backends/guerrilla_db_redis.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
package backends

// This backend is presented here as an example only, please modify it to your needs.
// The backend stores the email data in Redis.
// Other meta-information is stored in MySQL to be joined later.
// A lot of email gets discarded without viewing on Guerrilla Mail,
// so it's much faster to put in Redis, where other programs can
// process it later, without touching the disk.
// Short history:
// Started with issuing an insert query for each single email and another query to update the tally
// Then applied the following optimizations:
// - Moved tally updates to another background process which does the tallying in a single query
// - Changed the MySQL queries to insert in batch
// - Made a Compressor that recycles buffers using sync.Pool
// The result was around 400% speed improvement. If you know of any more improvements, please share!
import (
"errors"
"fmt"

"time"
Expand All @@ -18,16 +30,28 @@ import (
"sync"
)

// GuerrillaDBAndRedisBatchMax is the maximum number of rows batched into a
// single INSERT statement before the batch is executed.
const GuerrillaDBAndRedisBatchMax = 500

// GuerrillaDBAndRedisBatchTimeout is how long the batcher waits with no new
// rows arriving before flushing a partially filled batch.
const GuerrillaDBAndRedisBatchTimeout = time.Second * 3

// init registers this backend under the "guerrilla-db-redis" name so it can
// be selected via configuration.
func init() {
	backends["guerrilla-db-redis"] = &AbstractBackend{
		extend: &GuerrillaDBAndRedisBackend{}}
}

// GuerrillaDBAndRedisBackend saves email metadata to MySQL (in batched
// INSERTs) and the email body to Redis.
type GuerrillaDBAndRedisBackend struct {
	AbstractBackend
	config guerrillaDBAndRedisConfig
	// batcherWg lets the save-mail worker wait for the insert query batcher
	// to flush and exit on shutdown.
	batcherWg sync.WaitGroup
	// cache of prepared INSERT queries, one slot per batch row count
	cache stmtCache
}

// stmtCache caches prepared statements by batch size. It's a fixed-size
// array, not a slice: slot i holds the statement for a batch of i+1 rows.
type stmtCache [GuerrillaDBAndRedisBatchMax]*autorc.Stmt

type guerrillaDBAndRedisConfig struct {
NumberOfWorkers int `json:"save_workers_size"`
MysqlTable string `json:"mail_table"`
Expand Down Expand Up @@ -131,12 +155,111 @@ func (c *compressedData) clear() {
c.data = nil
}

// prepareInsertQuery returns a prepared INSERT statement sized for the given
// number of rows, building and caching it on first use.
// NOTE(review): g.cache is read and written here without synchronization; if
// more than one goroutine calls this on the same backend (e.g. multiple save
// workers), that access is a data race — confirm a single batcher owns it,
// or guard the cache with a mutex.
func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *autorc.Conn) *autorc.Stmt {
	if rows == 0 {
		panic("rows argument cannot be 0")
	}
	if cached := g.cache[rows-1]; cached != nil {
		return cached
	}
	// One placeholder group per row, groups joined by commas.
	valueGroup := "(NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
	query := "INSERT INTO " + g.config.MysqlTable + " "
	query += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
	query += " values "
	for i := 0; i < rows; i++ {
		if i > 0 {
			query += ","
		}
		query += valueGroup
	}
	stmt, sqlErr := db.Prepare(query)
	if sqlErr != nil {
		log.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
	}
	// cache for the next caller asking for the same batch size
	g.cache[rows-1] = stmt
	return stmt
}

// insertQueryBatcher batches the rows arriving on the feeder chan into a
// single INSERT statement and executes it when either:
//   - the number of batched rows reaches GuerrillaDBAndRedisBatchMax, or
//   - no new rows arrive within GuerrillaDBAndRedisBatchTimeout.
//
// A nil value received on feeder is the shutdown signal: any remaining rows
// are flushed and the function returns. The spawner waits on g.batcherWg for
// that to happen.
func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{}, db *autorc.Conn) {
	// Register on the wait group BEFORE arming the deferred Done. The
	// original ran `defer Done()` first, so an exit before Add(1) would
	// panic, and a concurrent Wait() could return before Add executed.
	g.batcherWg.Add(1)
	defer g.batcherWg.Done()
	// vals accumulates the flattened bind values of all batched rows
	var vals []interface{}
	// count is how many rows are currently batched in vals
	count := 0
	// The timer interrupts the select below when the feeder goes quiet,
	// so partially filled batches still get flushed.
	t := time.NewTimer(GuerrillaDBAndRedisBatchTimeout)
	defer t.Stop()
	// prepare the query used to insert when rows reaches batchMax
	insertStmt := g.prepareInsertQuery(GuerrillaDBAndRedisBatchMax, db)
	// insert executes a batched insert query for c rows, then clears vals
	// and resets the count
	insert := func(c int) {
		if c > 0 {
			insertStmt = g.prepareInsertQuery(c, db)
			insertStmt.Bind(vals...)
			if _, _, err := insertStmt.Exec(); err != nil {
				log.WithError(err).Error("There was a problem with the insert")
			}
		}
		vals = nil
		count = 0
	}
	// Keep getting values from feeder and add to batch.
	// If the feeder times out, execute the batched query; otherwise execute
	// it once the batch reaches the GuerrillaDBAndRedisBatchMax threshold.
	for {
		select {
		case row := <-feeder:
			if row == nil {
				log.Debug("Query batcher exiting")
				// flush any remaining rows before shutting down
				insert(count)
				return
			}
			// Debug, not Info: this fires once per email on the hot path
			log.Debug("row from chan is", row, "cols:", len(row))
			vals = append(vals, row...)
			count++
			if count == GuerrillaDBAndRedisBatchMax {
				insert(GuerrillaDBAndRedisBatchMax)
			}
			// stop the timer from firing (reset the interrupt), draining
			// its channel if it fired while we were batching
			if !t.Stop() {
				<-t.C
			}
			t.Reset(GuerrillaDBAndRedisBatchTimeout)
		case <-t.C:
			// timed out with no new rows; flush whatever is batched
			if len(vals) > 0 {
				insert(count)
			}
			t.Reset(GuerrillaDBAndRedisBatchTimeout)
		}
	}
}

func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
var to, body string
var err error
//var length int
//var err error

var redisErr error
var length int

redisClient := &redisClient{}
db := autorc.New(
"tcp",
Expand All @@ -146,18 +269,10 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
g.config.MysqlPass,
g.config.MysqlDB)
db.Register("set names utf8")
sql := "INSERT INTO " + g.config.MysqlTable + " "
sql += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
sql += " values (NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
ins, sqlErr := db.Prepare(sql)
if sqlErr != nil {
log.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
}
sql = "UPDATE gm2_setting SET `setting_value` = `setting_value`+1 WHERE `setting_name`='received_emails' LIMIT 1"
incr, sqlErr := db.Prepare(sql)
if sqlErr != nil {
log.WithError(sqlErr).Fatalf("failed while db.Prepare(UPDATE...)")
}
// start the query SQL batching where we will send data via the feeder channel
feeder := make(chan []interface{}, 1)
go g.insertQueryBatcher(feeder, db)

defer func() {
if r := recover(); r != nil {
//recover form closed channel
Expand All @@ -170,18 +285,22 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
log.Infof("closed redis")
redisClient.conn.Close()
}
}()
// close the feeder & wait for query batcher to exit.
close(feeder)
g.batcherWg.Wait()

}()
var vals []interface{}
data := newCompressedData()
// receives values from the channel repeatedly until it is closed.

for {
payload := <-saveMailChan
if payload == nil {
log.Debug("No more saveMailChan payload")
return
}
to = payload.recipient.User + "@" + g.config.PrimaryHost
length = payload.mail.Data.Len()

ts := fmt.Sprintf("%d", time.Now().UnixNano())
payload.mail.ParseHeaders()
Expand Down Expand Up @@ -218,8 +337,8 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
log.WithError(redisErr).Warn("Error while SETEX on redis")
}

// bind data to cursor
ins.Bind(
vals = []interface{}{} // clear the vals
vals = append(vals,
to,
payload.mail.MailFrom.String(),
payload.mail.Subject,
Expand All @@ -229,22 +348,10 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
to,
payload.mail.RemoteAddress,
payload.mail.MailFrom.String(),
payload.mail.TLS,
)
// save, discard result
_, _, err = ins.Exec()
if err != nil {
errMsg := "Database error while inserting"
log.WithError(err).Warn(errMsg)
payload.savedNotify <- &saveStatus{errors.New(errMsg), hash}
} else {
log.Debugf("Email saved %s (len=%d)", hash, length)
_, _, err = incr.Exec()
if err != nil {
log.WithError(err).Warn("Database error while incr count")
}
payload.savedNotify <- &saveStatus{nil, hash}
}
payload.mail.TLS)
feeder <- vals
payload.savedNotify <- &saveStatus{nil, hash}

}
}

Expand Down

0 comments on commit b9befb4

Please sign in to comment.