Skip to content

Commit

Permalink
Merge pull request #65 from Photoroom/ben/dynamic_workers
Browse files Browse the repository at this point in the history
Small refactor around worker pool + mmap file loads for the filesystem frontend
  • Loading branch information
blefaudeux authored Jan 19, 2025
2 parents 3bc2636 + 947a8e9 commit b99e9bc
Show file tree
Hide file tree
Showing 18 changed files with 189 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.18.1"
go-version: "1.23"

- run: go version

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.18.1"
go-version: "1.23"

- name: Install linux deps
run: |
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ client_config = {
# some performance options, best settings will depend on your machine
"prefetch_buffer_size": 64,
"samples_buffer_size": 128,
"concurrency": concurrency,
}

client = datago.GetClientFromJSON(json.dumps(config)) # Will return None if something goes wrong, check the logs
Expand Down
6 changes: 2 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
func main() {

cropAndResize := flag.Bool("crop_and_resize", false, "Whether to crop and resize the images and masks")
concurrency := flag.Int("concurrency", 64, "The number of concurrent http requests to make")
itemFetchBuffer := flag.Int("item_fetch_buffer", 256, "The number of items to pre-load")
itemReadyBuffer := flag.Int("item_ready_buffer", 128, "The number of items ready to be served")
limit := flag.Int("limit", 2000, "The number of items to fetch")
Expand All @@ -36,9 +35,8 @@ func main() {
CropAndResize: *cropAndResize,
}
config.SourceConfig = sourceConfig
config.Concurrency = *concurrency
config.PrefetchBufferSize = *itemFetchBuffer
config.SamplesBufferSize = *itemReadyBuffer
config.PrefetchBufferSize = int32(*itemFetchBuffer)
config.SamplesBufferSize = int32(*itemReadyBuffer)
config.Limit = *limit

dataroom_client := datago.GetClient(config)
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
module datago

go 1.18

require golang.org/x/image v0.18.0 // indirect
go 1.23.0

require (
github.com/davidbyttow/govips/v2 v2.15.0
github.com/davidbyttow/govips/v2 v2.16.0
go.uber.org/automaxprocs v1.6.0
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8
)

require (
golang.org/x/net v0.33.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/image v0.18.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/text v0.16.0 // indirect
)
42 changes: 30 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,71 +1,89 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davidbyttow/govips/v2 v2.15.0 h1:h3lF+rQElBzGXbQSSPqmE3XGySPhcQo2x3t5l/dZ+pU=
github.com/davidbyttow/govips/v2 v2.15.0/go.mod h1:3OQCHj0nf5Mnrplh5VlNvmx3IhJXyxbAoTJZPflUjmM=
github.com/davidbyttow/govips/v2 v2.16.0 h1:1nH/Rbx8qZP1hd+oYL9fYQjAnm1+KorX9s07ZGseQmo=
github.com/davidbyttow/govips/v2 v2.16.0/go.mod h1:clH5/IDVmG5eVyc23qYpyi7kmOT0B/1QNTKtci4RkyM=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/image v0.10.0/go.mod h1:jtrku+n79PfroUbvDdeUWMAI+heR786BofxrbiSF+J0=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/image v0.18.0 h1:jGzIakQa/ZXI1I0Fxvaa9W7yP25TqT6cHIHn+6CqvSQ=
golang.org/x/image v0.18.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
40 changes: 18 additions & 22 deletions pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ type DatagoConfig struct {
SourceType DatagoSourceType `json:"source_type"`
SourceConfig interface{} `json:"source_config"`
ImageConfig ImageTransformConfig `json:"image_config"`
PrefetchBufferSize int `json:"prefetch_buffer_size"`
SamplesBufferSize int `json:"samples_buffer_size"`
Concurrency int `json:"concurrency"`
PrefetchBufferSize int32 `json:"prefetch_buffer_size"`
SamplesBufferSize int32 `json:"samples_buffer_size"`
Limit int `json:"limit"`
}

Expand All @@ -70,7 +69,6 @@ func (c *DatagoConfig) setDefaults() {
c.ImageConfig.setDefaults()
c.PrefetchBufferSize = 64
c.SamplesBufferSize = 32
c.Concurrency = 64
c.Limit = 0
}

Expand Down Expand Up @@ -131,9 +129,8 @@ func DatagoConfigFromJSON(jsonString string) DatagoConfig {
log.Panicf("Error unmarshalling Image config: %v", err)
}

config.PrefetchBufferSize = int(tempConfig["prefetch_buffer_size"].(float64))
config.SamplesBufferSize = int(tempConfig["samples_buffer_size"].(float64))
config.Concurrency = int(tempConfig["concurrency"].(float64))
config.PrefetchBufferSize = int32(tempConfig["prefetch_buffer_size"].(float64))
config.SamplesBufferSize = int32(tempConfig["samples_buffer_size"].(float64))
if err != nil {
log.Panicf("Error unmarshalling JSON: %v", err)
}
Expand Down Expand Up @@ -187,13 +184,13 @@ func GetClient(config DatagoConfig) *DatagoClient {
if err != nil {
return nil
} else {
backend = BackendHTTP{config: &dbConfig, concurrency: config.Concurrency}
backend = BackendHTTP{config: &dbConfig}
}
case SourceFileSystemConfig:
fmt.Println("Creating a FileSystem-backed dataloader.", config.Limit, " max samples")
fsConfig := config.SourceConfig.(SourceFileSystemConfig)
generator = newDatagoGeneratorFileSystem(fsConfig)
backend = BackendFileSystem{config: &config, concurrency: config.Concurrency}
backend = BackendFileSystem{config: &config}
default:
fmt.Println("Unsupported source type")
log.Panic("Unsupported source type")
Expand Down Expand Up @@ -296,7 +293,8 @@ func (c *DatagoClient) GetSample() Sample {
return Sample{}
}

if sample, ok := <-c.chanSamples; ok {
sample, open := <-c.chanSamples
if open {
c.servedSamples++
return sample
}
Expand All @@ -307,18 +305,16 @@ func (c *DatagoClient) GetSample() Sample {

// Stop the background downloads, will clear the memory and CPU footprint
func (c *DatagoClient) Stop() {
fmt.Println("Stopping the datago client")

// Signal the coroutines that next round should be a stop
if c.cancel == nil {
return // Already stopped
}
fmt.Println("Stopping the datago client. Emptying buffers and cancelling ongoing tasks")
c.cancel()

// Clear the channels, in case a commit is blocking
go consumeChannel(c.chanPages)
go consumeChannel(c.chanSampleMetadata)
go consumeChannel(c.chanSamples)
consumeChannel(c.chanPages)
consumeChannel(c.chanSampleMetadata)
consumeChannel(c.chanSamples)

// Wait for all goroutines to finish
if c.waitGroup != nil {
Expand All @@ -336,25 +332,25 @@ func (c *DatagoClient) Stop() {
func (c *DatagoClient) asyncDispatch() {
// Break down the page results and maintain a list of individual items to be processed

defer close(c.chanSampleMetadata)

for {
select {
case <-c.context.Done():
close(c.chanSampleMetadata)
return
case page, open := <-c.chanPages:
default:
page, open := <-c.chanPages
if !open {
fmt.Println("No more metadata to fetch, wrapping up")
close(c.chanSampleMetadata)
return
}

for _, item := range page.samplesDataPointers {
select {
case <-c.context.Done():
close(c.chanSampleMetadata)
return
case c.chanSampleMetadata <- item:
// Item sent to the channel
default:
c.chanSampleMetadata <- item
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/core.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package datago

import "context"
import (
"context"
)

// --- Sample data structures - these will be exposed to the Python world ---------------------------------------------------------------------------------------------------------------------------------------------------------------
type LatentPayload struct {
Expand Down Expand Up @@ -48,5 +50,5 @@ type Generator interface {

// The backend will be responsible for fetching the payloads and deserializing them
type Backend interface {
collectSamples(chanSampleMetadata chan SampleDataPointers, chanSamples chan Sample, transform *ARAwareTransform, pre_encode_images bool)
collectSamples(chanSampleMetadata chan SampleDataPointers, chanSamples chan Sample, transform *ARAwareTransform, encodeImages bool)
}
4 changes: 2 additions & 2 deletions pkg/generator_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func (f datagoGeneratorDB) generatePages(ctx context.Context, chanPages chan Pag
return &data, nil
}

defer close(chanPages)

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -310,7 +312,6 @@ func (f datagoGeneratorDB) generatePages(ctx context.Context, chanPages chan Pag
// Check if there are more pages to fetch
if data.Next == "" {
fmt.Println("No more pages to fetch, wrapping up")
close(chanPages)
return
}

Expand All @@ -328,7 +329,6 @@ func (f datagoGeneratorDB) generatePages(ctx context.Context, chanPages chan Pag
// Check if we consumed all the retries
if !valid_page {
fmt.Println("Too many errors fetching new pages, wrapping up")
close(chanPages)
return
}
}
Expand Down
46 changes: 28 additions & 18 deletions pkg/generator_filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,36 +71,46 @@ func (f datagoGeneratorFileSystem) generatePages(ctx context.Context, chanPages
// This is meant to be run in a goroutine

var samples []SampleDataPointers
defer close(chanPages)

err := filepath.Walk(f.config.RootPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
select {
case <-ctx.Done():
return nil
default:
if err != nil {
return err
}

if !info.IsDir() && f.extensions.Contains(filepath.Ext(path)) {
if f.config.WorldSize > 1 && hash(path)%f.config.WorldSize != f.config.Rank || f.config.WorldSize == 1 {
new_sample := fsSampleMetadata{FilePath: path, FileName: info.Name()}
samples = append(samples, SampleDataPointers(new_sample))
if !info.IsDir() && f.extensions.Contains(filepath.Ext(path)) {
if f.config.WorldSize > 1 && hash(path)%f.config.WorldSize != f.config.Rank || f.config.WorldSize == 1 {
new_sample := fsSampleMetadata{FilePath: path, FileName: info.Name()}
samples = append(samples, SampleDataPointers(new_sample))
}
}
}

// Check if we have enough files to send a page
if len(samples) >= f.config.PageSize {
chanPages <- Pages{samples}
samples = nil
// Check if we have enough files to send a page
if len(samples) >= f.config.PageSize {
chanPages <- Pages{samples}
samples = nil
}
return nil
}
return nil
})

if err != nil {
fmt.Println("Error walking the path", f.config.RootPath)
panic(err)
} else {
// Send the last page
if len(samples) > 0 {
chanPages <- Pages{samples}

select {
case <-ctx.Done():
return
default:
// Send the last page
if len(samples) > 0 {
chanPages <- Pages{samples}
}
}
}

close(chanPages)
}
1 change: 1 addition & 0 deletions pkg/transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ARAwareTransform struct {
maxAspectRatio float64
targetImageSizes []ImageSize // list of [width, height] pairs
aspectRatioToSize map[float64]ImageSize
PreEncodeImages bool
}

func buildImageSizeList(defaultImageSize int, downsamplingRatio int, minAspectRatio float64, maxAspectRatio float64) []ImageSize {
Expand Down
Loading

0 comments on commit b99e9bc

Please sign in to comment.