summaryrefslogtreecommitdiff
path: root/workhorse
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2021-10-05 18:13:27 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2021-10-05 18:13:27 +0000
commitad41744a177d11ead3268b1ec706e9c26f593060 (patch)
tree3c06d26fa5577a484e6c096a2b6b28ee14461b23 /workhorse
parenta84626f13d61d190b2db5e44caf71b22fc541276 (diff)
downloadgitlab-ce-ad41744a177d11ead3268b1ec706e9c26f593060.tar.gz
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r--workhorse/internal/dependencyproxy/dependencyproxy.go125
-rw-r--r--workhorse/internal/dependencyproxy/dependencyproxy_test.go98
-rw-r--r--workhorse/internal/upstream/routes.go11
-rw-r--r--workhorse/main_test.go98
4 files changed, 3 insertions, 329 deletions
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy.go b/workhorse/internal/dependencyproxy/dependencyproxy.go
deleted file mode 100644
index ebc310ca7f6..00000000000
--- a/workhorse/internal/dependencyproxy/dependencyproxy.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package dependencyproxy
-
-import (
- "context"
- "fmt"
- "io"
- "net"
- "net/http"
- "time"
-
- "gitlab.com/gitlab-org/labkit/correlation"
- "gitlab.com/gitlab-org/labkit/log"
- "gitlab.com/gitlab-org/labkit/tracing"
-
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata"
-)
-
-// httpTransport defines a http.Transport with values
-// that are more restrictive than for http.DefaultTransport,
-// they define shorter TLS Handshake, and more aggressive connection closing
-// to prevent the connection hanging and reduce FD usage
-var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
- Proxy: http.ProxyFromEnvironment,
- DialContext: (&net.Dialer{
- Timeout: 30 * time.Second,
- KeepAlive: 10 * time.Second,
- }).DialContext,
- MaxIdleConns: 2,
- IdleConnTimeout: 30 * time.Second,
- TLSHandshakeTimeout: 10 * time.Second,
- ExpectContinueTimeout: 10 * time.Second,
- ResponseHeaderTimeout: 30 * time.Second,
-}))
-
-var httpClient = &http.Client{
- Transport: httpTransport,
-}
-
-type Injector struct {
- senddata.Prefix
- uploadHandler http.Handler
-}
-
-type entryParams struct {
- Url string
- Header http.Header
-}
-
-type nullResponseWriter struct {
- header http.Header
- status int
-}
-
-func (nullResponseWriter) Write(p []byte) (int, error) {
- return len(p), nil
-}
-
-func (w *nullResponseWriter) Header() http.Header {
- return w.header
-}
-
-func (w *nullResponseWriter) WriteHeader(status int) {
- if w.status == 0 {
- w.status = status
- }
-}
-
-func NewInjector() *Injector {
- return &Injector{
- Prefix: "send-dependency:",
- }
-}
-
-func (p *Injector) SetUploadHandler(uploadHandler http.Handler) {
- p.uploadHandler = uploadHandler
-}
-
-func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
- dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
- if err != nil {
- helper.Fail500(w, r, err)
- return
- }
- defer dependencyResponse.Body.Close()
- if dependencyResponse.StatusCode >= 400 {
- w.WriteHeader(dependencyResponse.StatusCode)
- io.Copy(w, dependencyResponse.Body)
- return
- }
-
- teeReader := io.TeeReader(dependencyResponse.Body, w)
- saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
- if err != nil {
- helper.Fail500(w, r, fmt.Errorf("dependency proxy: failed to create request: %w", err))
- }
- saveFileRequest.Header = helper.HeaderClone(r.Header)
- saveFileRequest.ContentLength = dependencyResponse.ContentLength
-
- w.Header().Del("Content-Length")
-
- nrw := &nullResponseWriter{header: http.Header{}}
- p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
-
- if nrw.status != http.StatusOK {
- fields := log.Fields{"code": nrw.status}
-
- helper.Fail500WithFields(nrw, r, fmt.Errorf("dependency proxy: failed to upload file"), fields)
- }
-}
-
-func (p *Injector) fetchUrl(ctx context.Context, sendData string) (*http.Response, error) {
- var params entryParams
- if err := p.Unpack(&params, sendData); err != nil {
- return nil, fmt.Errorf("dependency proxy: unpack sendData: %v", err)
- }
-
- r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil)
- if err != nil {
- return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %v", err)
- }
- r.Header = params.Header
-
- return httpClient.Do(r)
-}
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy_test.go b/workhorse/internal/dependencyproxy/dependencyproxy_test.go
deleted file mode 100644
index 395ca58f90e..00000000000
--- a/workhorse/internal/dependencyproxy/dependencyproxy_test.go
+++ /dev/null
@@ -1,98 +0,0 @@
-package dependencyproxy
-
-import (
- "encoding/base64"
- "io"
- "net/http"
- "net/http/httptest"
- "strconv"
- "testing"
-
- "github.com/stretchr/testify/require"
-)
-
-type fakeUploadHandler struct {
- request *http.Request
- body []byte
- handler func(w http.ResponseWriter, r *http.Request)
-}
-
-func (f *fakeUploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- f.request = r
-
- f.body, _ = io.ReadAll(r.Body)
-
- f.handler(w, r)
-}
-
-func TestSuccessfullRequest(t *testing.T) {
- content := []byte("result")
- originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Length", strconv.Itoa(len(content)))
- w.Write(content)
- }))
-
- uploadHandler := &fakeUploadHandler{
- handler: func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(200)
- },
- }
-
- injector := NewInjector()
- injector.SetUploadHandler(uploadHandler)
-
- response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`)
-
- require.Equal(t, "/target/upload", uploadHandler.request.URL.Path)
- require.Equal(t, int64(6), uploadHandler.request.ContentLength)
-
- require.Equal(t, content, uploadHandler.body)
-
- require.Equal(t, 200, response.Code)
- require.Equal(t, string(content), response.Body.String())
-}
-
-func TestIncorrectSendData(t *testing.T) {
- response := makeRequest(NewInjector(), "")
-
- require.Equal(t, 500, response.Code)
- require.Equal(t, "Internal server error\n", response.Body.String())
-}
-
-func TestIncorrectSendDataUrl(t *testing.T) {
- response := makeRequest(NewInjector(), `{"Token": "token", "Url": "url"}`)
-
- require.Equal(t, 500, response.Code)
- require.Equal(t, "Internal server error\n", response.Body.String())
-}
-
-func TestFailedOriginServer(t *testing.T) {
- originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- w.WriteHeader(404)
- w.Write([]byte("Not found"))
- }))
-
- uploadHandler := &fakeUploadHandler{
- handler: func(w http.ResponseWriter, r *http.Request) {
- require.FailNow(t, "the error response must not be uploaded")
- },
- }
-
- injector := NewInjector()
- injector.SetUploadHandler(uploadHandler)
-
- response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`)
-
- require.Equal(t, 404, response.Code)
- require.Equal(t, "Not found", response.Body.String())
-}
-
-func makeRequest(injector *Injector, data string) *httptest.ResponseRecorder {
- w := httptest.NewRecorder()
- r := httptest.NewRequest("GET", "/target", nil)
-
- sendData := base64.StdEncoding.EncodeToString([]byte(data))
- injector.Inject(w, r, sendData)
-
- return w
-}
diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go
index 9e92393dcaa..8c85c5144e5 100644
--- a/workhorse/internal/upstream/routes.go
+++ b/workhorse/internal/upstream/routes.go
@@ -16,7 +16,6 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/builds"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/channel"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/dependencyproxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer"
@@ -171,7 +170,7 @@ func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool {
return ok
}
-func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config, dependencyProxyInjector *dependencyproxy.Injector) http.Handler {
+func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler {
proxier := proxypkg.NewProxy(backend, version, rt)
return senddata.SendData(
@@ -184,7 +183,6 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
artifacts.SendEntry,
sendurl.SendURL,
imageresizer.NewResizer(cfg),
- dependencyProxyInjector,
)
}
@@ -195,8 +193,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf
func configureRoutes(u *upstream) {
api := u.APIClient
static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude}
- dependencyProxyInjector := dependencyproxy.NewInjector()
- proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config, dependencyProxyInjector)
+ proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config)
cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper)
assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy)
@@ -210,7 +207,7 @@ func configureRoutes(u *upstream) {
}
signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version)
- signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector)
+ signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config)
preparers := createUploadPreparers(u.Config)
uploadPath := path.Join(u.DocumentRoot, "uploads/tmp")
@@ -218,8 +215,6 @@ func configureRoutes(u *upstream) {
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
- dependencyProxyInjector.SetUploadHandler(upload.BodyUploader(api, signingProxy, preparers.packages))
-
// Serve static files or forward the requests
defaultUpstream := static.ServeExisting(
u.URLPrefix,
diff --git a/workhorse/main_test.go b/workhorse/main_test.go
index f90a07f1d7d..6e61e2fc65a 100644
--- a/workhorse/main_test.go
+++ b/workhorse/main_test.go
@@ -934,101 +934,3 @@ func TestHealthChecksUnreachable(t *testing.T) {
})
}
}
-
-func TestDependencyProxyInjector(t *testing.T) {
- token := "token"
- bodyLength := 4096 * 12
- expectedBody := strings.Repeat("p", bodyLength)
-
- testCases := []struct {
- desc string
- contentLength int
- readSize int
- finalizeHandler func(*testing.T, http.ResponseWriter)
- }{
- {
- desc: "the uploading successfully finalized",
- contentLength: bodyLength,
- readSize: bodyLength,
- finalizeHandler: func(t *testing.T, w http.ResponseWriter) {
- w.WriteHeader(200)
- },
- }, {
- desc: "the uploading failed",
- contentLength: bodyLength,
- readSize: bodyLength,
- finalizeHandler: func(t *testing.T, w http.ResponseWriter) {
- w.WriteHeader(500)
- },
- }, {
- desc: "the origin resource server returns partial response",
- contentLength: bodyLength + 1000,
- readSize: bodyLength,
- finalizeHandler: func(t *testing.T, _ http.ResponseWriter) {
- t.Fatal("partial file must not be saved")
- },
- }, {
- desc: "a user does not read the whole file",
- contentLength: bodyLength,
- readSize: bodyLength - 1000,
- finalizeHandler: func(t *testing.T, _ http.ResponseWriter) {
- t.Fatal("partial file must not be saved")
- },
- },
- }
-
- for _, tc := range testCases {
- t.Run(tc.desc, func(t *testing.T) {
- originResource := "/origin_resource"
-
- originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- require.Equal(t, originResource, r.URL.String())
-
- w.Header().Set("Content-Length", strconv.Itoa(tc.contentLength))
-
- _, err := io.WriteString(w, expectedBody)
- require.NoError(t, err)
- }))
- defer originResourceServer.Close()
-
- originResourceUrl := originResourceServer.URL + originResource
-
- ts := testhelper.TestServerWithHandler(regexp.MustCompile(`.`), func(w http.ResponseWriter, r *http.Request) {
- switch r.URL.String() {
- case "/base":
- params := `{"Url": "` + originResourceUrl + `", "Token": "` + token + `"}`
- w.Header().Set("Gitlab-Workhorse-Send-Data", `send-dependency:`+base64.URLEncoding.EncodeToString([]byte(params)))
- case "/base/upload/authorize":
- w.Header().Set("Content-Type", api.ResponseContentType)
- _, err := fmt.Fprintf(w, `{"TempPath":"%s"}`, scratchDir)
- require.NoError(t, err)
- case "/base/upload":
- tc.finalizeHandler(t, w)
- default:
- t.Fatalf("unexpected request: %s", r.URL)
- }
- })
- defer ts.Close()
-
- ws := startWorkhorseServer(ts.URL)
- defer ws.Close()
-
- req, err := http.NewRequest("GET", ws.URL+"/base", nil)
- require.NoError(t, err)
-
- resp, err := http.DefaultClient.Do(req)
- require.NoError(t, err)
- defer resp.Body.Close()
-
- body := make([]byte, tc.readSize)
- _, err = io.ReadFull(resp.Body, body)
- require.NoError(t, err)
-
- require.NoError(t, resp.Body.Close()) // Client closes connection
- ws.Close() // Wait for server handler to return
-
- require.Equal(t, 200, resp.StatusCode, "status code")
- require.Equal(t, expectedBody[0:tc.readSize], string(body), "response body")
- })
- }
-}