Skip to content

Commit

Permalink
Finish trace for premature error
Browse files Browse the repository at this point in the history
  • Loading branch information
iamqizhao committed Dec 1, 2015
1 parent 2011d72 commit 7aa428f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
47 changes: 40 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
defer func() {
ss.mu.Lock()
if err != nil && err != io.EOF {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.trInfo.tr.SetError()
}
trInfo.tr.Finish()
trInfo.tr = nil
ss.trInfo.tr.Finish()
ss.trInfo.tr = nil
ss.mu.Unlock()
}()
}
Expand All @@ -430,10 +430,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if trInfo != nil {
ss.mu.Lock()
if ss.statusCode != codes.OK {
trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
trInfo.tr.SetError()
ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
ss.trInfo.tr.SetError()
} else {
trInfo.tr.LazyLog(stringer("OK"), false)
ss.trInfo.tr.LazyLog(stringer("OK"), false)
}
ss.mu.Unlock()
}
Expand All @@ -448,18 +448,40 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
return
}
service := sm[:pos]
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
return
}
// Unary RPC or Streaming RPC?
Expand All @@ -471,9 +493,20 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
}

// Stop stops the gRPC server. Once Stop returns, the server stops accepting
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type clientStream struct {

tracing bool // set to EnableTracing when the clientStream is created.

mu sync.Mutex
mu sync.Mutex
closed bool
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
// and is set to nil when the clientStream's finish method is called.
Expand Down

0 comments on commit 7aa428f

Please sign in to comment.