Skip to content

Commit

Permalink
add contextual logging to cencus.go
Browse files Browse the repository at this point in the history
  • Loading branch information
darkdarkdragon committed Dec 14, 2021
1 parent 0747ac6 commit b6cbfda
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 93 deletions.
63 changes: 45 additions & 18 deletions clog/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func Clone(parentCtx, logCtx context.Context) context.Context {
return context.WithValue(parentCtx, clogContextKey, newCmap)
}

// Same creates new context with parentCtx as parent and
// same logging data as in logCtx
func Same(parentCtx, logCtx context.Context) context.Context {
cmap, _ := logCtx.Value(clogContextKey).(*values)
if cmap == nil {
cmap = newValues()
}
return context.WithValue(parentCtx, clogContextKey, cmap)
}

func AddManifestID(ctx context.Context, val string) context.Context {
return AddVal(ctx, manifestID, val)
}
Expand Down Expand Up @@ -104,66 +114,83 @@ func AddVal(ctx context.Context, key, val string) context.Context {
}

func Warningf(ctx context.Context, format string, args ...interface{}) {
glog.WarningDepth(1, formatMessage(ctx, format, args...))
glog.WarningDepth(1, formatMessage(ctx, false, format, args...))
}

func Errorf(ctx context.Context, format string, args ...interface{}) {
glog.ErrorDepth(1, formatMessage(ctx, format, args...))
glog.ErrorDepth(1, formatMessage(ctx, false, format, args...))
}

func Fatalf(ctx context.Context, format string, args ...interface{}) {
glog.FatalDepth(1, formatMessage(ctx, format, args...))
glog.FatalDepth(1, formatMessage(ctx, false, format, args...))
}

func Infof(ctx context.Context, format string, args ...interface{}) {
infof(ctx, format, args...)
infof(ctx, false, format, args...)
}

// Infofe if last argument is not nil it will be printed as " err=%q"
func Infofe(ctx context.Context, format string, args ...interface{}) {
infof(ctx, true, format, args...)
}

func infof(ctx context.Context, format string, args ...interface{}) {
glog.InfoDepth(2, formatMessage(ctx, format, args...))
func infof(ctx context.Context, lastErr bool, format string, args ...interface{}) {
glog.InfoDepth(2, formatMessage(ctx, lastErr, format, args...))
}

// Infof is equivalent to the global Infof function, guarded by the value of v.
// See the documentation of V for usage.
func (v Verbose) Infof(ctx context.Context, format string, args ...interface{}) {
if v {
infof(ctx, format, args...)
infof(ctx, false, format, args...)
}
}

func messageFromContext(ctx context.Context) string {
func (v Verbose) Infofe(ctx context.Context, format string, args ...interface{}) {
if v {
infof(ctx, true, format, args...)
}
}

func messageFromContext(ctx context.Context, sb *strings.Builder) {
if ctx == nil {
return ""
return
}
cmap, _ := ctx.Value(clogContextKey).(*values)
if cmap == nil {
return ""
return
}
cmap.mu.RLock()
var sb strings.Builder
for _, key := range stdKeysOrder {
if val, ok := cmap.vals[key]; ok {
sb.WriteString(key)
sb.WriteString("=")
sb.WriteString(val)
sb.WriteString(" ")
}
}
for key, val := range cmap.vals {
if _, ok := stdKeys[key]; !ok {
sb.WriteString(key)
sb.WriteString("=")
sb.WriteString(val)
sb.WriteString(" ")
}
}
cmap.mu.RUnlock()
return sb.String()
}

func formatMessage(ctx context.Context, format string, args ...interface{}) string {
msg := fmt.Sprintf(format, args...)
mfc := messageFromContext(ctx)
if mfc != "" {
msg = mfc + " " + msg
func formatMessage(ctx context.Context, lastErr bool, format string, args ...interface{}) string {
var sb strings.Builder
messageFromContext(ctx, &sb)
var err interface{}
if lastErr && len(args) > 0 {
err = args[len(args)-1]
args = args[:len(args)-1]
}
return msg
sb.WriteString(fmt.Sprintf(format, args...))
if err != nil {
sb.WriteString(fmt.Sprintf(" err=%q", err))
}
return sb.String()
}
2 changes: 1 addition & 1 deletion core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (n *LivepeerNode) transcodeSeg(ctx context.Context, config transcodeConfig,
took := time.Since(start)
clog.V(common.DEBUG).Infof(ctx, "Transcoding of segment took=%v", took)
if monitor.Enabled {
monitor.SegmentTranscoded(0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles), true, true)
monitor.SegmentTranscoded(ctx, 0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles), true, true)
}

// Prepare the result object
Expand Down
4 changes: 2 additions & 2 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
// When orchestrator works as transcoder, `fname` will be relative path to file in local
// filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will
// be called in orchestrator.go
monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles), true, true)
monitor.SegmentTranscoded(ctx, 0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles), true, true)
}

