Fixes goodeggs#2 - Key stream completes without resume
hurrymaplelad committed Jun 12, 2013
1 parent 1b9379e · commit 6ca0740
Showing 5 changed files with 40 additions and 28 deletions.
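
In short: streamKeys() gains an optional maxKeysPerRequest parameter that is threaded through to KeyStream (where it defaults to 500), the replenish threshold in _keysRunningLow loosens from < to <=, and @_continueStreaming gains the parentheses it was missing, the proximate cause of the stream stalling instead of resuming after a page fetch.
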
7 changes: 4 additions & 3 deletions lib/index.js

(Generated file; diff not rendered.)

15 changes: 8 additions & 7 deletions lib/key_stream.js

(Generated file; diff not rendered.)

25 changes: 18 additions & 7 deletions spec/knox-copy.spec.coffee
@@ -97,13 +97,24 @@ describe 'knox-copy', ->
         done()
 
   describe 'streamKeys()', ->
-    it 'should emit a data event for every key and an end event when keys are exhausted', (done) ->
-      streamedKeys = []
-      stream = client.streamKeys(prefix: '/tmp/spec/list')
-      stream.on 'data', (key) -> streamedKeys.push key
-      stream.on 'end', ->
-        expect(streamedKeys).toEqual keys
-        done()
+    describe 'when all keys fit on a single page', ->
+      it 'should emit a data event for every key and an end event when keys are exhausted', (done) ->
+        streamedKeys = []
+        stream = client.streamKeys(prefix: '/tmp/spec/list')
+        stream.on 'data', (key) -> streamedKeys.push key
+        stream.on 'end', ->
+          expect(streamedKeys).toEqual keys
+          done()
+
+    describe 'when the number of keys exceeds page size', ->
+      maxKeysPerRequest = 2
+      it 'should emit a data event for every key and an end event when keys are exhausted', (done) ->
+        streamedKeys = []
+        stream = client.streamKeys({prefix: '/tmp/spec/list', maxKeysPerRequest})
+        stream.on 'data', (key) -> streamedKeys.push key
+        stream.on 'end', ->
+          expect(streamedKeys).toEqual keys
+          done()
 
   describe 'copyBucket()', ->
 
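The second spec is the regression test for goodeggs#2: capping maxKeysPerRequest at 2 guarantees the listed keys cannot fit on one page, so the stream must replenish and resume at least once before it may emit 'end'. A rough sketch of the arithmetic, with a hypothetical key count:

# Listing n keys with page size p takes ceil(n / p) S3 list requests.
pagesNeeded = (totalKeys, pageSize) -> Math.ceil(totalKeys / pageSize)

console.log pagesNeeded(5, 2)   # => 3 pages, so at least two replenish cycles
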
4 changes: 2 additions & 2 deletions src/index.coffee
@@ -97,8 +97,8 @@ knox::listPageOfKeys = ({maxKeys, marker, prefix, headers}, cb) ->
     else
       cb null, page
 
-knox::streamKeys = ({prefix}) ->
-  return new KeyStream {prefix, client: @}
+knox::streamKeys = ({prefix, maxKeysPerRequest}={}) ->
+  return new KeyStream {prefix, client: @, maxKeysPerRequest}
 
 # Like async.queue but pauses stream instead of queueing
 # worker gets stream data and a callback to call when the work is done
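A minimal usage sketch of the widened signature, assuming knox-copy's usual createClient entry point; the credentials and bucket name are placeholders. Because of the ={} default, streamKeys() can now also be called with no options at all:

knoxCopy = require 'knox-copy'

client = knoxCopy.createClient
  key: 'AWS_KEY'           # placeholder credentials
  secret: 'AWS_SECRET'
  bucket: 'example-bucket'

# A small page size forces several list requests, as in the new spec.
stream = client.streamKeys(prefix: '/tmp/spec/list', maxKeysPerRequest: 2)
stream.on 'data', (key) -> console.log key
stream.on 'end', -> console.log 'all keys streamed'
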
17 changes: 8 additions & 9 deletions src/key_stream.coffee
@@ -24,17 +24,16 @@ module.exports = class KeyStream extends Stream
   # true once a data event has been emitted for every key.
   allKeysStreamed: false
 
-  # number of keys to list per request. The stream queue caps at ~1.5x this size.
-  maxKeysPerRequest: 500
-
   # pagination index
   marker: null
 
-  # queue of keys returned from Amazon but not yet iterated
-  keyQueue: null
-
-  constructor: ({@client, @prefix}) ->
+  constructor: ({@client, @prefix, @maxKeysPerRequest}) ->
+    # queue of keys returned from Amazon but not yet iterated
     @keyQueue = []
+
+    # number of keys to list per request. The stream queue caps at ~1.5x this size.
+    @maxKeysPerRequest ?= 500
+
     @_replenishKeys @_continueStreaming
 
   pause: ->
@@ -49,7 +49,7 @@
     @allKeysStreamed
 
   _keysRunningLow: ->
-    @keyQueue.length < (@maxKeysPerRequest / 2)
+    @keyQueue.length <= (@maxKeysPerRequest / 2)
 
   _continueStreaming: =>
     while @keyQueue.length > 0
@@ -61,7 +60,7 @@
         # Don't want to double end!
         @_replenishKeys =>
           nextTick =>
-            @_continueStreaming
+            @_continueStreaming()
 
       @emit 'data', @keyQueue.shift()

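Two behavioral fixes land in this file. First, _keysRunningLow now uses <=: with maxKeysPerRequest = 2 the threshold is 1, and since the streaming loop only runs while the queue is non-empty, the old strict < could never fire there, so a replenish was never triggered. Second, the heart of goodeggs#2: @_continueStreaming without parentheses merely evaluates the function reference, so the stream never actually resumed after fetching the next page. A standalone sketch of that CoffeeScript pitfall:

# A bare reference to a zero-argument function is a no-op expression;
# parentheses are required to invoke it.
resume = -> console.log 'resumed'

resume     # evaluates to the function object; prints nothing
resume()   # invokes it; prints 'resumed'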
