diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-10-05 18:13:27 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-10-05 18:13:27 +0000 |
commit | ad41744a177d11ead3268b1ec706e9c26f593060 (patch) | |
tree | 3c06d26fa5577a484e6c096a2b6b28ee14461b23 /workhorse | |
parent | a84626f13d61d190b2db5e44caf71b22fc541276 (diff) | |
download | gitlab-ce-ad41744a177d11ead3268b1ec706e9c26f593060.tar.gz |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r-- | workhorse/internal/dependencyproxy/dependencyproxy.go | 125 | ||||
-rw-r--r-- | workhorse/internal/dependencyproxy/dependencyproxy_test.go | 98 | ||||
-rw-r--r-- | workhorse/internal/upstream/routes.go | 11 | ||||
-rw-r--r-- | workhorse/main_test.go | 98 |
4 files changed, 3 insertions, 329 deletions
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy.go b/workhorse/internal/dependencyproxy/dependencyproxy.go deleted file mode 100644 index ebc310ca7f6..00000000000 --- a/workhorse/internal/dependencyproxy/dependencyproxy.go +++ /dev/null @@ -1,125 +0,0 @@ -package dependencyproxy - -import ( - "context" - "fmt" - "io" - "net" - "net/http" - "time" - - "gitlab.com/gitlab-org/labkit/correlation" - "gitlab.com/gitlab-org/labkit/log" - "gitlab.com/gitlab-org/labkit/tracing" - - "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata" -) - -// httpTransport defines a http.Transport with values -// that are more restrictive than for http.DefaultTransport, -// they define shorter TLS Handshake, and more aggressive connection closing -// to prevent the connection hanging and reduce FD usage -var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 10 * time.Second, - }).DialContext, - MaxIdleConns: 2, - IdleConnTimeout: 30 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 10 * time.Second, - ResponseHeaderTimeout: 30 * time.Second, -})) - -var httpClient = &http.Client{ - Transport: httpTransport, -} - -type Injector struct { - senddata.Prefix - uploadHandler http.Handler -} - -type entryParams struct { - Url string - Header http.Header -} - -type nullResponseWriter struct { - header http.Header - status int -} - -func (nullResponseWriter) Write(p []byte) (int, error) { - return len(p), nil -} - -func (w *nullResponseWriter) Header() http.Header { - return w.header -} - -func (w *nullResponseWriter) WriteHeader(status int) { - if w.status == 0 { - w.status = status - } -} - -func NewInjector() *Injector { - return &Injector{ - Prefix: "send-dependency:", - } -} - -func (p *Injector) SetUploadHandler(uploadHandler http.Handler) { - p.uploadHandler = uploadHandler -} - -func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) { - dependencyResponse, err := p.fetchUrl(r.Context(), sendData) - if err != nil { - helper.Fail500(w, r, err) - return - } - defer dependencyResponse.Body.Close() - if dependencyResponse.StatusCode >= 400 { - w.WriteHeader(dependencyResponse.StatusCode) - io.Copy(w, dependencyResponse.Body) - return - } - - teeReader := io.TeeReader(dependencyResponse.Body, w) - saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader) - if err != nil { - helper.Fail500(w, r, fmt.Errorf("dependency proxy: failed to create request: %w", err)) - } - saveFileRequest.Header = helper.HeaderClone(r.Header) - saveFileRequest.ContentLength = dependencyResponse.ContentLength - - w.Header().Del("Content-Length") - - nrw := &nullResponseWriter{header: http.Header{}} - p.uploadHandler.ServeHTTP(nrw, saveFileRequest) - - if nrw.status != http.StatusOK { - fields := log.Fields{"code": nrw.status} - - helper.Fail500WithFields(nrw, r, fmt.Errorf("dependency proxy: failed to upload file"), fields) - } -} - -func (p *Injector) fetchUrl(ctx context.Context, sendData string) (*http.Response, error) { - var params entryParams - if err := p.Unpack(¶ms, sendData); err != nil { - return nil, fmt.Errorf("dependency proxy: unpack sendData: %v", err) - } - - r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil) - if err != nil { - return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %v", err) - } - r.Header = params.Header - - return httpClient.Do(r) -} diff --git a/workhorse/internal/dependencyproxy/dependencyproxy_test.go b/workhorse/internal/dependencyproxy/dependencyproxy_test.go deleted file mode 100644 index 395ca58f90e..00000000000 --- a/workhorse/internal/dependencyproxy/dependencyproxy_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package dependencyproxy - -import ( - "encoding/base64" - "io" - "net/http" - "net/http/httptest" - "strconv" - "testing" - - "github.com/stretchr/testify/require" -) - -type fakeUploadHandler struct { - request *http.Request - body []byte - handler func(w http.ResponseWriter, r *http.Request) -} - -func (f *fakeUploadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - f.request = r - - f.body, _ = io.ReadAll(r.Body) - - f.handler(w, r) -} - -func TestSuccessfullRequest(t *testing.T) { - content := []byte("result") - originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Length", strconv.Itoa(len(content))) - w.Write(content) - })) - - uploadHandler := &fakeUploadHandler{ - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - }, - } - - injector := NewInjector() - injector.SetUploadHandler(uploadHandler) - - response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`) - - require.Equal(t, "/target/upload", uploadHandler.request.URL.Path) - require.Equal(t, int64(6), uploadHandler.request.ContentLength) - - require.Equal(t, content, uploadHandler.body) - - require.Equal(t, 200, response.Code) - require.Equal(t, string(content), response.Body.String()) -} - -func TestIncorrectSendData(t *testing.T) { - response := makeRequest(NewInjector(), "") - - require.Equal(t, 500, response.Code) - require.Equal(t, "Internal server error\n", response.Body.String()) -} - -func TestIncorrectSendDataUrl(t *testing.T) { - response := makeRequest(NewInjector(), `{"Token": "token", "Url": "url"}`) - - require.Equal(t, 500, response.Code) - require.Equal(t, "Internal server error\n", response.Body.String()) -} - -func TestFailedOriginServer(t *testing.T) { - originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(404) - w.Write([]byte("Not found")) - })) - - uploadHandler := &fakeUploadHandler{ - handler: func(w http.ResponseWriter, r *http.Request) { - require.FailNow(t, "the error response must not be uploaded") - }, - } - - injector := NewInjector() - injector.SetUploadHandler(uploadHandler) - - response := makeRequest(injector, `{"Token": "token", "Url": "`+originResourceServer.URL+`/url"}`) - - require.Equal(t, 404, response.Code) - require.Equal(t, "Not found", response.Body.String()) -} - -func makeRequest(injector *Injector, data string) *httptest.ResponseRecorder { - w := httptest.NewRecorder() - r := httptest.NewRequest("GET", "/target", nil) - - sendData := base64.StdEncoding.EncodeToString([]byte(data)) - injector.Inject(w, r, sendData) - - return w -} diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go index 9e92393dcaa..8c85c5144e5 100644 --- a/workhorse/internal/upstream/routes.go +++ b/workhorse/internal/upstream/routes.go @@ -16,7 +16,6 @@ import ( "gitlab.com/gitlab-org/gitlab/workhorse/internal/builds" "gitlab.com/gitlab-org/gitlab/workhorse/internal/channel" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" - "gitlab.com/gitlab-org/gitlab/workhorse/internal/dependencyproxy" "gitlab.com/gitlab-org/gitlab/workhorse/internal/git" "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer" @@ -171,7 +170,7 @@ func (ro *routeEntry) isMatch(cleanedPath string, req *http.Request) bool { return ok } -func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config, dependencyProxyInjector *dependencyproxy.Injector) http.Handler { +func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg config.Config) http.Handler { proxier := proxypkg.NewProxy(backend, version, rt) return senddata.SendData( @@ -184,7 +183,6 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf artifacts.SendEntry, sendurl.SendURL, imageresizer.NewResizer(cfg), - dependencyProxyInjector, ) } @@ -195,8 +193,7 @@ func buildProxy(backend *url.URL, version string, rt http.RoundTripper, cfg conf func configureRoutes(u *upstream) { api := u.APIClient static := &staticpages.Static{DocumentRoot: u.DocumentRoot, Exclude: staticExclude} - dependencyProxyInjector := dependencyproxy.NewInjector() - proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config, dependencyProxyInjector) + proxy := buildProxy(u.Backend, u.Version, u.RoundTripper, u.Config) cableProxy := proxypkg.NewProxy(u.CableBackend, u.Version, u.CableRoundTripper) assetsNotFoundHandler := NotFoundUnless(u.DevelopmentMode, proxy) @@ -210,7 +207,7 @@ func configureRoutes(u *upstream) { } signingTripper := secret.NewRoundTripper(u.RoundTripper, u.Version) - signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config, dependencyProxyInjector) + signingProxy := buildProxy(u.Backend, u.Version, signingTripper, u.Config) preparers := createUploadPreparers(u.Config) uploadPath := path.Join(u.DocumentRoot, "uploads/tmp") @@ -218,8 +215,6 @@ func configureRoutes(u *upstream) { ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", uploadAccelerateProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout) ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration) - dependencyProxyInjector.SetUploadHandler(upload.BodyUploader(api, signingProxy, preparers.packages)) - // Serve static files or forward the requests defaultUpstream := static.ServeExisting( u.URLPrefix, diff --git a/workhorse/main_test.go b/workhorse/main_test.go index f90a07f1d7d..6e61e2fc65a 100644 --- a/workhorse/main_test.go +++ b/workhorse/main_test.go @@ -934,101 +934,3 @@ func TestHealthChecksUnreachable(t *testing.T) { }) } } - -func TestDependencyProxyInjector(t *testing.T) { - token := "token" - bodyLength := 4096 * 12 - expectedBody := strings.Repeat("p", bodyLength) - - testCases := []struct { - desc string - contentLength int - readSize int - finalizeHandler func(*testing.T, http.ResponseWriter) - }{ - { - desc: "the uploading successfully finalized", - contentLength: bodyLength, - readSize: bodyLength, - finalizeHandler: func(t *testing.T, w http.ResponseWriter) { - w.WriteHeader(200) - }, - }, { - desc: "the uploading failed", - contentLength: bodyLength, - readSize: bodyLength, - finalizeHandler: func(t *testing.T, w http.ResponseWriter) { - w.WriteHeader(500) - }, - }, { - desc: "the origin resource server returns partial response", - contentLength: bodyLength + 1000, - readSize: bodyLength, - finalizeHandler: func(t *testing.T, _ http.ResponseWriter) { - t.Fatal("partial file must not be saved") - }, - }, { - desc: "a user does not read the whole file", - contentLength: bodyLength, - readSize: bodyLength - 1000, - finalizeHandler: func(t *testing.T, _ http.ResponseWriter) { - t.Fatal("partial file must not be saved") - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - originResource := "/origin_resource" - - originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, originResource, r.URL.String()) - - w.Header().Set("Content-Length", strconv.Itoa(tc.contentLength)) - - _, err := io.WriteString(w, expectedBody) - require.NoError(t, err) - })) - defer originResourceServer.Close() - - originResourceUrl := originResourceServer.URL + originResource - - ts := testhelper.TestServerWithHandler(regexp.MustCompile(`.`), func(w http.ResponseWriter, r *http.Request) { - switch r.URL.String() { - case "/base": - params := `{"Url": "` + originResourceUrl + `", "Token": "` + token + `"}` - w.Header().Set("Gitlab-Workhorse-Send-Data", `send-dependency:`+base64.URLEncoding.EncodeToString([]byte(params))) - case "/base/upload/authorize": - w.Header().Set("Content-Type", api.ResponseContentType) - _, err := fmt.Fprintf(w, `{"TempPath":"%s"}`, scratchDir) - require.NoError(t, err) - case "/base/upload": - tc.finalizeHandler(t, w) - default: - t.Fatalf("unexpected request: %s", r.URL) - } - }) - defer ts.Close() - - ws := startWorkhorseServer(ts.URL) - defer ws.Close() - - req, err := http.NewRequest("GET", ws.URL+"/base", nil) - require.NoError(t, err) - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - defer resp.Body.Close() - - body := make([]byte, tc.readSize) - _, err = io.ReadFull(resp.Body, body) - require.NoError(t, err) - - require.NoError(t, resp.Body.Close()) // Client closes connection - ws.Close() // Wait for server handler to return - - require.Equal(t, 200, resp.StatusCode, "status code") - require.Equal(t, expectedBody[0:tc.readSize], string(body), "response body") - }) - } -} |