return resToTranscodeData(ctx, res, opts)
Expand Down Expand Up @@ -108,7 +108,7 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
// When orchestrator works as transcoder, `fname` will be relative path to file in local
// filesystem and will not contain seqNo in it. For that case `SegmentTranscoded` will
// be called in orchestrator.go
monitor.SegmentTranscoded(0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles), true, true)
monitor.SegmentTranscoded(ctx, 0, seqNo, md.Duration, time.Since(start), common.ProfilesNames(profiles), true, true)
}

return resToTranscodeData(ctx, res, out)
Expand Down
50 changes: 25 additions & 25 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/clog"

"contrib.go.opencensus.io/exporter/prometheus"
rprom "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -1004,7 +1005,7 @@ func FastVerificationEnabledAndUsingCurrentSessions(enabled, using int) {
stats.Record(census.ctx, census.mFastVerificationEnabledCurrentSessions.M(int64(enabled)), census.mFastVerificationUsingCurrentSessions.M(int64(using)))
}

func TranscodeTry(nonce, seqNo uint64) {
func TranscodeTry(ctx context.Context, nonce, seqNo uint64) {
census.lock.Lock()
defer census.lock.Unlock()
if av, ok := census.success[nonce]; ok {
Expand All @@ -1029,7 +1030,7 @@ func TranscodeTry(nonce, seqNo uint64) {
} else {
av.tries[seqNo] = tryData{tries: 1, first: time.Now()}
}
glog.V(logLevel).Infof("Trying to transcode segment nonce=%d seqNo=%d try=%d", nonce, seqNo, try)
clog.V(logLevel).Infof(ctx, "Trying to transcode segment nonce=%d seqNo=%d try=%d", nonce, seqNo, try)
}
}

Expand All @@ -1041,8 +1042,8 @@ func SetTranscodersNumberAndLoad(load, capacity, number int) {
stats.Record(census.ctx, census.mTranscodersNumber.M(int64(number)))
}

func SegmentEmerged(nonce, seqNo uint64, profilesNum int, dur float64) {
glog.V(logLevel).Infof("Logging SegmentEmerged... nonce=%d seqNo=%d duration=%v", nonce, seqNo, dur)
func SegmentEmerged(ctx context.Context, nonce, seqNo uint64, profilesNum int, dur float64) {
clog.V(logLevel).Infof(ctx, "Logging SegmentEmerged... duration=%v", dur)
if census.nodeType == Broadcaster {
census.segmentEmerged(nonce, seqNo, profilesNum)
}
Expand All @@ -1062,9 +1063,8 @@ func (cen *censusMetricsCounter) segmentEmerged(nonce, seqNo uint64, profilesNum
stats.Record(cen.ctx, cen.mSegmentEmergedUnprocessed.M(1))
}

func SourceSegmentAppeared(nonce, seqNo uint64, manifestID, profile string, recordingEnabled bool) {
glog.V(logLevel).Infof("Logging SourceSegmentAppeared... nonce=%d manifestID=%s seqNo=%d profile=%s", nonce,
manifestID, seqNo, profile)
func SourceSegmentAppeared(ctx context.Context, nonce, seqNo uint64, manifestID, profile string, recordingEnabled bool) {
clog.V(logLevel).Infof(ctx, "Logging SourceSegmentAppeared... profile=%s", profile)
census.segmentSourceAppeared(nonce, seqNo, profile, recordingEnabled)
}

Expand All @@ -1088,17 +1088,17 @@ func (cen *censusMetricsCounter) segmentSourceAppeared(nonce, seqNo uint64, prof
stats.Record(ctx, cen.mSegmentSourceAppeared.M(1))
}

func SegmentUploaded(nonce, seqNo uint64, uploadDur time.Duration) {
glog.V(logLevel).Infof("Logging SegmentUploaded... nonce=%d seqNo=%d dur=%s", nonce, seqNo, uploadDur)
func SegmentUploaded(ctx context.Context, nonce, seqNo uint64, uploadDur time.Duration) {
clog.V(logLevel).Infof(ctx, "Logging SegmentUploaded... dur=%s", uploadDur)
census.segmentUploaded(nonce, seqNo, uploadDur)
}

func (cen *censusMetricsCounter) segmentUploaded(nonce, seqNo uint64, uploadDur time.Duration) {
stats.Record(cen.ctx, cen.mSegmentUploaded.M(1), cen.mUploadTime.M(uploadDur.Seconds()))
}

func SegmentDownloaded(nonce, seqNo uint64, downloadDur time.Duration) {
glog.V(logLevel).Infof("Logging SegmentDownloaded... nonce=%d seqNo=%d dur=%s", nonce, seqNo, downloadDur)
func SegmentDownloaded(ctx context.Context, nonce, seqNo uint64, downloadDur time.Duration) {
clog.V(logLevel).Infof(ctx, "Logging SegmentDownloaded... dur=%s", downloadDur)
census.segmentDownloaded(nonce, seqNo, downloadDur)
}

Expand Down Expand Up @@ -1141,7 +1141,7 @@ func (cen *censusMetricsCounter) authWebhookFinished(dur time.Duration) {
stats.Record(cen.ctx, cen.mAuthWebhookTime.M(float64(dur)/float64(time.Millisecond)))
}

func SegmentUploadFailed(nonce, seqNo uint64, code SegmentUploadError, err error, permanent bool) {
func SegmentUploadFailed(ctx context.Context, nonce, seqNo uint64, code SegmentUploadError, err error, permanent bool) {
if code == SegmentUploadErrorUnknown {
reason := err.Error()
var timedout bool
Expand All @@ -1156,7 +1156,7 @@ func SegmentUploadFailed(nonce, seqNo uint64, code SegmentUploadError, err error
code = SegmentUploadErrorSessionEnded
}
}
glog.Errorf("Logging SegmentUploadFailed... code=%v reason='%s'", code, err.Error())
clog.Errorf(ctx, "Logging SegmentUploadFailed... code=%v reason='%s'", code, err.Error())

census.segmentUploadFailed(nonce, seqNo, code, permanent)
}
Expand All @@ -1180,10 +1180,10 @@ func (cen *censusMetricsCounter) segmentUploadFailed(nonce, seqNo uint64, code S
}
}

func SegmentTranscoded(nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string,
func SegmentTranscoded(ctx context.Context, nonce, seqNo uint64, sourceDur time.Duration, transcodeDur time.Duration, profiles string,
trusted, verified bool) {

glog.V(logLevel).Infof("Logging SegmentTranscode nonce=%d seqNo=%d dur=%s trusted=%v verified=%v", nonce, seqNo, transcodeDur, trusted, verified)
clog.V(logLevel).Infof(ctx, "Logging SegmentTranscode nonce=%d seqNo=%d dur=%s trusted=%v verified=%v", nonce, seqNo, transcodeDur, trusted, verified)
census.segmentTranscoded(nonce, seqNo, sourceDur, transcodeDur, profiles, trusted, verified)
}

Expand All @@ -1208,8 +1208,8 @@ func (cen *censusMetricsCounter) segmentTranscoded(nonce, seqNo uint64, sourceDu
stats.Record(ctx, cen.mSegmentTranscoded.M(1), cen.mTranscodeTime.M(transcodeDur.Seconds()), cen.mTranscodeScore.M(sourceDur.Seconds()/transcodeDur.Seconds()))
}

func SegmentTranscodeFailed(subType SegmentTranscodeError, nonce, seqNo uint64, err error, permanent bool) {
glog.Errorf("Logging SegmentTranscodeFailed subtype=%v nonce=%d seqNo=%d err=%q", subType, nonce, seqNo, err.Error())
func SegmentTranscodeFailed(ctx context.Context, subType SegmentTranscodeError, nonce, seqNo uint64, err error, permanent bool) {
clog.Errorf(ctx, "Logging SegmentTranscodeFailed subtype=%v err=%q", subType, err.Error())
census.segmentTranscodeFailed(nonce, seqNo, subType, permanent)
}

Expand Down Expand Up @@ -1290,15 +1290,15 @@ func RecordingSegmentSaved(dur time.Duration, err error) {
}
}

func TranscodedSegmentAppeared(nonce, seqNo uint64, profile string, recordingEnabled bool) {
glog.V(logLevel).Infof("Logging LogTranscodedSegmentAppeared... nonce=%d seqNo=%d profile=%s", nonce, seqNo, profile)
census.segmentTranscodedAppeared(nonce, seqNo, profile, recordingEnabled)
func TranscodedSegmentAppeared(ctx context.Context, nonce, seqNo uint64, profile string, recordingEnabled bool) {
clog.V(logLevel).Infof(ctx, "Logging LogTranscodedSegmentAppeared... profile=%s", profile)
census.segmentTranscodedAppeared(ctx, nonce, seqNo, profile, recordingEnabled)
}

func (cen *censusMetricsCounter) segmentTranscodedAppeared(nonce, seqNo uint64, profile string, recordingEnabled bool) {
func (cen *censusMetricsCounter) segmentTranscodedAppeared(ctx context.Context, nonce, seqNo uint64, profile string, recordingEnabled bool) {
cen.lock.Lock()
defer cen.lock.Unlock()
ctx, err := tag.New(cen.ctx, tag.Insert(cen.kProfile, profile))
ctx, err := tag.New(clog.Same(cen.ctx, ctx), tag.Insert(cen.kProfile, profile))
if err != nil {
glog.Error("Error creating context", err)
return
Expand All @@ -1307,7 +1307,7 @@ func (cen *censusMetricsCounter) segmentTranscodedAppeared(nonce, seqNo uint64,
// cen.transcodedSegments[nonce] = cen.transcodedSegments[nonce] + 1
if st, ok := cen.emergeTimes[nonce][seqNo]; ok {
latency := time.Since(st)
glog.V(logLevel).Infof("Recording latency for segment nonce=%d seqNo=%d profile=%s latency=%s", nonce, seqNo, profile, latency)
clog.V(logLevel).Infof(ctx, "Recording latency for segment profile=%s latency=%s", profile, latency)
stats.Record(ctx, cen.mTranscodeLatency.M(latency.Seconds()))
}

Expand Down Expand Up @@ -1364,8 +1364,8 @@ func (cen *censusMetricsCounter) streamStarted(nonce uint64) {
stats.Record(cen.ctx, cen.mStreamStarted.M(1))
}

func StreamEnded(nonce uint64) {
glog.V(logLevel).Infof("Logging StreamEnded... nonce=%d", nonce)
func StreamEnded(ctx context.Context, nonce uint64) {
clog.V(logLevel).Infof(ctx, "Logging StreamEnded... nonce=%d", nonce)
census.streamEnded(nonce)
}

Expand Down
28 changes: 14 additions & 14 deletions monitor/census_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,36 +57,36 @@ func TestLastSegmentTimeout(t *testing.T) {
if len(census.success) != 1 {
t.Fatal("Should be one stream")
}
SegmentEmerged(1, 1, 3, 1)
SegmentEmerged(context.TODO(), 1, 1, 3, 1)
if sr := census.successRate(); sr != 1 {
t.Fatalf("Success rate should be 1, not %f", sr)
}
SegmentFullyTranscoded(1, 1, "ps", "")
if sr := census.successRate(); sr != 1 {
t.Fatalf("Success rate should be 1, not %f", sr)
}
SegmentEmerged(1, 2, 3, 1)
SegmentTranscodeFailed(SegmentTranscodeErrorOrchestratorBusy, 1, 2, fmt.Errorf("some"), true)
SegmentEmerged(context.TODO(), 1, 2, 3, 1)
SegmentTranscodeFailed(context.TODO(), SegmentTranscodeErrorOrchestratorBusy, 1, 2, fmt.Errorf("some"), true)
if sr := census.successRate(); sr != 0.5 {
t.Fatalf("Success rate should be 0.5, not %f", sr)
}
SegmentEmerged(1, 3, 3, 1)
SegmentTranscodeFailed(SegmentTranscodeErrorSessionEnded, 1, 3, fmt.Errorf("some"), true)
SegmentEmerged(1, 4, 3, 1)
SegmentEmerged(context.TODO(), 1, 3, 3, 1)
SegmentTranscodeFailed(context.TODO(), SegmentTranscodeErrorSessionEnded, 1, 3, fmt.Errorf("some"), true)
SegmentEmerged(context.TODO(), 1, 4, 3, 1)
SegmentFullyTranscoded(1, 4, "ps", "")
if sr := census.successRate(); sr != 0.75 {
t.Fatalf("Success rate should be 0.75, not %f", sr)
}
StreamEnded(1)
StreamEnded(context.TODO(), 1)
if len(census.success) != 0 {
t.Fatalf("Should be no streams, instead have %d", len(census.success))
}

StreamCreated("h1", 2)
SegmentEmerged(2, 1, 3, 1)
SegmentEmerged(context.TODO(), 2, 1, 3, 1)
SegmentFullyTranscoded(2, 1, "ps", "")
SegmentEmerged(2, 2, 3, 1)
StreamEnded(2)
SegmentEmerged(context.TODO(), 2, 2, 3, 1)
StreamEnded(context.TODO(), 2)
if len(census.success) != 1 {
t.Fatalf("Should be one stream, instead have %d", len(census.success))
}
Expand All @@ -109,17 +109,17 @@ func TestLastSegmentTimeout(t *testing.T) {
timeToWaitForError = old1

StreamCreated("h3", 3)
SegmentEmerged(3, 1, 3, 1)
SegmentEmerged(context.TODO(), 3, 1, 3, 1)
SegmentFullyTranscoded(3, 1, "ps", "")
SegmentEmerged(3, 2, 3, 1)
StreamEnded(3)
SegmentEmerged(context.TODO(), 3, 2, 3, 1)
StreamEnded(context.TODO(), 3)
if len(census.success) != 1 {
t.Fatalf("Should be one stream, instead have %d", len(census.success))
}
if sr := census.successRate(); sr != 1 {
t.Fatalf("Success rate should be 1, not %f", sr)
}
SegmentTranscodeFailed(SegmentTranscodeErrorOrchestratorBusy, 3, 2, fmt.Errorf("some"), true)
SegmentTranscodeFailed(context.TODO(), SegmentTranscodeErrorOrchestratorBusy, 3, 2, fmt.Errorf("some"), true)
if sr := census.successRate(); sr != 0.5 {
t.Fatalf("Success rate should be 0.5, not %f", sr)
}
Expand Down
Loading

0 comments on commit b6cbfda

Please sign in to comment.