-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload.go
137 lines (120 loc) · 3.2 KB
/
upload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package smugmug
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"golang.org/x/sync/errgroup"
)
// UploadService is the API for the upload endpoint.
//
// It is declared on top of the package's shared service type; its methods
// upload a single image (Upload) or a stream of images (Uploads).
type UploadService service
// Uploadables is a factory for Uploadable instances.
//
// UploadService.Uploads calls it once to obtain the stream of items to upload.
type Uploadables interface {
// Uploadables returns a channel of Uploadable instances and a channel of
// errors raised while producing them.
//
// As consumed by UploadService.Uploads, the Uploadable channel is read until
// it is closed, and a single receive is performed on the error channel.
// NOTE(review): implementations presumably honor ctx cancellation and close
// both channels when production ends — confirm against the implementations.
Uploadables(context.Context) (<-chan *Uploadable, <-chan error)
}
// Upload sends a single image to the album identified by up.AlbumKey with an
// HTTP PUT against the client's upload URL.
//
// The file name is appended to the upload URL as one escaped path segment, the
// upload metadata (album, MD5, size and, when up.Replaces is set, the image to
// replace) is sent in request headers, and the body is read from up.Reader.
//
// An error is returned when up is nil, when up.AlbumKey is empty, when the
// request cannot be constructed, or when s.client.do reports a failure. On
// success the uploadResponse handed to s.client.do is converted into an
// *Upload that also records the time spent executing the request.
func (s *UploadService) Upload(ctx context.Context, up *Uploadable) (*Upload, error) {
	if up == nil {
		return nil, errors.New("missing uploadable")
	}
	if up.AlbumKey == "" {
		return nil, errors.New("missing albumKey")
	}

	// Escape the name so that characters such as '%', '?' or '#' cannot break
	// URL parsing or be mistaken for the start of a query or fragment.
	uri := fmt.Sprintf("%s/%s", s.client.uploadURL, url.PathEscape(up.Name))
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, uri, up.Reader)
	if err != nil {
		return nil, err
	}

	// net/http takes the length it puts on the wire from req.ContentLength and
	// ignores a Content-Length entry in req.Header. NewRequestWithContext only
	// fills the field in for a few in-memory reader types, so supply the
	// declared size when the length is still unknown.
	if req.ContentLength <= 0 && up.Size > 0 {
		req.ContentLength = up.Size
	}

	headers := map[string]string{
		"Accept":              "application/json",
		"Content-MD5":         up.MD5,
		"Content-Length":      strconv.FormatInt(up.Size, 10),
		"User-Agent":          userAgent,
		"X-Smug-Version":      "v2",
		"X-Smug-AlbumUri":     "/api/v2/album/" + up.AlbumKey,
		"X-Smug-ResponseType": "JSON",
	}
	if up.Replaces != "" {
		// Replace an existing image instead of creating a new one.
		headers["X-Smug-ImageUri"] = up.Replaces
	}
	for key, val := range headers {
		req.Header.Set(key, val)
	}

	// Time only the request itself; the duration is reported on the result.
	t := time.Now()
	ur := &uploadResponse{}
	if err = s.client.do(req, ur); err != nil {
		return nil, err
	}
	return ur.Upload(up, time.Since(t)), nil
}
// Uploads consumes Uploadable instances produced by uploadables, uploads them
// to SmugMug concurrently and reports every successful upload as an *Upload.
//
// The returned uploads channel carries one *Upload per uploaded item and is
// closed once all goroutines have finished. The returned errs channel carries
// at most one error — the first non-nil error returned by the producer watcher
// or by any worker — and is then closed. The first error cancels the context
// shared by the producer and the workers.
//
// The number of workers is s.client.concurrency, with a floor of one so that
// an unset (zero) or negative value cannot leave the uploadables channel
// without a consumer.
//
// NOTE(review): the watcher goroutine only returns when the context is done or
// the producer's error channel yields a value or is closed; confirm that
// Uploadables implementations close their error channel when they finish,
// otherwise the returned channels are never closed on a fully successful run.
func (s *UploadService) Uploads(ctx context.Context, uploadables Uploadables) (uploads <-chan *Upload, errs <-chan error) {
	updc := make(chan *Upload)
	// Buffered so the single send below cannot block if the caller has stopped
	// reading errors.
	errc := make(chan error, 1)

	grp, ctx := errgroup.WithContext(ctx)
	uploadablesc, uperrc := uploadables.Uploadables(ctx)

	// Surface a producer error (or cancellation) through the group so that it
	// cancels the workers as well.
	grp.Go(func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-uperrc:
			return err
		}
	})

	workers := s.client.concurrency
	if workers < 1 {
		workers = 1
	}
	for i := 0; i < workers; i++ {
		grp.Go(s.uploads(ctx, uploadablesc, updc))
	}

	go func() {
		// Deferred calls run in reverse order: updc is closed first, then errc.
		defer close(errc)
		defer close(updc)
		if err := grp.Wait(); err != nil {
			errc <- err
		}
	}()
	return updc, errc
}
// uploads builds one worker function for use with an errgroup. The worker
// repeatedly takes an Uploadable from uploadablesc, uploads it and forwards the
// result on updc. It returns nil when uploadablesc is closed, the context's
// error when ctx is done, and the upload error as soon as an upload fails.
func (s *UploadService) uploads(ctx context.Context, uploadablesc <-chan *Uploadable, updc chan<- *Upload) func() error {
	worker := func() error {
		for {
			var (
				item *Uploadable
				open bool
			)
			// Wait for the next item unless the context ends first.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case item, open = <-uploadablesc:
			}
			if !open {
				// The producer closed the channel: no more work.
				return nil
			}

			result, err := s.Upload(ctx, item)
			if err != nil {
				return err
			}

			// Hand the result over, giving up if the context ends while the
			// receiver is not ready.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case updc <- result:
			}
		}
	}
	return worker
}
// uploadResponse is the JSON document populated by the client for an upload
// request; its Upload method converts it into the public *Upload value.
type uploadResponse struct {
// Stat is the "stat" field of the response; it becomes Upload.Status.
Stat string `json:"stat"`
// Method is the "method" field of the response; it becomes Upload.Method.
Method string `json:"method"`
// UploadedImage holds the "Image" object of the response.
UploadedImage struct {
// StatusImageReplaceURI is decoded but not copied into Upload.
StatusImageReplaceURI string `json:"StatusImageReplaceUri"`
ImageURI string `json:"ImageUri"`
AlbumImageURI string `json:"AlbumImageUri"`
URL string `json:"URL"`
} `json:"Image"`
}
// Upload converts the response into an *Upload for the given Uploadable,
// copying the status, method and image URLs from the response and recording
// elapsed as the duration of the request.
func (u *uploadResponse) Upload(up *Uploadable, elapsed time.Duration) *Upload {
	result := new(Upload)
	result.Uploadable = up
	result.Elapsed = elapsed

	// Top-level response fields.
	result.Status = u.Stat
	result.Method = u.Method

	// Fields taken from the nested image object.
	result.URL = u.UploadedImage.URL
	result.ImageURI = u.UploadedImage.ImageURI
	result.AlbumImageURI = u.UploadedImage.AlbumImageURI

	return result
}