Skip to content

Commit

Permalink
feat: redirect daemon stdout stderr to file (dragonflyoss#1244)
Browse files Browse the repository at this point in the history
1. redirect daemon stdout stderr to file
2. fix wrong zap grpc log output

Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored Apr 14, 2022
1 parent 90fd177 commit 0d10ee4
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 37 deletions.
24 changes: 24 additions & 0 deletions cmd/dfget/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package cmd
import (
"context"
"os"
"path"
"syscall"
"time"

"github.com/gofrs/flock"
Expand Down Expand Up @@ -64,6 +66,8 @@ it supports container engine, wget and other downloading tools through proxy fun
return errors.Wrap(err, "init client daemon logger")
}

redirectStdoutAndStderr(cfg.Console, d.LogDir())

// Convert config
if err := cfg.Convert(); err != nil {
return err
Expand All @@ -78,6 +82,26 @@ it supports container engine, wget and other downloading tools through proxy fun
},
}

// daemon will be launched by dfget command
// redirect stdout and stderr to file for debugging
func redirectStdoutAndStderr(console bool, logDir string) {
// when console log is enabled, skip redirect stdout
if !console {
stdoutPath := path.Join(logDir, "daemon", "stdout.log")
if stdout, err := os.OpenFile(stdoutPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_SYNC, 0644); err != nil {
logger.Warnf("open %s error: %s", stdoutPath, err)
} else if err := syscall.Dup2(int(stdout.Fd()), 1); err != nil {
logger.Warnf("redirect stdout error: %s", err)
}
}
stderrPath := path.Join(logDir, "daemon", "stderr.log")
if stderr, err := os.OpenFile(stderrPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_SYNC, 0644); err != nil {
logger.Warnf("open %s error: %s", stderrPath, err)
} else if err := syscall.Dup2(int(stderr.Fd()), 2); err != nil {
logger.Warnf("redirect stderr error: %s", err)
}
}

func init() {
// Add the command to parent
rootCmd.AddCommand(daemonCmd)
Expand Down
28 changes: 16 additions & 12 deletions manager/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,22 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/manager"
)

var defaultStreamMiddleWares = []grpc.StreamServerInterceptor{
grpc_validator.StreamServerInterceptor(),
grpc_recovery.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,
grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()),
func defaultStreamMiddleWares() []grpc.StreamServerInterceptor {
return []grpc.StreamServerInterceptor{
grpc_validator.StreamServerInterceptor(),
grpc_recovery.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,
grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()),
}
}

var defaultUnaryMiddleWares = []grpc.UnaryServerInterceptor{
grpc_validator.UnaryServerInterceptor(),
grpc_recovery.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()),
func defaultUnaryMiddleWares() []grpc.UnaryServerInterceptor {
return []grpc.UnaryServerInterceptor{
grpc_validator.UnaryServerInterceptor(),
grpc_recovery.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()),
}
}

type Server struct {
Expand All @@ -74,8 +78,8 @@ func New(database *database.Database, cache *cache.Cache, searcher searcher.Sear
}

grpcServer := grpc.NewServer(append([]grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(defaultStreamMiddleWares...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares...)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(defaultStreamMiddleWares()...)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares()...)),
}, opts...)...)

// Register servers on grpc server
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/cdnsystem/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type proxy struct {
}

func New(seederServer SeederServer, opts ...grpc.ServerOption) *grpc.Server {
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions(), opts...)...)

// Register servers on grpc server
cdnsystem.RegisterSeederServer(grpcServer, &proxy{server: seederServer})
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/dfdaemon/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type proxy struct {
}

func New(daemonServer DaemonServer, opts ...grpc.ServerOption) *grpc.Server {
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions(), opts...)...)
dfdaemon.RegisterDaemonServer(grpcServer, &proxy{server: daemonServer})
return grpcServer
}
Expand Down
46 changes: 24 additions & 22 deletions pkg/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,30 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/base/common"
)

var DefaultServerOptions = []grpc.ServerOption{
grpc.ConnectionTimeout(10 * time.Second),
grpc.InitialConnWindowSize(8 * 1024 * 1024),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 1 * time.Minute,
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
}),
grpc.MaxConcurrentStreams(100),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
streamServerInterceptor,
grpc_prometheus.StreamServerInterceptor,
grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()),
grpc_validator.StreamServerInterceptor(),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
unaryServerInterceptor,
grpc_prometheus.UnaryServerInterceptor,
grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()),
grpc_validator.UnaryServerInterceptor(),
)),
func DefaultServerOptions() []grpc.ServerOption {
return []grpc.ServerOption{
grpc.ConnectionTimeout(10 * time.Second),
grpc.InitialConnWindowSize(8 * 1024 * 1024),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 1 * time.Minute,
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
}),
grpc.MaxConcurrentStreams(100),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
streamServerInterceptor,
grpc_prometheus.StreamServerInterceptor,
grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()),
grpc_validator.StreamServerInterceptor(),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
unaryServerInterceptor,
grpc_prometheus.UnaryServerInterceptor,
grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()),
grpc_validator.UnaryServerInterceptor(),
)),
}
}

func streamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Server struct {
// New returns a new transparent scheduler server from the given options
func New(service *service.Service, opts ...grpc.ServerOption) *grpc.Server {
svr := &Server{service: service}
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions(), opts...)...)

// Register servers on grpc server
scheduler.RegisterSchedulerServer(grpcServer, svr)
Expand Down

0 comments on commit 0d10ee4

Please sign in to comment.