Skip to content

Commit

Permalink
Update pagination API to take a function callback instead.
Browse files Browse the repository at this point in the history
New API:

	func main() {
		db := dynamodb.New(nil)
		params := &dynamodb.ListTablesInput{Limit: aws.Long(2)}
		i := 0
		db.ListTablesPages(params, func(p *dynamodb.ListTablesOutput, err error) bool {
			if err != nil {
				fmt.Println("An error occurred", err)
			} else if p != nil {
				i++
				fmt.Println(i, awsutil.StringValue(p))
			} else {
				fmt.Println("Finished paging!")
			}
			return true // return false to stop paging
		})

	}

Also adds pagination tests.

References aws#58
  • Loading branch information
lsegal committed May 26, 2015
1 parent 8afc35d commit 6e04917
Show file tree
Hide file tree
Showing 30 changed files with 1,078 additions and 2,972 deletions.
26 changes: 8 additions & 18 deletions aws/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,11 @@ func (r *Request) nextPageToken() interface{} {
if r.Operation.Paginator == nil {
return nil
}
return awsutil.ValueAtPath(r.Data, r.Operation.OutputToken)
v := awsutil.ValuesAtPath(r.Data, r.Operation.OutputToken)
if v != nil && len(v) > 0 {
return v[0]
}
return nil
}

