Skip to content

Commit

Permalink
Fixes hacks from progressreader refactor
Browse files Browse the repository at this point in the history
related to moby#10959

Signed-off-by: bobby abbott <[email protected]>
  • Loading branch information
robertabbott committed Mar 26, 2015
1 parent 667452e commit 0cd6c05
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 94 deletions.
6 changes: 4 additions & 2 deletions api/client/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"github.com/docker/docker/graph"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/jsonmessage"
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/symlink"
"github.com/docker/docker/pkg/units"
"github.com/docker/docker/pkg/urlutil"
Expand Down Expand Up @@ -198,7 +200,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
// Setup an upload progress bar
// FIXME: ProgressReader shouldn't be this annoying to use
if context != nil {
sf := utils.NewStreamFormatter(false)
sf := streamformatter.NewStreamFormatter(false)
body = progressreader.New(progressreader.Config{
In: context,
Out: cli.out,
Expand Down Expand Up @@ -291,7 +293,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
headers.Set("Content-Type", "application/tar")
}
err = cli.stream("POST", fmt.Sprintf("/build?%s", v.Encode()), body, cli.out, headers)
if jerr, ok := err.(*utils.JSONError); ok {
if jerr, ok := err.(*jsonmessage.JSONError); ok {
// If no error code is set, default to 1
if jerr.Code == 0 {
jerr.Code = 1
Expand Down
4 changes: 2 additions & 2 deletions api/client/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"github.com/docker/docker/api"
"github.com/docker/docker/autogen/dockerversion"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/pkg/term"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)

var (
Expand Down Expand Up @@ -164,7 +164,7 @@ func (cli *DockerCli) streamHelper(method, path string, setRawTerminal bool, in
}

if api.MatchesContentType(resp.Header.Get("Content-Type"), "application/json") {
return utils.DisplayJSONMessagesStream(resp.Body, stdout, cli.outFd, cli.isTerminalOut)
return jsonmessage.DisplayJSONMessagesStream(resp.Body, stdout, cli.outFd, cli.isTerminalOut)
}
if stdout != nil || stderr != nil {
// When TTY is ON, use regular copy
Expand Down
7 changes: 4 additions & 3 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/docker/docker/pkg/listenbuffer"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/version"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
Expand Down Expand Up @@ -595,7 +596,7 @@ func postImagesCreate(eng *engine.Engine, version version.Version, w http.Respon
if !job.Stdout.Used() {
return err
}
sf := utils.NewStreamFormatter(version.GreaterThan("1.0"))
sf := streamformatter.NewStreamFormatter(version.GreaterThan("1.0"))
w.Write(sf.FormatError(err))
}

Expand Down Expand Up @@ -680,7 +681,7 @@ func postImagesPush(eng *engine.Engine, version version.Version, w http.Response
if !job.Stdout.Used() {
return err
}
sf := utils.NewStreamFormatter(version.GreaterThan("1.0"))
sf := streamformatter.NewStreamFormatter(version.GreaterThan("1.0"))
w.Write(sf.FormatError(err))
}
return nil
Expand Down Expand Up @@ -1107,7 +1108,7 @@ func postBuild(eng *engine.Engine, version version.Version, w http.ResponseWrite
if !job.Stdout.Used() {
return err
}
sf := utils.NewStreamFormatter(version.GreaterThanOrEqualTo("1.8"))
sf := streamformatter.NewStreamFormatter(version.GreaterThanOrEqualTo("1.8"))
w.Write(sf.FormatError(err))
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion builder/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/docker/docker/daemon"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/symlink"
"github.com/docker/docker/pkg/tarsum"
Expand Down Expand Up @@ -105,7 +106,7 @@ type Builder struct {

// Deprecated, original writer used for ImagePull. To be removed.
OutOld io.Writer
StreamFormatter *utils.StreamFormatter
StreamFormatter *streamformatter.StreamFormatter

Config *runconfig.Config // runconfig for cmd, run, entrypoint etc.

Expand Down
3 changes: 2 additions & 1 deletion builder/internals.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/chrootarchive"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/stringid"
Expand Down Expand Up @@ -601,7 +602,7 @@ func (b *Builder) run(c *daemon.Container) error {

// Wait for it to finish
if ret, _ := c.WaitStop(-1 * time.Second); ret != 0 {
err := &utils.JSONError{
err := &jsonmessage.JSONError{
Message: fmt.Sprintf("The command %v returned a non-zero code: %d", b.Config.Cmd, ret),
Code: ret,
}
Expand Down
7 changes: 4 additions & 3 deletions builder/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/docker/docker/graph"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/urlutil"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
Expand Down Expand Up @@ -127,16 +128,16 @@ func (b *BuilderJob) CmdBuild(job *engine.Job) error {
}
defer context.Close()

sf := utils.NewStreamFormatter(job.GetenvBool("json"))
sf := streamformatter.NewStreamFormatter(job.GetenvBool("json"))

builder := &Builder{
Daemon: b.Daemon,
Engine: b.Engine,
OutStream: &utils.StdoutFormater{
OutStream: &streamformatter.StdoutFormater{
Writer: job.Stdout,
StreamFormatter: sf,
},
ErrStream: &utils.StderrFormater{
ErrStream: &streamformatter.StderrFormater{
Writer: job.Stdout,
StreamFormatter: sf,
},
Expand Down
14 changes: 7 additions & 7 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ import (
"time"

"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/utils"
)

const eventsLimit = 64

type listener chan<- *utils.JSONMessage
type listener chan<- *jsonmessage.JSONMessage

type Events struct {
mu sync.RWMutex
events []*utils.JSONMessage
events []*jsonmessage.JSONMessage
subscribers []listener
}

func New() *Events {
return &Events{
events: make([]*utils.JSONMessage, 0, eventsLimit),
events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
}
}

Expand Down Expand Up @@ -63,7 +63,7 @@ func (e *Events) Get(job *engine.Job) error {
timeout.Stop()
}

listener := make(chan *utils.JSONMessage)
listener := make(chan *jsonmessage.JSONMessage)
e.subscribe(listener)
defer e.unsubscribe(listener)

Expand Down Expand Up @@ -107,7 +107,7 @@ func (e *Events) SubscribersCount(job *engine.Job) error {
return nil
}

func writeEvent(job *engine.Job, event *utils.JSONMessage, eventFilters filters.Args) error {
func writeEvent(job *engine.Job, event *jsonmessage.JSONMessage, eventFilters filters.Args) error {
isFiltered := func(field string, filter []string) bool {
if len(filter) == 0 {
return false
Expand Down Expand Up @@ -170,7 +170,7 @@ func (e *Events) subscribersCount() int {
func (e *Events) log(action, id, from string) {
e.mu.Lock()
now := time.Now().UTC().Unix()
jm := &utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: now}
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
Expand Down
16 changes: 8 additions & 8 deletions events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"time"

"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
"github.com/docker/docker/pkg/jsonmessage"
)

func TestEventsPublish(t *testing.T) {
e := New()
l1 := make(chan *utils.JSONMessage)
l2 := make(chan *utils.JSONMessage)
l1 := make(chan *jsonmessage.JSONMessage)
l2 := make(chan *jsonmessage.JSONMessage)
e.subscribe(l1)
e.subscribe(l2)
count := e.subscribersCount()
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestEventsPublish(t *testing.T) {

func TestEventsPublishTimeout(t *testing.T) {
e := New()
l := make(chan *utils.JSONMessage)
l := make(chan *jsonmessage.JSONMessage)
e.subscribe(l)

c := make(chan struct{})
Expand Down Expand Up @@ -108,9 +108,9 @@ func TestLogEvents(t *testing.T) {
}
buf = bytes.NewBuffer(buf.Bytes())
dec := json.NewDecoder(buf)
var msgs []utils.JSONMessage
var msgs []jsonmessage.JSONMessage
for {
var jm utils.JSONMessage
var jm jsonmessage.JSONMessage
if err := dec.Decode(&jm); err != nil {
if err == io.EOF {
break
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestEventsCountJob(t *testing.T) {
if err := e.Install(eng); err != nil {
t.Fatal(err)
}
l1 := make(chan *utils.JSONMessage)
l2 := make(chan *utils.JSONMessage)
l1 := make(chan *jsonmessage.JSONMessage)
l2 := make(chan *jsonmessage.JSONMessage)
e.subscribe(l1)
e.subscribe(l2)
job := eng.Job("subscribers_count")
Expand Down
3 changes: 2 additions & 1 deletion graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/truncindex"
"github.com/docker/docker/runconfig"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (graph *Graph) Register(img *image.Image, layerData archive.ArchiveReader)
// The archive is stored on disk and will be automatically deleted as soon as has been read.
// If output is not nil, a human-readable progress bar will be written to it.
// FIXME: does this belong in Graph? How about MktempFile, let the caller use it for archives?
func (graph *Graph) TempLayerArchive(id string, sf *utils.StreamFormatter, output io.Writer) (*archive.TempArchive, error) {
func (graph *Graph) TempLayerArchive(id string, sf *streamformatter.StreamFormatter, output io.Writer) (*archive.TempArchive, error) {
image, err := graph.Get(id)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion graph/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
)
Expand All @@ -23,7 +24,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
src = job.Args[0]
repo = job.Args[1]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
sf = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
archive archive.ArchiveReader
resp *http.Response
stdoutBuffer = bytes.NewBuffer(nil)
Expand Down
13 changes: 7 additions & 6 deletions graph/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/docker/docker/engine"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
Expand All @@ -28,7 +29,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
var (
localName = job.Args[0]
tag string
sf = utils.NewStreamFormatter(job.GetenvBool("json"))
sf = streamformatter.NewStreamFormatter(job.GetenvBool("json"))
authConfig = &registry.AuthConfig{}
metaHeaders map[string][]string
)
Expand Down Expand Up @@ -107,7 +108,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
return nil
}

func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, askedTag string, sf *utils.StreamFormatter, parallel bool) error {
func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, askedTag string, sf *streamformatter.StreamFormatter, parallel bool) error {
out.Write(sf.FormatStatus("", "Pulling repository %s", repoInfo.CanonicalName))

repoData, err := r.GetRepositoryData(repoInfo.RemoteName)
Expand Down Expand Up @@ -265,7 +266,7 @@ func (s *TagStore) pullRepository(r *registry.Session, out io.Writer, repoInfo *
return nil
}

func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *utils.StreamFormatter) (bool, error) {
func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint string, token []string, sf *streamformatter.StreamFormatter) (bool, error) {
history, err := r.GetRemoteHistory(imgID, endpoint, token)
if err != nil {
return false, err
Expand Down Expand Up @@ -363,7 +364,7 @@ func (s *TagStore) pullImage(r *registry.Session, out io.Writer, imgID, endpoint
return layers_downloaded, nil
}

func WriteStatus(requestedTag string, out io.Writer, sf *utils.StreamFormatter, layers_downloaded bool) {
func WriteStatus(requestedTag string, out io.Writer, sf *streamformatter.StreamFormatter, layers_downloaded bool) {
if layers_downloaded {
out.Write(sf.FormatStatus("", "Status: Downloaded newer image for %s", requestedTag))
} else {
Expand All @@ -382,7 +383,7 @@ type downloadInfo struct {
err chan error
}

func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool) error {
func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out io.Writer, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter, parallel bool) error {
endpoint, err := r.V2RegistryEndpoint(repoInfo.Index)
if err != nil {
if repoInfo.Index.Official {
Expand Down Expand Up @@ -428,7 +429,7 @@ func (s *TagStore) pullV2Repository(eng *engine.Engine, r *registry.Session, out
return nil
}

func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Writer, endpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo, tag string, sf *utils.StreamFormatter, parallel bool, auth *registry.RequestAuthorization) (bool, error) {
func (s *TagStore) pullV2Tag(eng *engine.Engine, r *registry.Session, out io.Writer, endpoint *registry.Endpoint, repoInfo *registry.RepositoryInfo, tag string, sf *streamformatter.StreamFormatter, parallel bool, auth *registry.RequestAuthorization) (bool, error) {
log.Debugf("Pulling tag from V2 registry: %q", tag)

manifestBytes, manifestDigest, err := r.GetV2ImageManifest(endpoint, repoInfo.RemoteName, tag, auth)
Expand Down
Loading

0 comments on commit 0cd6c05

Please sign in to comment.