summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Drozdov <idrozdov@gitlab.com>2022-02-18 13:10:38 +0300
committerIgor Drozdov <idrozdov@gitlab.com>2022-03-07 16:54:15 +0300
commite1ddbdd161a28ff53ca4d3b3f0fc4fa19687d80b (patch)
tree9427a8f7add05d0eece7df0c48638209bea88d8d
parentede95ae77b591fdffab6ea1f7b1c01e4402af2e1 (diff)
downloadgitlab-shell-e1ddbdd161a28ff53ca4d3b3f0fc4fa19687d80b.tar.gz
Reuse Gitaly conns and Sidechannel
When gitlab-sshd has been introduced we've started running our own SSH server. In this case we're able to cache and reuse Gitaly connections and Registry. It helps to reduce memory usage.
-rw-r--r--cmd/gitlab-shell/main.go2
-rw-r--r--cmd/gitlab-sshd/main.go2
-rw-r--r--internal/command/receivepack/gitalycall.go12
-rw-r--r--internal/command/uploadarchive/gitalycall.go12
-rw-r--r--internal/command/uploadpack/gitalycall.go18
-rw-r--r--internal/command/uploadpack/gitalycall_test.go11
-rw-r--r--internal/config/config.go3
-rw-r--r--internal/gitaly/gitaly.go133
-rw-r--r--internal/gitaly/gitaly_test.go53
-rw-r--r--internal/handler/exec.go114
-rw-r--r--internal/handler/exec_test.go102
-rw-r--r--internal/metrics/metrics.go19
12 files changed, 333 insertions, 148 deletions
diff --git a/cmd/gitlab-shell/main.go b/cmd/gitlab-shell/main.go
index 693140d..370dc2d 100644
--- a/cmd/gitlab-shell/main.go
+++ b/cmd/gitlab-shell/main.go
@@ -68,6 +68,8 @@ func main() {
ctx, finished := command.Setup(executable.Name, config)
defer finished()
+ config.GitalyClient.InitSidechannelRegistry(ctx)
+
cmdName := reflect.TypeOf(cmd).String()
ctxlog := log.ContextLogger(ctx)
ctxlog.WithFields(log.Fields{"env": env, "command": cmdName}).Info("gitlab-shell: main: executing command")
diff --git a/cmd/gitlab-sshd/main.go b/cmd/gitlab-sshd/main.go
index 165c7a5..8040a54 100644
--- a/cmd/gitlab-sshd/main.go
+++ b/cmd/gitlab-sshd/main.go
@@ -69,6 +69,8 @@ func main() {
ctx, finished := command.Setup("gitlab-sshd", cfg)
defer finished()
+ cfg.GitalyClient.InitSidechannelRegistry(ctx)
+
server, err := sshd.NewServer(cfg)
if err != nil {
log.WithError(err).Fatal("Failed to start GitLab built-in sshd")
diff --git a/internal/command/receivepack/gitalycall.go b/internal/command/receivepack/gitalycall.go
index 6bd9f49..c2dd2de 100644
--- a/internal/command/receivepack/gitalycall.go
+++ b/internal/command/receivepack/gitalycall.go
@@ -13,13 +13,7 @@ import (
)
func (c *Command) performGitalyCall(ctx context.Context, response *accessverifier.Response) error {
- gc := &handler.GitalyCommand{
- Config: c.Config,
- ServiceName: string(commandargs.ReceivePack),
- Address: response.Gitaly.Address,
- Token: response.Gitaly.Token,
- Features: response.Gitaly.Features,
- }
+ gc := handler.NewGitalyCommand(c.Config, string(commandargs.ReceivePack), response)
request := &pb.SSHReceivePackRequest{
Repository: &response.Gitaly.Repo,
@@ -30,8 +24,8 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
GitConfigOptions: response.GitConfigOptions,
}
- return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
- ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
+ return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
+ ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env)
defer cancel()
rw := c.ReadWriter
diff --git a/internal/command/uploadarchive/gitalycall.go b/internal/command/uploadarchive/gitalycall.go
index 33f0b71..6051d2c 100644
--- a/internal/command/uploadarchive/gitalycall.go
+++ b/internal/command/uploadarchive/gitalycall.go
@@ -13,18 +13,12 @@ import (
)
func (c *Command) performGitalyCall(ctx context.Context, response *accessverifier.Response) error {
- gc := &handler.GitalyCommand{
- Config: c.Config,
- ServiceName: string(commandargs.UploadArchive),
- Address: response.Gitaly.Address,
- Token: response.Gitaly.Token,
- Features: response.Gitaly.Features,
- }
+ gc := handler.NewGitalyCommand(c.Config, string(commandargs.UploadArchive), response)
request := &pb.SSHUploadArchiveRequest{Repository: &response.Gitaly.Repo}
- return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
- ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
+ return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
+ ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env)
defer cancel()
rw := c.ReadWriter
diff --git a/internal/command/uploadpack/gitalycall.go b/internal/command/uploadpack/gitalycall.go
index 43ae4f8..96dd823 100644
--- a/internal/command/uploadpack/gitalycall.go
+++ b/internal/command/uploadpack/gitalycall.go
@@ -13,26 +13,20 @@ import (
)
func (c *Command) performGitalyCall(ctx context.Context, response *accessverifier.Response) error {
- gc := &handler.GitalyCommand{
- Config: c.Config,
- ServiceName: string(commandargs.UploadPack),
- Address: response.Gitaly.Address,
- Token: response.Gitaly.Token,
- Features: response.Gitaly.Features,
- }
+ gc := handler.NewGitalyCommand(c.Config, string(commandargs.UploadPack), response)
if response.Gitaly.UseSidechannel {
- gc.DialSidechannel = true
request := &pb.SSHUploadPackWithSidechannelRequest{
Repository: &response.Gitaly.Repo,
GitProtocol: c.Args.Env.GitProtocolVersion,
GitConfigOptions: response.GitConfigOptions,
}
- return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
- ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
+ return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
+ ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env)
defer cancel()
+ registry := c.Config.GitalyClient.SidechannelRegistry
rw := c.ReadWriter
return client.UploadPackWithSidechannel(ctx, conn, registry, rw.In, rw.Out, rw.ErrOut, request)
})
@@ -44,8 +38,8 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
GitConfigOptions: response.GitConfigOptions,
}
- return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
- ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
+ return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
+ ctx, cancel := gc.PrepareContext(ctx, request.Repository, c.Args.Env)
defer cancel()
rw := c.ReadWriter
diff --git a/internal/command/uploadpack/gitalycall_test.go b/internal/command/uploadpack/gitalycall_test.go
index 213b6f9..e0a15ee 100644
--- a/internal/command/uploadpack/gitalycall_test.go
+++ b/internal/command/uploadpack/gitalycall_test.go
@@ -97,15 +97,18 @@ func TestUploadPack_withSidechannel(t *testing.T) {
Env: env,
}
+ ctx := correlation.ContextWithCorrelation(context.Background(), "a-correlation-id")
+ ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests")
+
+ cfg := &config.Config{GitlabUrl: url}
+ cfg.GitalyClient.InitSidechannelRegistry(ctx)
+
cmd := &Command{
- Config: &config.Config{GitlabUrl: url},
+ Config: cfg,
Args: args,
ReadWriter: &readwriter.ReadWriter{ErrOut: output, Out: output, In: input},
}
- ctx := correlation.ContextWithCorrelation(context.Background(), "a-correlation-id")
- ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests")
-
err := cmd.Execute(ctx)
require.NoError(t, err)
diff --git a/internal/config/config.go b/internal/config/config.go
index efce9f9..c4dc073 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -12,6 +12,7 @@ import (
yaml "gopkg.in/yaml.v2"
"gitlab.com/gitlab-org/gitlab-shell/client"
+ "gitlab.com/gitlab-org/gitlab-shell/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-shell/internal/metrics"
)
@@ -59,6 +60,8 @@ type Config struct {
httpClient *client.HttpClient
httpClientErr error
httpClientOnce sync.Once
+
+ GitalyClient gitaly.Client
}
// The defaults to apply before parsing the config file(s).
diff --git a/internal/gitaly/gitaly.go b/internal/gitaly/gitaly.go
new file mode 100644
index 0000000..a6d8128
--- /dev/null
+++ b/internal/gitaly/gitaly.go
@@ -0,0 +1,133 @@
+package gitaly
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "google.golang.org/grpc"
+
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/labkit/correlation"
+ grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
+ "gitlab.com/gitlab-org/labkit/log"
+ grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
+
+ "gitlab.com/gitlab-org/gitlab-shell/internal/metrics"
+)
+
+type Command struct {
+ ServiceName string
+ Address string
+ Token string
+ DialSidechannel bool
+}
+
+type connectionsCache struct {
+ sync.RWMutex
+
+ connections map[Command]*grpc.ClientConn
+}
+
+type Client struct {
+ SidechannelRegistry *gitalyclient.SidechannelRegistry
+
+ cache connectionsCache
+}
+
+func (c *Client) InitSidechannelRegistry(ctx context.Context) {
+ c.SidechannelRegistry = gitalyclient.NewSidechannelRegistry(log.ContextLogger(ctx))
+}
+
+func (c *Client) GetConnection(ctx context.Context, cmd Command) (*grpc.ClientConn, error) {
+ c.cache.RLock()
+ conn := c.cache.connections[cmd]
+ c.cache.RUnlock()
+
+ if conn != nil {
+ return conn, nil
+ }
+
+ c.cache.Lock()
+ defer c.cache.Unlock()
+
+ if conn := c.cache.connections[cmd]; conn != nil {
+ return conn, nil
+ }
+
+ conn, err := c.newConnection(ctx, cmd)
+ if err != nil {
+ return nil, err
+ }
+
+ if c.cache.connections == nil {
+ c.cache.connections = make(map[Command]*grpc.ClientConn)
+ }
+
+ c.cache.connections[cmd] = conn
+
+ return conn, nil
+}
+
+func (c *Client) newConnection(ctx context.Context, cmd Command) (conn *grpc.ClientConn, err error) {
+ defer func() {
+ label := "ok"
+ if err != nil {
+ label = "fail"
+ }
+ metrics.GitalyConnectionsTotal.WithLabelValues(label).Inc()
+ }()
+
+ if cmd.Address == "" {
+ return nil, fmt.Errorf("no gitaly_address given")
+ }
+
+ serviceName := correlation.ExtractClientNameFromContext(ctx)
+ if serviceName == "" {
+ serviceName = "gitlab-shell-unknown"
+
+ log.WithContextFields(ctx, log.Fields{"service_name": serviceName}).Warn("No gRPC service name specified, defaulting to gitlab-shell-unknown")
+ }
+
+ serviceName = fmt.Sprintf("%s-%s", serviceName, cmd.ServiceName)
+
+ connOpts := client.DefaultDialOpts
+ connOpts = append(
+ connOpts,
+ grpc.WithStreamInterceptor(
+ grpc_middleware.ChainStreamClient(
+ grpctracing.StreamClientTracingInterceptor(),
+ grpc_prometheus.StreamClientInterceptor,
+ grpccorrelation.StreamClientCorrelationInterceptor(
+ grpccorrelation.WithClientName(serviceName),
+ ),
+ ),
+ ),
+
+ grpc.WithUnaryInterceptor(
+ grpc_middleware.ChainUnaryClient(
+ grpctracing.UnaryClientTracingInterceptor(),
+ grpc_prometheus.UnaryClientInterceptor,
+ grpccorrelation.UnaryClientCorrelationInterceptor(
+ grpccorrelation.WithClientName(serviceName),
+ ),
+ ),
+ ),
+ )
+
+ if cmd.Token != "" {
+ connOpts = append(connOpts,
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(cmd.Token)),
+ )
+ }
+
+ if cmd.DialSidechannel {
+ return client.DialSidechannel(ctx, cmd.Address, c.SidechannelRegistry, connOpts)
+ }
+
+ return client.DialContext(ctx, cmd.Address, connOpts)
+}
diff --git a/internal/gitaly/gitaly_test.go b/internal/gitaly/gitaly_test.go
new file mode 100644
index 0000000..9e1ee1a
--- /dev/null
+++ b/internal/gitaly/gitaly_test.go
@@ -0,0 +1,53 @@
+package gitaly
+
+import (
+ "context"
+ "testing"
+
+ "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab-shell/internal/metrics"
+)
+
+func TestPrometheusMetrics(t *testing.T) {
+ metrics.GitalyConnectionsTotal.Reset()
+
+ c := &Client{}
+
+ cmd := Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"}
+ c.newConnection(context.Background(), cmd)
+ c.newConnection(context.Background(), cmd)
+
+ require.Equal(t, 1, testutil.CollectAndCount(metrics.GitalyConnectionsTotal))
+ require.InDelta(t, 2, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("ok")), 0.1)
+ require.InDelta(t, 0, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("fail")), 0.1)
+
+ cmd = Command{Address: ""}
+ c.newConnection(context.Background(), cmd)
+
+ require.InDelta(t, 2, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("ok")), 0.1)
+ require.InDelta(t, 1, testutil.ToFloat64(metrics.GitalyConnectionsTotal.WithLabelValues("fail")), 0.1)
+}
+
+func TestCachedConnections(t *testing.T) {
+ c := &Client{}
+
+ require.Len(t, c.cache.connections, 0)
+
+ cmd := Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9999"}
+
+ conn, err := c.GetConnection(context.Background(), cmd)
+ require.NoError(t, err)
+ require.Len(t, c.cache.connections, 1)
+
+ newConn, err := c.GetConnection(context.Background(), cmd)
+ require.NoError(t, err)
+ require.Len(t, c.cache.connections, 1)
+ require.Equal(t, conn, newConn)
+
+ cmd = Command{ServiceName: "git-upload-pack", Address: "tcp://localhost:9998"}
+ _, err = c.GetConnection(context.Background(), cmd)
+ require.NoError(t, err)
+ require.Len(t, c.cache.connections, 2)
+}
diff --git a/internal/handler/exec.go b/internal/handler/exec.go
index 9bd8018..44b02a7 100644
--- a/internal/handler/exec.go
+++ b/internal/handler/exec.go
@@ -6,56 +6,57 @@ import (
"strconv"
"strings"
- grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
- grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
"gitlab.com/gitlab-org/gitlab-shell/internal/config"
+ "gitlab.com/gitlab-org/gitlab-shell/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-shell/internal/gitlabnet/accessverifier"
"gitlab.com/gitlab-org/gitlab-shell/internal/sshenv"
- gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth"
- "gitlab.com/gitlab-org/gitaly/v14/client"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
- grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
"gitlab.com/gitlab-org/labkit/log"
- grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
)
// GitalyHandlerFunc implementations are responsible for making
// an appropriate Gitaly call using the provided client and context
// and returning an error from the Gitaly call.
-type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error)
+type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn) (int32, error)
type GitalyCommand struct {
- Config *config.Config
- ServiceName string
- Address string
- Token string
- Features map[string]string
- DialSidechannel bool
+ Config *config.Config
+ Response *accessverifier.Response
+ Command gitaly.Command
+}
+
+func NewGitalyCommand(cfg *config.Config, serviceName string, response *accessverifier.Response) *GitalyCommand {
+ gc := gitaly.Command{
+ ServiceName: serviceName,
+ Address: response.Gitaly.Address,
+ Token: response.Gitaly.Token,
+ DialSidechannel: response.Gitaly.UseSidechannel,
+ }
+
+ return &GitalyCommand{Config: cfg, Response: response, Command: gc}
}
// RunGitalyCommand provides a bootstrap for Gitaly commands executed
// through GitLab-Shell. It ensures that logging, tracing and other
// common concerns are configured before executing the `handler`.
func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHandlerFunc) error {
- registry := client.NewSidechannelRegistry(log.ContextLogger(ctx))
- conn, err := getConn(ctx, gc, registry)
+ // We leave the connection open for future reuse
+ conn, err := gc.getConn(ctx)
if err != nil {
log.ContextLogger(ctx).WithError(fmt.Errorf("RunGitalyCommand: %v", err)).Error("Failed to get connection to execute Git command")
return err
}
- defer conn.Close()
- childCtx := withOutgoingMetadata(ctx, gc.Features)
+ childCtx := withOutgoingMetadata(ctx, gc.Response.Gitaly.Features)
ctxlog := log.ContextLogger(childCtx)
- exitStatus, err := handler(childCtx, conn, registry)
+ exitStatus, err := handler(childCtx, conn)
if err != nil {
if grpcstatus.Convert(err).Code() == grpccodes.Unavailable {
@@ -72,35 +73,35 @@ func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHan
// PrepareContext wraps a given context with a correlation ID and logs the command to
// be run.
-func (gc *GitalyCommand) PrepareContext(ctx context.Context, repository *pb.Repository, response *accessverifier.Response, env sshenv.Env) (context.Context, context.CancelFunc) {
+func (gc *GitalyCommand) PrepareContext(ctx context.Context, repository *pb.Repository, env sshenv.Env) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)
- gc.LogExecution(ctx, repository, response, env)
+ gc.LogExecution(ctx, repository, env)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
- md.Append("key_id", strconv.Itoa(response.KeyId))
- md.Append("key_type", response.KeyType)
- md.Append("user_id", response.UserId)
- md.Append("username", response.Username)
+ md.Append("key_id", strconv.Itoa(gc.Response.KeyId))
+ md.Append("key_type", gc.Response.KeyType)
+ md.Append("user_id", gc.Response.UserId)
+ md.Append("username", gc.Response.Username)
md.Append("remote_ip", env.RemoteAddr)
ctx = metadata.NewOutgoingContext(ctx, md)
return ctx, cancel
}
-func (gc *GitalyCommand) LogExecution(ctx context.Context, repository *pb.Repository, response *accessverifier.Response, env sshenv.Env) {
+func (gc *GitalyCommand) LogExecution(ctx context.Context, repository *pb.Repository, env sshenv.Env) {
fields := log.Fields{
- "command": gc.ServiceName,
+ "command": gc.Command.ServiceName,
"gl_project_path": repository.GlProjectPath,
"gl_repository": repository.GlRepository,
- "user_id": response.UserId,
- "username": response.Username,
+ "user_id": gc.Response.UserId,
+ "username": gc.Response.Username,
"git_protocol": env.GitProtocolVersion,
"remote_ip": env.RemoteAddr,
- "gl_key_type": response.KeyType,
- "gl_key_id": response.KeyId,
+ "gl_key_type": gc.Response.KeyType,
+ "gl_key_id": gc.Response.KeyId,
}
log.WithContextFields(ctx, fields).Info("executing git command")
@@ -118,53 +119,6 @@ func withOutgoingMetadata(ctx context.Context, features map[string]string) conte
return metadata.NewOutgoingContext(ctx, md)
}
-func getConn(ctx context.Context, gc *GitalyCommand, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) {
- if gc.Address == "" {
- return nil, fmt.Errorf("no gitaly_address given")
- }
-
- serviceName := correlation.ExtractClientNameFromContext(ctx)
- if serviceName == "" {
- serviceName = "gitlab-shell-unknown"
-
- log.WithContextFields(ctx, log.Fields{"service_name": serviceName}).Warn("No gRPC service name specified, defaulting to gitlab-shell-unknown")
- }
-
- serviceName = fmt.Sprintf("%s-%s", serviceName, gc.ServiceName)
-
- connOpts := client.DefaultDialOpts
- connOpts = append(
- connOpts,
- grpc.WithStreamInterceptor(
- grpc_middleware.ChainStreamClient(
- grpctracing.StreamClientTracingInterceptor(),
- grpc_prometheus.StreamClientInterceptor,
- grpccorrelation.StreamClientCorrelationInterceptor(
- grpccorrelation.WithClientName(serviceName),
- ),
- ),
- ),
-
- grpc.WithUnaryInterceptor(
- grpc_middleware.ChainUnaryClient(
- grpctracing.UnaryClientTracingInterceptor(),
- grpc_prometheus.UnaryClientInterceptor,
- grpccorrelation.UnaryClientCorrelationInterceptor(
- grpccorrelation.WithClientName(serviceName),
- ),
- ),
- ),
- )
-
- if gc.Token != "" {
- connOpts = append(connOpts,
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(gc.Token)),
- )
- }
-
- if gc.DialSidechannel {
- return client.DialSidechannel(ctx, gc.Address, registry, connOpts)
- }
-
- return client.DialContext(ctx, gc.Address, connOpts)
+func (gc *GitalyCommand) getConn(ctx context.Context) (*grpc.ClientConn, error) {
+ return gc.Config.GitalyClient.GetConnection(ctx, gc.Command)
}
diff --git a/internal/handler/exec_test.go b/internal/handler/exec_test.go
index 784cf19..8f1d5b2 100644
--- a/internal/handler/exec_test.go
+++ b/internal/handler/exec_test.go
@@ -11,15 +11,15 @@ import (
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
- "gitlab.com/gitlab-org/gitaly/v14/client"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitlab-shell/internal/command/commandargs"
"gitlab.com/gitlab-org/gitlab-shell/internal/config"
"gitlab.com/gitlab-org/gitlab-shell/internal/gitlabnet/accessverifier"
"gitlab.com/gitlab-org/gitlab-shell/internal/sshenv"
)
-func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry) (int32, error) {
- return func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
+func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn) (int32, error) {
+ return func(ctx context.Context, client *grpc.ClientConn) (int32, error) {
require.NotNil(t, ctx)
require.NotNil(t, client)
@@ -28,10 +28,13 @@ func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn
}
func TestRunGitalyCommand(t *testing.T) {
- cmd := GitalyCommand{
- Config: &config.Config{},
- Address: "tcp://localhost:9999",
- }
+ cmd := NewGitalyCommand(
+ &config.Config{},
+ string(commandargs.UploadPack),
+ &accessverifier.Response{
+ Gitaly: accessverifier.Gitaly{Address: "tcp://localhost:9999"},
+ },
+ )
err := cmd.RunGitalyCommand(context.Background(), makeHandler(t, nil))
require.NoError(t, err)
@@ -41,6 +44,32 @@ func TestRunGitalyCommand(t *testing.T) {
require.Equal(t, err, expectedErr)
}
+func TestCachingOfGitalyConnections(t *testing.T) {
+ ctx := context.Background()
+ cfg := &config.Config{}
+ cfg.GitalyClient.InitSidechannelRegistry(ctx)
+ response := &accessverifier.Response{
+ Username: "user",
+ Gitaly: accessverifier.Gitaly{
+ Address: "tcp://localhost:9999",
+ Token: "token",
+ UseSidechannel: true,
+ },
+ }
+
+ cmd := NewGitalyCommand(cfg, string(commandargs.UploadPack), response)
+
+ conn, err := cmd.getConn(ctx)
+ require.NoError(t, err)
+
+ // Reuses connection for different users
+ response.Username = "another-user"
+ cmd = NewGitalyCommand(cfg, string(commandargs.UploadPack), response)
+ newConn, err := cmd.getConn(ctx)
+ require.NoError(t, err)
+ require.Equal(t, conn, newConn)
+}
+
func TestMissingGitalyAddress(t *testing.T) {
cmd := GitalyCommand{Config: &config.Config{}}
@@ -49,10 +78,13 @@ func TestMissingGitalyAddress(t *testing.T) {
}
func TestUnavailableGitalyErr(t *testing.T) {
- cmd := GitalyCommand{
- Config: &config.Config{},
- Address: "tcp://localhost:9999",
- }
+ cmd := NewGitalyCommand(
+ &config.Config{},
+ string(commandargs.UploadPack),
+ &accessverifier.Response{
+ Gitaly: accessverifier.Gitaly{Address: "tcp://localhost:9999"},
+ },
+ )
expectedErr := grpcstatus.Error(grpccodes.Unavailable, "error")
err := cmd.RunGitalyCommand(context.Background(), makeHandler(t, expectedErr))
@@ -68,15 +100,20 @@ func TestRunGitalyCommandMetadata(t *testing.T) {
}{
{
name: "gitaly_feature_flags",
- gc: &GitalyCommand{
- Config: &config.Config{},
- Address: "tcp://localhost:9999",
- Features: map[string]string{
- "gitaly-feature-cache_invalidator": "true",
- "other-ff": "true",
- "gitaly-feature-inforef_uploadpack_cache": "false",
+ gc: NewGitalyCommand(
+ &config.Config{},
+ string(commandargs.UploadPack),
+ &accessverifier.Response{
+ Gitaly: accessverifier.Gitaly{
+ Address: "tcp://localhost:9999",
+ Features: map[string]string{
+ "gitaly-feature-cache_invalidator": "true",
+ "other-ff": "true",
+ "gitaly-feature-inforef_uploadpack_cache": "false",
+ },
+ },
},
- },
+ ),
want: map[string]string{
"gitaly-feature-cache_invalidator": "true",
"gitaly-feature-inforef_uploadpack_cache": "false",
@@ -87,7 +124,7 @@ func TestRunGitalyCommandMetadata(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cmd := tt.gc
- err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn, _ *client.SidechannelRegistry) (int32, error) {
+ err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn) (int32, error) {
md, exists := metadata.FromOutgoingContext(ctx)
require.True(t, exists)
require.Equal(t, len(tt.want), md.Len())
@@ -117,10 +154,19 @@ func TestPrepareContext(t *testing.T) {
}{
{
name: "client_identity",
- gc: &GitalyCommand{
- Config: &config.Config{},
- Address: "tcp://localhost:9999",
- },
+ gc: NewGitalyCommand(
+ &config.Config{},
+ string(commandargs.UploadPack),
+ &accessverifier.Response{
+ KeyId: 1,
+ KeyType: "key",
+ UserId: "6",
+ Username: "jane.doe",
+ Gitaly: accessverifier.Gitaly{
+ Address: "tcp://localhost:9999",
+ },
+ },
+ ),
env: sshenv.Env{
GitProtocolVersion: "protocol",
IsSSHConnection: true,
@@ -134,12 +180,6 @@ func TestPrepareContext(t *testing.T) {
GlRepository: "project-26",
GlProjectPath: "group/private",
},
- response: &accessverifier.Response{
- KeyId: 1,
- KeyType: "key",
- UserId: "6",
- Username: "jane.doe",
- },
want: map[string]string{
"key_id": "1",
"key_type": "key",
@@ -153,7 +193,7 @@ func TestPrepareContext(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
- ctx, cancel := tt.gc.PrepareContext(ctx, tt.repo, tt.response, tt.env)
+ ctx, cancel := tt.gc.PrepareContext(ctx, tt.repo, tt.env)
defer cancel()
md, exists := metadata.FromOutgoingContext(ctx)
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index c46fb8f..b328445 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -9,9 +9,10 @@ import (
)
const (
- namespace = "gitlab_shell"
- sshdSubsystem = "sshd"
- httpSubsystem = "http"
+ namespace = "gitlab_shell"
+ sshdSubsystem = "sshd"
+ httpSubsystem = "http"
+ gitalySubsystem = "gitaly"
httpInFlightRequestsMetricName = "in_flight_requests"
httpRequestsTotalMetricName = "requests_total"
@@ -20,6 +21,8 @@ const (
sshdConnectionsInFlightName = "in_flight_connections"
sshdConnectionDuration = "connection_duration_seconds"
sshdHitMaxSessions = "concurrent_limited_sessions_total"
+
+ gitalyConnectionsTotalName = "connections_total"
)
var (
@@ -61,6 +64,16 @@ var (
},
)
+ GitalyConnectionsTotal = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: gitalySubsystem,
+ Name: gitalyConnectionsTotalName,
+ Help: "Number of Gitaly connections that have been established",
+ },
+ []string{"status"},
+ )
+
// The metrics and the buckets size are similar to the ones we have for handlers in Labkit
// When the MR: https://gitlab.com/gitlab-org/labkit/-/merge_requests/150 is merged,
// these metrics can be refactored out of Gitlab Shell code by using the helper function from Labkit