Skip to content
This repository has been archived by the owner on Dec 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #12 from evanhuang8/fetch
Browse files Browse the repository at this point in the history
Expand Job details and add fetch function
  • Loading branch information
VojtechVitek authored Mar 22, 2019
2 parents 6edc6e9 + 409dfd6 commit ae073e1
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 1 deletion.
85 changes: 85 additions & 0 deletions disque.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,91 @@ func (pool *Pool) Get(queues ...string) (*Job, error) {
return &job, nil
}

// Fetch finds the job by its id and return its details
func (pool *Pool) Fetch(ID string) (*Job, error) {
sess := pool.redis.Get()
defer sess.Close()

reply, err := pool.do([]interface{}{
"SHOW",
ID,
})
if err != nil {
return nil, err
}

arr, ok := reply.([]interface{})
if !ok || len(arr) != 30 {
return nil, errors.New("unexpected reply #1")
}

job := Job{}

var bytes []byte

if bytes, ok = arr[1].([]byte); ok {
job.ID = string(bytes)
} else {
return nil, errors.New("unexpected reply: id")
}

if bytes, ok = arr[3].([]byte); ok {
job.Queue = string(bytes)
} else {
return nil, errors.New("unexpected reply: queue")
}

if bytes, ok = arr[5].([]byte); ok {
job.State = string(bytes)
} else {
return nil, errors.New("unexpected reply: state")
}

if job.Replication, ok = arr[7].(int64); !ok {
return nil, errors.New("unexpected reply: repl")
}

if ttl, ok := arr[9].(int64); ok {
job.TTL = time.Duration(ttl) * time.Second
} else {
return nil, errors.New("unexpected reply: ttl")
}

if createdAt, ok := arr[11].(int64); ok {
job.CreatedAt = time.Unix(0, createdAt)
} else {
return nil, errors.New("unexpected reply: ctime")
}

if delay, ok := arr[13].(int64); ok {
job.Delay = time.Duration(delay) * time.Second
} else {
return nil, errors.New("unexpected reply: delay")
}

if retry, ok := arr[15].(int64); ok {
job.Retry = time.Duration(retry) * time.Second
} else {
return nil, errors.New("unexpected reply: retry")
}

if job.Nacks, ok = arr[17].(int64); !ok {
return nil, errors.New("unexpected reply: nacks")
}

if job.AdditionalDeliveries, ok = arr[19].(int64); !ok {
return nil, errors.New("unexpected reply: additional-deliveries")
}

if bytes, ok := arr[29].([]byte); ok {
job.Data = string(bytes)
} else {
return nil, errors.New("unexpected reply: data")
}

return &job, nil
}

// Ack acknowledges (dequeues/removes) a job from its queue.
func (pool *Pool) Ack(job *Job) error {
sess := pool.redis.Get()
Expand Down
33 changes: 32 additions & 1 deletion disque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,37 @@ func TestPing(t *testing.T) {
}
}

func TestFetch(t *testing.T) {
// Connect to Disque.
jobs, err := disque.New("127.0.0.1:7711")
if err != nil {
t.Fatal(err)
}
defer jobs.Close()

// Add.
job, err := jobs.Add("data0x5f3759df", "test:fetch")
if err != nil {
t.Error(err)
}

// Fetch.
_job, err := jobs.Fetch(job.ID)
if err != nil {
t.Error(err)
}

// Job should have been fetched and content of two jobs should be equal
if _job == nil {
t.Fatal("expected job to be fetched")
}
if job.Data != _job.Data {
t.Error("expected data of the jobs to be equal")
}

jobs.Ack(_job)
}

func TestDelay(t *testing.T) {
// Connect to Disque.
jobs, err := disque.New("127.0.0.1:7711")
Expand Down Expand Up @@ -281,7 +312,7 @@ func TestQueueLength(t *testing.T) {
t.Error(err)
}
if length != 0 {
t.Error("unexpected length %v", length)
t.Fatalf("unexpected length %v", length)
}

// Enqueue hundred jobs.
Expand Down
8 changes: 8 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package disque

import "time"

// Job represents job/message returned from a Disque server.
type Job struct {
ID string
Data string
Queue string
State string
TTL time.Duration
Delay time.Duration
Retry time.Duration
CreatedAt time.Time
Replication int64
Nacks int64
AdditionalDeliveries int64
}

0 comments on commit ae073e1

Please sign in to comment.