func (r *Request) NextPage() *Request {
Expand All @@ -246,11 +250,11 @@ func (r *Request) NextPage() *Request {
}

if r.Operation.TruncationToken != "" {
tr := awsutil.ValueAtPath(r.Data, r.Operation.TruncationToken)
tr := awsutil.ValuesAtPath(r.Data, r.Operation.TruncationToken)
if tr == nil {
return nil
} else {
switch v := tr.(type) {
} else if len(tr) > 0 {
switch v := tr[0].(type) {
case bool:
if v == false {
return nil
Expand All @@ -265,17 +269,3 @@ func (r *Request) NextPage() *Request {
awsutil.SetValueAtPath(nr.Params, nr.Operation.InputToken, token)
return nr
}

func (r *Request) Pages() <-chan interface{} {
page := r
ch := make(chan interface{})
go func() {
for page != nil {
page.Send()
ch <- page.Data
page = page.NextPage()
}
close(ch)
}()
return ch
}
99 changes: 99 additions & 0 deletions aws/request_pagination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package aws_test

import (
"fmt"
"testing"

"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/dynamodb"
"github.com/stretchr/testify/assert"
)

// Use DynamoDB methods for simplicity
func TestPagination(t *testing.T) {
db := dynamodb.New(nil)
tokens, pages, numPages, gotToEnd := []string{}, []string{}, 0, false

reqNum := 0
resps := []*dynamodb.ListTablesOutput{
&dynamodb.ListTablesOutput{TableNames: []*string{aws.String("Table1"), aws.String("Table2")}, LastEvaluatedTableName: aws.String("Table2")},
&dynamodb.ListTablesOutput{TableNames: []*string{aws.String("Table3"), aws.String("Table4")}, LastEvaluatedTableName: aws.String("Table4")},
&dynamodb.ListTablesOutput{TableNames: []*string{aws.String("Table5")}},
}

db.Handlers.Send.Init() // mock sending
db.Handlers.Unmarshal.Init()
db.Handlers.UnmarshalMeta.Init()
db.Handlers.ValidateResponse.Init()
db.Handlers.Build.PushBack(func(r *aws.Request) {
in := r.Params.(*dynamodb.ListTablesInput)
if in == nil {
tokens = append(tokens, "")
} else if in.ExclusiveStartTableName != nil {
tokens = append(tokens, *in.ExclusiveStartTableName)
}
})
db.Handlers.Unmarshal.PushBack(func(r *aws.Request) {
r.Data = resps[reqNum]
reqNum++
})

params := &dynamodb.ListTablesInput{Limit: aws.Long(2)}
db.ListTablesPages(params, func(p *dynamodb.ListTablesOutput, err error) bool {
if p != nil {
numPages++
for _, t := range p.TableNames {
pages = append(pages, *t)
}
} else if p == nil && err == nil {
gotToEnd = true
}

return true
})

assert.Equal(t, []string{"Table2", "Table4"}, tokens)
assert.Equal(t, []string{"Table1", "Table2", "Table3", "Table4", "Table5"}, pages)
assert.Equal(t, 3, numPages)
assert.True(t, gotToEnd)
}

// Use DynamoDB methods for simplicity
func TestPaginationEarlyExit(t *testing.T) {
db := dynamodb.New(nil)
numPages, gotToEnd := 0, false

reqNum := 0
resps := []*dynamodb.ListTablesOutput{
&dynamodb.ListTablesOutput{TableNames: []*string{aws.String("Table1"), aws.String("Table2")}, LastEvaluatedTableName: aws.String("Table2")},
&dynamodb.ListTablesOutput{TableNames: []*string{aws.String("Table3"), aws.String("Table4")}, LastEvaluatedTableName: aws.String("Table4")},
&dynamodb.ListTablesOutput{TableNames: []*string{aws.String("Table5")}},
}

db.Handlers.Send.Init() // mock sending
db.Handlers.Unmarshal.Init()
db.Handlers.UnmarshalMeta.Init()
db.Handlers.ValidateResponse.Init()
db.Handlers.Unmarshal.PushBack(func(r *aws.Request) {
r.Data = resps[reqNum]
reqNum++
})

params := &dynamodb.ListTablesInput{Limit: aws.Long(2)}
db.ListTablesPages(params, func(p *dynamodb.ListTablesOutput, err error) bool {
fmt.Println("page", numPages, p)
if p == nil && err == nil {
gotToEnd = true
} else {
numPages++
}
if numPages == 2 {
fmt.Println("BREAKING")
return false
}
return true
})

assert.Equal(t, 2, numPages)
assert.False(t, gotToEnd)
}
19 changes: 8 additions & 11 deletions internal/model/api/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,16 @@ func (c *{{ .API.StructName }}) {{ .ExportedName }}Request(` +
{{ if .Paginator }}
func (c *{{ .API.StructName }}) {{ .ExportedName }}Pages(` +
`input {{ .InputRef.GoType }}) <- chan {{ .OutputRef.GoType }} {
`input {{ .InputRef.GoType }}, fn func({{ .OutputRef.GoType }}, error) bool) {
page, _ := c.{{ .ExportedName }}Request(input)
ch := make(chan {{ .OutputRef.GoType }})
go func() {
for page != nil {
page.Send()
out := page.Data.({{ .OutputRef.GoType }})
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.({{ .OutputRef.GoType }})
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}
{{ end }}
Expand Down
152 changes: 64 additions & 88 deletions service/autoscaling/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,19 +549,16 @@ func (c *AutoScaling) DescribeAutoScalingGroups(input *DescribeAutoScalingGroups
return out, err
}

func (c *AutoScaling) DescribeAutoScalingGroupsPages(input *DescribeAutoScalingGroupsInput) <-chan *DescribeAutoScalingGroupsOutput {
func (c *AutoScaling) DescribeAutoScalingGroupsPages(input *DescribeAutoScalingGroupsInput, fn func(*DescribeAutoScalingGroupsOutput, error) bool) {
page, _ := c.DescribeAutoScalingGroupsRequest(input)
ch := make(chan *DescribeAutoScalingGroupsOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeAutoScalingGroupsOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeAutoScalingGroupsOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeAutoScalingGroups *aws.Operation
Expand Down Expand Up @@ -608,19 +605,16 @@ func (c *AutoScaling) DescribeAutoScalingInstances(input *DescribeAutoScalingIns
return out, err
}

func (c *AutoScaling) DescribeAutoScalingInstancesPages(input *DescribeAutoScalingInstancesInput) <-chan *DescribeAutoScalingInstancesOutput {
func (c *AutoScaling) DescribeAutoScalingInstancesPages(input *DescribeAutoScalingInstancesInput, fn func(*DescribeAutoScalingInstancesOutput, error) bool) {
page, _ := c.DescribeAutoScalingInstancesRequest(input)
ch := make(chan *DescribeAutoScalingInstancesOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeAutoScalingInstancesOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeAutoScalingInstancesOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeAutoScalingInstances *aws.Operation
Expand Down Expand Up @@ -698,19 +692,16 @@ func (c *AutoScaling) DescribeLaunchConfigurations(input *DescribeLaunchConfigur
return out, err
}

func (c *AutoScaling) DescribeLaunchConfigurationsPages(input *DescribeLaunchConfigurationsInput) <-chan *DescribeLaunchConfigurationsOutput {
func (c *AutoScaling) DescribeLaunchConfigurationsPages(input *DescribeLaunchConfigurationsInput, fn func(*DescribeLaunchConfigurationsOutput, error) bool) {
page, _ := c.DescribeLaunchConfigurationsRequest(input)
ch := make(chan *DescribeLaunchConfigurationsOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeLaunchConfigurationsOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeLaunchConfigurationsOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeLaunchConfigurations *aws.Operation
Expand Down Expand Up @@ -852,19 +843,16 @@ func (c *AutoScaling) DescribeNotificationConfigurations(input *DescribeNotifica
return out, err
}

func (c *AutoScaling) DescribeNotificationConfigurationsPages(input *DescribeNotificationConfigurationsInput) <-chan *DescribeNotificationConfigurationsOutput {
func (c *AutoScaling) DescribeNotificationConfigurationsPages(input *DescribeNotificationConfigurationsInput, fn func(*DescribeNotificationConfigurationsOutput, error) bool) {
page, _ := c.DescribeNotificationConfigurationsRequest(input)
ch := make(chan *DescribeNotificationConfigurationsOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeNotificationConfigurationsOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeNotificationConfigurationsOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeNotificationConfigurations *aws.Operation
Expand Down Expand Up @@ -909,19 +897,16 @@ func (c *AutoScaling) DescribePolicies(input *DescribePoliciesInput) (*DescribeP
return out, err
}

func (c *AutoScaling) DescribePoliciesPages(input *DescribePoliciesInput) <-chan *DescribePoliciesOutput {
func (c *AutoScaling) DescribePoliciesPages(input *DescribePoliciesInput, fn func(*DescribePoliciesOutput, error) bool) {
page, _ := c.DescribePoliciesRequest(input)
ch := make(chan *DescribePoliciesOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribePoliciesOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribePoliciesOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribePolicies *aws.Operation
Expand Down Expand Up @@ -969,19 +954,16 @@ func (c *AutoScaling) DescribeScalingActivities(input *DescribeScalingActivities
return out, err
}

func (c *AutoScaling) DescribeScalingActivitiesPages(input *DescribeScalingActivitiesInput) <-chan *DescribeScalingActivitiesOutput {
func (c *AutoScaling) DescribeScalingActivitiesPages(input *DescribeScalingActivitiesInput, fn func(*DescribeScalingActivitiesOutput, error) bool) {
page, _ := c.DescribeScalingActivitiesRequest(input)
ch := make(chan *DescribeScalingActivitiesOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeScalingActivitiesOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeScalingActivitiesOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeScalingActivities *aws.Operation
Expand Down Expand Up @@ -1056,19 +1038,16 @@ func (c *AutoScaling) DescribeScheduledActions(input *DescribeScheduledActionsIn
return out, err
}

func (c *AutoScaling) DescribeScheduledActionsPages(input *DescribeScheduledActionsInput) <-chan *DescribeScheduledActionsOutput {
func (c *AutoScaling) DescribeScheduledActionsPages(input *DescribeScheduledActionsInput, fn func(*DescribeScheduledActionsOutput, error) bool) {
page, _ := c.DescribeScheduledActionsRequest(input)
ch := make(chan *DescribeScheduledActionsOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeScheduledActionsOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeScheduledActionsOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeScheduledActions *aws.Operation
Expand Down Expand Up @@ -1118,19 +1097,16 @@ func (c *AutoScaling) DescribeTags(input *DescribeTagsInput) (*DescribeTagsOutpu
return out, err
}

func (c *AutoScaling) DescribeTagsPages(input *DescribeTagsInput) <-chan *DescribeTagsOutput {
func (c *AutoScaling) DescribeTagsPages(input *DescribeTagsInput, fn func(*DescribeTagsOutput, error) bool) {
page, _ := c.DescribeTagsRequest(input)
ch := make(chan *DescribeTagsOutput)
go func() {
for page != nil {
page.Send()
out := page.Data.(*DescribeTagsOutput)
ch <- out
page = page.NextPage()
for ; page != nil; page = page.NextPage() {
page.Send()
out := page.Data.(*DescribeTagsOutput)
if result := fn(out, page.Error); page.Error != nil || !result {
return
}
close(ch)
}()
return ch
}
fn(nil, nil)
}

var opDescribeTags *aws.Operation
Expand Down
Loading

0 comments on commit 6e04917

Please sign in to comment.