diff options
author | Igor Drozdov <idrozdov@gitlab.com> | 2022-02-18 13:10:38 +0300 |
---|---|---|
committer | Igor Drozdov <idrozdov@gitlab.com> | 2022-03-07 16:54:15 +0300 |
commit | e1ddbdd161a28ff53ca4d3b3f0fc4fa19687d80b (patch) | |
tree | 9427a8f7add05d0eece7df0c48638209bea88d8d | |
parent | ede95ae77b591fdffab6ea1f7b1c01e4402af2e1 (diff) | |
download | gitlab-shell-e1ddbdd161a28ff53ca4d3b3f0fc4fa19687d80b.tar.gz |
Reuse Gitaly conns and Sidechannel
When gitlab-sshd has been introduced we've started running our
own SSH server. In this case we're able to cache and reuse
Gitaly connections and Registry.
It helps to reduce memory usage.
-rw-r--r-- | cmd/gitlab-shell/main.go | 2 | ||||
-rw-r--r-- | cmd/gitlab-sshd/main.go | 2 | ||||
-rw-r--r-- | internal/command/receivepack/gitalycall.go | 12 | ||||
-rw-r--r-- | internal/command/uploadarchive/gitalycall.go | 12 | ||||
-rw-r--r-- | internal/command/uploadpack/gitalycall.go | 18 | ||||
-rw-r--r-- | internal/command/uploadpack/gitalycall_test.go | 11 | ||||
-rw-r--r-- | internal/config/config.go | 3 | ||||
-rw-r--r-- | internal/gitaly/gitaly.go | 133 | ||||
-rw-r--r-- | internal/gitaly/gitaly_test.go | 53 | ||||
-rw-r--r-- | internal/handler/exec.go | 114 | ||||
-rw-r--r-- | internal/handler/exec_test.go | 102 | ||||
-rw-r--r-- | internal/metrics/metrics.go | 19 |
12 files changed, 333 insertions, 148 deletions
diff --git a/cmd/gitlab-shell/main.go b/cmd/gitlab-shell/main.go index 693140d..370dc2d 100644 --- a/cmd/gitlab-shell/main.go +++ b/cmd/gitlab-shell/main.go @@ -68,6 +68,8 @@ func main() { ctx, finished := command.Setup(executable.Name, config) defer finished() + config.GitalyClient.InitSidechannelRegistry(ctx) + cmdName := reflect.TypeOf(cmd).String() ctxlog := log.ContextLogger(ctx) ctxlog.WithFields(log.Fields{"env": env, "command": cmdName}).Info("gitlab-shell: main: executing command") diff --git a/cmd/gitlab-sshd/main.go b/cmd/gitlab-sshd/main.go index 165c7a5..8040a54 100644 --- a/cmd/gitlab-sshd/main.go +++ b/cmd/gitlab-sshd/main.go @@ -69,6 +69,8 @@ func main() { ctx, finished := command.Setup("gitlab-sshd", cfg) defer finished() + cfg.GitalyClient.InitSidechannelRegistry(ctx) + server, err := sshd.NewServer(cfg) if err != nil { log.WithError(err).Fatal("Failed to start GitLab built-in sshd") diff --git a/internal/command/receivepack/gitalycall.go b/internal/command/receivepack/gitalycall.go index 6bd9f49..c2dd2de 100644 --- a/internal/command/receivepack/gitalycall.go +++ b/internal/command/receivepack/gitalycall.go @@ -13,13 +13,7 @@ import ( ) func (c *Command) performGitalyCall(ctx context.Context, response *accessverifier.Response) error { - gc := &handler.GitalyCommand{ - Config: c.Config, - ServiceName: string(commandargs.ReceivePack), - Address: response.Gitaly.Address, - Token: response.Gitaly.Token, - Features: response.Gitaly.Features, - } + gc := handler.NewGitalyCommand(c.Config, string(commandargs.ReceivePack), response) request := &pb.SSHReceivePackRequest{ Repository: &response.Gitaly.Repo, @@ -30,8 +24,8 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie GitConfigOptions: response.GitConfigOptions, } - return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) { - ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env) + return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) { + ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env) defer cancel() rw := c.ReadWriter diff --git a/internal/command/uploadarchive/gitalycall.go b/internal/command/uploadarchive/gitalycall.go index 33f0b71..6051d2c 100644 --- a/internal/command/uploadarchive/gitalycall.go +++ b/internal/command/uploadarchive/gitalycall.go @@ -13,18 +13,12 @@ import ( ) func (c *Command) performGitalyCall(ctx context.Context, response *accessverifier.Response) error { - gc := &handler.GitalyCommand{ - Config: c.Config, - ServiceName: string(commandargs.UploadArchive), - Address: response.Gitaly.Address, - Token: response.Gitaly.Token, - Features: response.Gitaly.Features, - } + gc := handler.NewGitalyCommand(c.Config, string(commandargs.UploadArchive), response) request := &pb.SSHUploadArchiveRequest{Repository: &response.Gitaly.Repo} - return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) { - ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env) + return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) { + ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env) defer cancel() rw := c.ReadWriter diff --git a/internal/command/uploadpack/gitalycall.go b/internal/command/uploadpack/gitalycall.go index 43ae4f8..96dd823 100644 --- a/internal/command/uploadpack/gitalycall.go +++ b/internal/command/uploadpack/gitalycall.go @@ -13,26 +13,20 @@ import ( ) func (c *Command) performGitalyCall(ctx context.Context, response *accessverifier.Response) error { - gc := &handler.GitalyCommand{ - Config: c.Config, - ServiceName: string(commandargs.UploadPack), - Address: response.Gitaly.Address, - Token: response.Gitaly.Token, - Features: response.Gitaly.Features, - } + gc := handler.NewGitalyCommand(c.Config, string(commandargs.UploadPack), response) if response.Gitaly.UseSidechannel { - gc.DialSidechannel = true request := &pb.SSHUploadPackWithSidechannelRequest{ Repository: &response.Gitaly.Repo, GitProtocol: c.Args.Env.GitProtocolVersion, GitConfigOptions: response.GitConfigOptions, } - return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) { - ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env) + return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) { + ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env) defer cancel() + registry := c.Config.GitalyClient.SidechannelRegistry rw := c.ReadWriter return client.UploadPackWithSidechannel(ctx, conn, registry, rw.In, rw.Out, rw.ErrOut, request) }) @@ -44,8 +38,8 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie GitConfigOptions: response.GitConfigOptions, } - return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) { - ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env) + return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) { + ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env) defer cancel() rw := c.ReadWriter diff --git a/internal/command/uploadpack/gitalycall_test.go b/internal/command/uploadpack/gitalycall_test.go index 213b6f9..e0a15ee 100644 --- a/internal/command/uploadpack/gitalycall_test.go +++ b/internal/command/uploadpack/gitalycall_test.go @@ -97,15 +97,18 @@ func TestUploadPack_withSidechannel(t *testing.T) { Env: env, } + ctx := correlation.ContextWithCorrelation(context.Background(), "a-correlation-id") + ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests") + + cfg := &config.Config{GitlabUrl: url} + cfg.GitalyClient.InitSidechannelRegistry(ctx) + cmd := &Command{ - Config: &config.Config{GitlabUrl: url}, + Config: cfg, Args: args, ReadWriter: &readwriter.ReadWriter{ErrOut: output, Out: output, In: input}, } - ctx := correlation.ContextWithCorrelation(context.Background(), "a-correlation-id") - ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests") - err := cmd.Execute(ctx) require.NoError(t, err) diff --git a/internal/config/config.go b/internal/config/config.go index efce9f9..c4dc073 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ import ( yaml "gopkg.in/yaml.v2" "gitlab.com/gitlab-org/gitlab-shell/client" + "gitlab.com/gitlab-org/gitlab-shell/internal/gitaly" "gitlab.com/gitlab-org/gitlab-shell/internal/metrics" ) @@ -59,6 +60,8 @@ type Config struct { httpClient *client.HttpClient httpClientErr error httpClientOnce sync.Once + + GitalyClient gitaly.Client } // The defaults to apply before parsing the config file(s). diff --git a/internal/gitaly/gitaly.go b/internal/gitaly/gitaly.go new file mode 100644 index 0000000..a6d8128 --- /dev/null +++ b/internal/gitaly/gitaly.go @@ -0,0 +1,133 @@ +package gitaly + +import ( + "context" + "fmt" + "sync" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" + + gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" + "gitlab.com/gitlab-org/gitaly/v14/client" + gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/labkit/correlation" + grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" + "gitlab.com/gitlab-org/labkit/log" + grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" + + "gitlab.com/gitlab-org/gitlab-shell/internal/metrics" +) + +type Command struct { + ServiceName string + Address string + Token string + DialSidechannel bool +} + +type connectionsCache struct { + sync.RWMutex + + connections map[Command]*grpc.ClientConn +} + +type Client struct { + SidechannelRegistry *gitalyclient.SidechannelRegistry + + cache connectionsCache +} + +func (c *Client) InitSidechannelRegistry(ctx context.Context) { + c.SidechannelRegistry = gitalyclient.NewSidechannelRegistry(log.ContextLogger(ctx)) +} + +func (c *Client) GetConnection(ctx context.Context, cmd Command) (*grpc.ClientConn, error) { + c.cache.RLock() + conn := c.cache.connections[cmd] + c.cache.RUnlock() + + if conn != nil { + return conn, nil + } + + c.cache.Lock() + defer c.cache.Unlock() + + if conn := c.cache.connections[cmd]; conn != nil { + return conn, nil + } + + conn, err := c.newConnection(ctx, cmd) + if err != nil { + return nil, err + } + + if c.cache.connections == nil { + c.cache.connections = make(map[Command]*grpc.ClientConn) + } + + c.cache.connections[cmd] = conn + + return conn, nil +} + +func (c *Client) newConnection(ctx context.Context, cmd Command) (conn *grpc.ClientConn, err error) { + defer func() { + label := "ok" + if err != nil { + label = "fail" + } + metrics.GitalyConnectionsTotal.WithLabelValues(label).Inc() + }() + + if cmd.Address == "" { + return nil, fmt.Errorf("no gitaly_address given") + } + + serviceName := correlation.ExtractClientNameFromContext(ctx) + if serviceName == "" { + serviceName = "gitlab-shell-unknown" + + log.WithContextFields(ctx, log.Fields{"service_name": serviceName}).Warn("No gRPC service name specified, defaulting to gitlab-shell-unknown") + } + + serviceName = fmt.Sprintf("%s-%s", serviceName, cmd.ServiceName) + + connOpts := client.DefaultDialOpts + connOpts = append( + connOpts, + grpc.WithStreamInterceptor( + grpc_middleware.ChainStreamClient( + grpctracing.StreamClientTracingInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpccorrelation.StreamClientCorrelationInterceptor( + grpccorrelation.WithClientName(serviceName), + ), + ), + ), + + grpc.WithUnaryInterceptor( + grpc_middleware.ChainUnaryClient( + grpctracing.UnaryClientTracingInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpccorrelation.UnaryClientCorrelationInterceptor( + grpccorrelation.WithClientName(serviceName), + ), + ), + ), + ) + + if cmd.Token != "" { + connOpts = append(connOpts, + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cmd.Token)), + ) + } + + if cmd.DialSidechannel { + return client.DialSidechannel(ctx, cmd.Address, c.SidechannelRegistry, connOpts) + } + + return client.DialContext(ctx, cmd.Address, connOpts) +} diff --git a/internal/gitaly/gitaly_test.go b/internal/gitaly/gitaly_test.go new file mode 100644 index 0000000..9e1ee1a --- /dev/null +++ b/internal/gitaly/gitaly_test.go @@ -0,0 +1,53 @@ +package gitaly + +import ( + "context" + "testing" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitlab-shell/internal/metrics" +) + +func TestPrometheusMetrics(t *testing.T) { + metrics.GitalyConnectionsTotal.Reset() + + c := &Client{} + + cmd := Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"} + c.newConnection(context.Background(), cmd) + c.newConnection(context.Background(), cmd) + + require.Equal(t, 1, testutil.CollectAndCount(metrics.GitalyConnectionsTotal)) + require.InDelta(t, 2, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("ok")), 0.1) + require.InDelta(t, 0, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("fail")), 0.1) + + cmd = Command{Address: ""} + c.newConnection(context.Background(), cmd) + + require.InDelta(t, 2, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("ok")), 0.1) + require.InDelta(t, 1, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("fail")), 0.1) +} + +func TestCachedConnections(t *testing.T) { + c := &Client{} + + require.Len(t, c.cache.connections, 0) + + cmd := Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"} + + conn, err := c.GetConnection(context.Background(), cmd) + require.NoError(t, err) + require.Len(t, c.cache.connections, 1) + + newConn, err := c.GetConnection(context.Background(), cmd) + require.NoError(t, err) + require.Len(t, c.cache.connections, 1) + require.Equal(t, conn, newConn) + + cmd = Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9998"} + _, err = c.GetConnection(context.Background(), cmd) + require.NoError(t, err) + require.Len(t, c.cache.connections, 2) +} diff --git a/internal/handler/exec.go b/internal/handler/exec.go index 9bd8018..44b02a7 100644 --- a/internal/handler/exec.go +++ b/internal/handler/exec.go @@ -6,56 +6,57 @@ import ( "strconv" "strings" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" grpcstatus "google.golang.org/grpc/status" "gitlab.com/gitlab-org/gitlab-shell/internal/config" + "gitlab.com/gitlab-org/gitlab-shell/internal/gitaly" "gitlab.com/gitlab-org/gitlab-shell/internal/gitlabnet/accessverifier" "gitlab.com/gitlab-org/gitlab-shell/internal/sshenv" - gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" - "gitlab.com/gitlab-org/gitaly/v14/client" pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" - "gitlab.com/gitlab-org/labkit/correlation" - grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" "gitlab.com/gitlab-org/labkit/log" - grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" ) // GitalyHandlerFunc implementations are responsible for making // an appropriate Gitaly call using the provided client and context // and returning an error from the Gitaly call. -type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) +type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn) (int32, error) type GitalyCommand struct { - Config *config.Config - ServiceName string - Address string - Token string - Features map[string]string - DialSidechannel bool + Config *config.Config + Response *accessverifier.Response + Command gitaly.Command +} + +func NewGitalyCommand(cfg *config.Config, serviceName string, response *accessverifier.Response) *GitalyCommand { + gc := gitaly.Command{ + ServiceName: serviceName, + Address: response.Gitaly.Address, + Token: response.Gitaly.Token, + DialSidechannel: response.Gitaly.UseSidechannel, + } + + return &GitalyCommand{Config: cfg, Response: response, Command: gc} } // RunGitalyCommand provides a bootstrap for Gitaly commands executed // through GitLab-Shell. It ensures that logging, tracing and other // common concerns are configured before executing the `handler`. func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHandlerFunc) error { - registry := client.NewSidechannelRegistry(log.ContextLogger(ctx)) - conn, err := getConn(ctx, gc, registry) + // We leave the connection open for future reuse + conn, err := gc.getConn(ctx) if err != nil { log.ContextLogger(ctx).WithError(fmt.Errorf("RunGitalyCommand: %v", err)).Error("Failed to get connection to execute Git command") return err } - defer conn.Close() - childCtx := withOutgoingMetadata(ctx, gc.Features) + childCtx := withOutgoingMetadata(ctx, gc.Response.Gitaly.Features) ctxlog := log.ContextLogger(childCtx) - exitStatus, err := handler(childCtx, conn, registry) + exitStatus, err := handler(childCtx, conn) if err != nil { if grpcstatus.Convert(err).Code() == grpccodes.Unavailable { @@ -72,35 +73,35 @@ func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHan // PrepareContext wraps a given context with a correlation ID and logs the command to // be run. -func (gc *GitalyCommand) PrepareContext(ctx context.Context, repository *pb.Repository, response *accessverifier.Response, env sshenv.Env) (context.Context, context.CancelFunc) { +func (gc *GitalyCommand) PrepareContext(ctx context.Context, repository *pb.Repository, env sshenv.Env) (context.Context, context.CancelFunc) { ctx, cancel := context.WithCancel(ctx) - gc.LogExecution(ctx, repository, response, env) + gc.LogExecution(ctx, repository, env) md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.New(nil) } - md.Append("key_id", strconv.Itoa(response.KeyId)) - md.Append("key_type", response.KeyType) - md.Append("user_id", response.UserId) - md.Append("username", response.Username) + md.Append("key_id", strconv.Itoa(gc.Response.KeyId)) + md.Append("key_type", gc.Response.KeyType) + md.Append("user_id", gc.Response.UserId) + md.Append("username", gc.Response.Username) md.Append("remote_ip", env.RemoteAddr) ctx = metadata.NewOutgoingContext(ctx, md) return ctx, cancel } -func (gc *GitalyCommand) LogExecution(ctx context.Context, repository *pb.Repository, response *accessverifier.Response, env sshenv.Env) { +func (gc *GitalyCommand) LogExecution(ctx context.Context, repository *pb.Repository, env sshenv.Env) { fields := log.Fields{ - "command": gc.ServiceName, + "command": gc.Command.ServiceName, "gl_project_path": repository.GlProjectPath, "gl_repository": repository.GlRepository, - "user_id": response.UserId, - "username": response.Username, + "user_id": gc.Response.UserId, + "username": gc.Response.Username, "git_protocol": env.GitProtocolVersion, "remote_ip": env.RemoteAddr, - "gl_key_type": response.KeyType, - "gl_key_id": response.KeyId, + "gl_key_type": gc.Response.KeyType, + "gl_key_id": gc.Response.KeyId, } log.WithContextFields(ctx, fields).Info("executing git command") @@ -118,53 +119,6 @@ func withOutgoingMetadata(ctx context.Context, features map[string]string) conte return metadata.NewOutgoingContext(ctx, md) } -func getConn(ctx context.Context, gc *GitalyCommand, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) { - if gc.Address == "" { - return nil, fmt.Errorf("no gitaly_address given") - } - - serviceName := correlation.ExtractClientNameFromContext(ctx) - if serviceName == "" { - serviceName = "gitlab-shell-unknown" - - log.WithContextFields(ctx, log.Fields{"service_name": serviceName}).Warn("No gRPC service name specified, defaulting to gitlab-shell-unknown") - } - - serviceName = fmt.Sprintf("%s-%s", serviceName, gc.ServiceName) - - connOpts := client.DefaultDialOpts - connOpts = append( - connOpts, - grpc.WithStreamInterceptor( - grpc_middleware.ChainStreamClient( - grpctracing.StreamClientTracingInterceptor(), - grpc_prometheus.StreamClientInterceptor, - grpccorrelation.StreamClientCorrelationInterceptor( - grpccorrelation.WithClientName(serviceName), - ), - ), - ), - - grpc.WithUnaryInterceptor( - grpc_middleware.ChainUnaryClient( - grpctracing.UnaryClientTracingInterceptor(), - grpc_prometheus.UnaryClientInterceptor, - grpccorrelation.UnaryClientCorrelationInterceptor( - grpccorrelation.WithClientName(serviceName), - ), - ), - ), - ) - - if gc.Token != "" { - connOpts = append(connOpts, - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(gc.Token)), - ) - } - - if gc.DialSidechannel { - return client.DialSidechannel(ctx, gc.Address, registry, connOpts) - } - - return client.DialContext(ctx, gc.Address, connOpts) +func (gc *GitalyCommand) getConn(ctx context.Context) (*grpc.ClientConn, error) { + return gc.Config.GitalyClient.GetConnection(ctx, gc.Command) } diff --git a/internal/handler/exec_test.go b/internal/handler/exec_test.go index 784cf19..8f1d5b2 100644 --- a/internal/handler/exec_test.go +++ b/internal/handler/exec_test.go @@ -11,15 +11,15 @@ import ( "google.golang.org/grpc/metadata" grpcstatus "google.golang.org/grpc/status" - "gitlab.com/gitlab-org/gitaly/v14/client" pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "gitlab.com/gitlab-org/gitlab-shell/internal/command/commandargs" "gitlab.com/gitlab-org/gitlab-shell/internal/config" "gitlab.com/gitlab-org/gitlab-shell/internal/gitlabnet/accessverifier" "gitlab.com/gitlab-org/gitlab-shell/internal/sshenv" ) -func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry) (int32, error) { - return func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) { +func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn) (int32, error) { + return func(ctx context.Context, client *grpc.ClientConn) (int32, error) { require.NotNil(t, ctx) require.NotNil(t, client) @@ -28,10 +28,13 @@ func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn } func TestRunGitalyCommand(t *testing.T) { - cmd := GitalyCommand{ - Config: &config.Config{}, - Address: "tcp://localhost:9999", - } + cmd := NewGitalyCommand( + &config.Config{}, + string(commandargs.UploadPack), + &accessverifier.Response{ + Gitaly: accessverifier.Gitaly{Address: "tcp://localhost:9999"}, + }, + ) err := cmd.RunGitalyCommand(context.Background(), makeHandler(t, nil)) require.NoError(t, err) @@ -41,6 +44,32 @@ func TestRunGitalyCommand(t *testing.T) { require.Equal(t, err, expectedErr) } +func TestCachingOfGitalyConnections(t *testing.T) { + ctx := context.Background() + cfg := &config.Config{} + cfg.GitalyClient.InitSidechannelRegistry(ctx) + response := &accessverifier.Response{ + Username: "user", + Gitaly: accessverifier.Gitaly{ + Address: "tcp://localhost:9999", + Token: "token", + UseSidechannel: true, + }, + } + + cmd := NewGitalyCommand(cfg, string(commandargs.UploadPack), response) + + conn, err := cmd.getConn(ctx) + require.NoError(t, err) + + // Reuses connection for different users + response.Username = "another-user" + cmd = NewGitalyCommand(cfg, string(commandargs.UploadPack), response) + newConn, err := cmd.getConn(ctx) + require.NoError(t, err) + require.Equal(t, conn, newConn) +} + func TestMissingGitalyAddress(t *testing.T) { cmd := GitalyCommand{Config: &config.Config{}} @@ -49,10 +78,13 @@ func TestMissingGitalyAddress(t *testing.T) { } func TestUnavailableGitalyErr(t *testing.T) { - cmd := GitalyCommand{ - Config: &config.Config{}, - Address: "tcp://localhost:9999", - } + cmd := NewGitalyCommand( + &config.Config{}, + string(commandargs.UploadPack), + &accessverifier.Response{ + Gitaly: accessverifier.Gitaly{Address: "tcp://localhost:9999"}, + }, + ) expectedErr := grpcstatus.Error(grpccodes.Unavailable, "error") err := cmd.RunGitalyCommand(context.Background(), makeHandler(t, expectedErr)) @@ -68,15 +100,20 @@ func TestRunGitalyCommandMetadata(t *testing.T) { }{ { name: "gitaly_feature_flags", - gc: &GitalyCommand{ - Config: &config.Config{}, - Address: "tcp://localhost:9999", - Features: map[string]string{ - "gitaly-feature-cache_invalidator": "true", - "other-ff": "true", - "gitaly-feature-inforef_uploadpack_cache": "false", + gc: NewGitalyCommand( + &config.Config{}, + string(commandargs.UploadPack), + &accessverifier.Response{ + Gitaly: accessverifier.Gitaly{ + Address: "tcp://localhost:9999", + Features: map[string]string{ + "gitaly-feature-cache_invalidator": "true", + "other-ff": "true", + "gitaly-feature-inforef_uploadpack_cache": "false", + }, + }, }, - }, + ), want: map[string]string{ "gitaly-feature-cache_invalidator": "true", "gitaly-feature-inforef_uploadpack_cache": "false", @@ -87,7 +124,7 @@ func TestRunGitalyCommandMetadata(t *testing.T) { t.Run(tt.name, func(t *testing.T) { cmd := tt.gc - err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn, _ *client.SidechannelRegistry) (int32, error) { + err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn) (int32, error) { md, exists := metadata.FromOutgoingContext(ctx) require.True(t, exists) require.Equal(t, len(tt.want), md.Len()) @@ -117,10 +154,19 @@ func TestPrepareContext(t *testing.T) { }{ { name: "client_identity", - gc: &GitalyCommand{ - Config: &config.Config{}, - Address: "tcp://localhost:9999", - }, + gc: NewGitalyCommand( + &config.Config{}, + string(commandargs.UploadPack), + &accessverifier.Response{ + KeyId: 1, + KeyType: "key", + UserId: "6", + Username: "jane.doe", + Gitaly: accessverifier.Gitaly{ + Address: "tcp://localhost:9999", + }, + }, + ), env: sshenv.Env{ GitProtocolVersion: "protocol", IsSSHConnection: true, @@ -134,12 +180,6 @@ func TestPrepareContext(t *testing.T) { GlRepository: "project-26", GlProjectPath: "group/private", }, - response: &accessverifier.Response{ - KeyId: 1, - KeyType: "key", - UserId: "6", - Username: "jane.doe", - }, want: map[string]string{ "key_id": "1", "key_type": "key", @@ -153,7 +193,7 @@ func TestPrepareContext(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - ctx, cancel := tt.gc.PrepareContext(ctx, tt.repo, tt.response, tt.env) + ctx, cancel := tt.gc.PrepareContext(ctx, tt.repo, tt.env) defer cancel() md, exists := metadata.FromOutgoingContext(ctx) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index c46fb8f..b328445 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -9,9 +9,10 @@ import ( ) const ( - namespace = "gitlab_shell" - sshdSubsystem = "sshd" - httpSubsystem = "http" + namespace = "gitlab_shell" + sshdSubsystem = "sshd" + httpSubsystem = "http" + gitalySubsystem = "gitaly" httpInFlightRequestsMetricName = "in_flight_requests" httpRequestsTotalMetricName = "requests_total" @@ -20,6 +21,8 @@ const ( sshdConnectionsInFlightName = "in_flight_connections" sshdConnectionDuration = "connection_duration_seconds" sshdHitMaxSessions = "concurrent_limited_sessions_total" + + gitalyConnectionsTotalName = "connections_total" ) var ( @@ -61,6 +64,16 @@ var ( }, ) + GitalyConnectionsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: gitalySubsystem, + Name: gitalyConnectionsTotalName, + Help: "Number of Gitaly connections that have been established", + }, + []string{"status"}, + ) + // The metrics and the buckets size are similar to the ones we have for handlers in Labkit // When the MR: https://gitlab.com/gitlab-org/labkit/-/merge_requests/150 is merged, // these metrics can be refactored out of Gitlab Shell code by using the helper function from Labkit |