summaryrefslogtreecommitdiff
path: root/swift/proxy/controllers/container.py
blob: 4102d652ab35549412c4f5a5afe845dd04f85eea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import random

import six
from six.moves.urllib.parse import unquote

from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import public, private, csv_append, Timestamp, \
    config_true_value, ShardRange, cache_from_env, filter_shard_ranges
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
    constrain_req_limit, validate_container_params
from swift.proxy.controllers.base import Controller, delay_denial, \
    cors_validation, set_info_cache, clear_info_cache, _get_info_from_caches, \
    record_cache_op_metrics, get_cache_key, headers_from_container_info, \
    update_headers
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
    HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, Response


class ContainerController(Controller):
    """WSGI controller for container requests"""
    server_type = 'Container'

    # Ensure these are all lowercase
    pass_through_headers = ['x-container-read', 'x-container-write',
                            'x-container-sync-key', 'x-container-sync-to',
                            'x-versions-location']

    def __init__(self, app, account_name, container_name, **kwargs):
        super(ContainerController, self).__init__(app)
        self.account_name = unquote(account_name)
        self.container_name = unquote(container_name)

    def _x_remove_headers(self):
        st = self.server_type.lower()
        return ['x-remove-%s-read' % st,
                'x-remove-%s-write' % st,
                'x-remove-versions-location',
                'x-remove-%s-sync-key' % st,
                'x-remove-%s-sync-to' % st]

    def _convert_policy_to_index(self, req):
        """
        Helper method to convert a policy name (from a request from a client)
        to a policy index (for a request to a backend).

        :param req: incoming request
        """
        policy_name = req.headers.get('X-Storage-Policy')
        if not policy_name:
            return
        policy = POLICIES.get_by_name(policy_name)
        if not policy:
            raise HTTPBadRequest(request=req,
                                 content_type="text/plain",
                                 body=("Invalid %s '%s'"
                                       % ('X-Storage-Policy', policy_name)))
        if policy.is_deprecated:
            body = 'Storage Policy %r is deprecated' % (policy.name)
            raise HTTPBadRequest(request=req, body=body)
        return int(policy)

    def clean_acls(self, req):
        if 'swift.clean_acl' in req.environ:
            for header in ('x-container-read', 'x-container-write'):
                if header in req.headers:
                    try:
                        req.headers[header] = \
                            req.environ['swift.clean_acl'](header,
                                                           req.headers[header])
                    except ValueError as err:
                        return HTTPBadRequest(request=req, body=str(err))
        return None

    def _clear_container_info_cache(self, req):
        clear_info_cache(self.app, req.environ,
                         self.account_name, self.container_name)
        clear_info_cache(self.app, req.environ,
                         self.account_name, self.container_name, 'listing')
        # TODO: should we also purge updating shards from cache?

    def _GETorHEAD_from_backend(self, req):
        part = self.app.container_ring.get_part(
            self.account_name, self.container_name)
        concurrency = self.app.container_ring.replica_count \
            if self.app.get_policy_options(None).concurrent_gets else 1
        node_iter = self.app.iter_nodes(self.app.container_ring, part,
                                        self.logger, req)
        resp = self.GETorHEAD_base(
            req, 'Container', node_iter, part,
            req.swift_entity_path, concurrency)
        return resp

    def _make_shard_ranges_response_body(self, req, shard_range_dicts):
        # filter shard ranges according to request constraints and return a
        # serialised list of shard ranges
        marker = get_param(req, 'marker', '')
        end_marker = get_param(req, 'end_marker')
        includes = get_param(req, 'includes')
        reverse = config_true_value(get_param(req, 'reverse'))
        if reverse:
            marker, end_marker = end_marker, marker
        shard_ranges = [
            ShardRange.from_dict(shard_range)
            for shard_range in shard_range_dicts]
        shard_ranges = filter_shard_ranges(shard_ranges, includes, marker,
                                           end_marker)
        if reverse:
            shard_ranges.reverse()
        return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')

    def _get_shard_ranges_from_cache(self, req, headers):
        infocache = req.environ.setdefault('swift.infocache', {})
        memcache = cache_from_env(req.environ, True)
        cache_key = get_cache_key(self.account_name,
                                  self.container_name,
                                  shard='listing')

        resp_body = None
        cached_range_dicts = infocache.get(cache_key)
        if cached_range_dicts:
            cache_state = 'infocache_hit'
            resp_body = self._make_shard_ranges_response_body(
                req, cached_range_dicts)
        elif memcache:
            skip_chance = \
                self.app.container_listing_shard_ranges_skip_cache
            if skip_chance and random.random() < skip_chance:
                cache_state = 'skip'
            else:
                try:
                    cached_range_dicts = memcache.get(
                        cache_key, raise_on_error=True)
                    if cached_range_dicts:
                        cache_state = 'hit'
                        resp_body = self._make_shard_ranges_response_body(
                            req, cached_range_dicts)
                    else:
                        cache_state = 'miss'
                except MemcacheConnectionError:
                    cache_state = 'error'

        if resp_body is None:
            resp = None
        else:
            # shard ranges can be returned from cache
            infocache[cache_key] = tuple(cached_range_dicts)
            self.logger.debug('Found %d shards in cache for %s',
                              len(cached_range_dicts), req.path_qs)
            headers.update({'x-backend-record-type': 'shard',
                            'x-backend-cached-results': 'true'})
            # mimic GetOrHeadHandler.get_working_response...
            # note: server sets charset with content_type but proxy
            # GETorHEAD_base does not, so don't set it here either
            resp = Response(request=req, body=resp_body)
            update_headers(resp, headers)
            resp.last_modified = Timestamp(headers['x-put-timestamp']).ceil()
            resp.environ['swift_x_timestamp'] = headers.get('x-timestamp')
            resp.accept_ranges = 'bytes'
            resp.content_type = 'application/json'

        return resp, cache_state

    def _store_shard_ranges_in_cache(self, req, resp):
        # parse shard ranges returned from backend, store them in infocache and
        # memcache, and return a list of dicts
        cache_key = get_cache_key(self.account_name, self.container_name,
                                  shard='listing')
        data = self._parse_listing_response(req, resp)
        backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
        if backend_shard_ranges is None:
            return None

        cached_range_dicts = [dict(sr) for sr in backend_shard_ranges]
        if resp.headers.get('x-backend-sharding-state') == 'sharded':
            # cache in infocache even if no shard ranges returned; this
            # is unexpected but use that result for this request
            infocache = req.environ.setdefault('swift.infocache', {})
            infocache[cache_key] = tuple(cached_range_dicts)
            memcache = cache_from_env(req.environ, True)
            if memcache and cached_range_dicts:
                # cache in memcache only if shard ranges as expected
                self.logger.debug('Caching %d shards for %s',
                                  len(cached_range_dicts), req.path_qs)
                memcache.set(cache_key, cached_range_dicts,
                             time=self.app.recheck_listing_shard_ranges)
        return cached_range_dicts

    def _get_shard_ranges_from_backend(self, req):
        # Make a backend request for shard ranges. The response is cached and
        # then returned as a list of dicts.
        # Note: We instruct the backend server to ignore name constraints in
        # request params if returning shard ranges so that the response can
        # potentially be cached. Only do this if the container state is
        # 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
        # container as they may include the container itself as a 'gap filler'
        # for shard ranges that have not yet cleaved; listings from 'gap
        # filler' shard ranges are likely to become stale as the container
        # continues to cleave objects to its shards and caching them is
        # therefore more likely to result in stale or incomplete listings on
        # subsequent container GETs.
        req.headers['x-backend-override-shard-name-filter'] = 'sharded'
        resp = self._GETorHEAD_from_backend(req)

        sharding_state = resp.headers.get(
            'x-backend-sharding-state', '').lower()
        resp_record_type = resp.headers.get(
            'x-backend-record-type', '').lower()
        complete_listing = config_true_value(resp.headers.pop(
            'x-backend-override-shard-name-filter', False))
        # given that we sent 'x-backend-override-shard-name-filter=sharded' we
        # should only receive back 'x-backend-override-shard-name-filter=true'
        # if the sharding state is 'sharded', but check them both anyway...
        if (resp_record_type == 'shard' and
                sharding_state == 'sharded' and
                complete_listing):
            cached_range_dicts = self._store_shard_ranges_in_cache(req, resp)
            if cached_range_dicts:
                resp.body = self._make_shard_ranges_response_body(
                    req, cached_range_dicts)
        return resp

    def _record_shard_listing_cache_metrics(
            self, cache_state, resp, resp_record_type, info):
        """
        Record a single cache operation by shard listing into its
        corresponding metrics.

        :param  cache_state: the state of this cache operation, includes
                  infocache_hit, memcache hit, miss, error, skip, force_skip
                  and disabled.
        :param  resp: the response from either backend or cache hit.
        :param  resp_record_type: indicates the type of response record, e.g.
                  'shard' for shard range listing, 'object' for object listing.
        :param  info: the cached container info.
        """
        should_record = False
        if is_success(resp.status_int):
            if resp_record_type == 'shard':
                # Here we either got shard ranges by hitting the cache, or we
                # got shard ranges from backend successfully for cache_state
                # other than cache hit. Note: it's possible that later we find
                # that shard ranges can't be parsed.
                should_record = True
        elif (info and is_success(info['status'])
                and info.get('sharding_state') == 'sharded'):
            # The shard listing request failed when getting shard ranges from
            # backend.
            # Note: In the absence of 'info' we cannot assume the container is
            # sharded, so we don't increment the metric if 'info' is None. Even
            # when we have valid info, we can't be sure that the container is
            # sharded, but we assume info was correct and increment the failure
            # metrics.
            should_record = True
        # else:
        #  The request failed, but in the absence of info we cannot assume
        #  the container is sharded, so we don't increment the metric.

        if should_record:
            record_cache_op_metrics(
                self.logger, 'shard_listing', cache_state, resp)

    def _GET_using_cache(self, req, info):
        # It may be possible to fulfil the request from cache: we only reach
        # here if request record_type is 'shard' or 'auto', so if the container
        # state is 'sharded' then look for cached shard ranges. However, if
        # X-Newest is true then we always fetch from the backend servers.
        headers = headers_from_container_info(info)
        if config_true_value(req.headers.get('x-newest', False)):
            cache_state = 'force_skip'
            self.logger.debug(
                'Skipping shard cache lookup (x-newest) for %s', req.path_qs)
        elif (headers and info and is_success(info['status']) and
                info.get('sharding_state') == 'sharded'):
            # container is sharded so we may have the shard ranges cached; only
            # use cached values if all required backend headers available.
            resp, cache_state = self._get_shard_ranges_from_cache(req, headers)
            if resp:
                return resp, cache_state
        else:
            # container metadata didn't support a cache lookup, this could be
            # the case that container metadata was not in cache and we don't
            # know if the container was sharded, or the case that the sharding
            # state in metadata indicates the container was unsharded.
            cache_state = 'bypass'
        # The request was not fulfilled from cache so send to backend server.
        return self._get_shard_ranges_from_backend(req), cache_state

    def GETorHEAD(self, req):
        """Handler for HTTP GET/HEAD requests."""
        ai = self.account_info(self.account_name, req)
        auto_account = self.account_name.startswith(
            self.app.auto_create_account_prefix)
        if not (auto_account or ai[1]):
            if 'swift.authorize' in req.environ:
                aresp = req.environ['swift.authorize'](req)
                if aresp:
                    # Don't cache this. It doesn't reflect the state of the
                    # container, just that the user can't access it.
                    return aresp
            # Don't cache this. The lack of account will be cached, and that
            # is sufficient.
            return HTTPNotFound(request=req)

        # The read-modify-write of params here is because the Request.params
        # getter dynamically generates a dict of params from the query string;
        # the setter must be called for new params to update the query string.
        params = req.params
        params['format'] = 'json'
        # x-backend-record-type may be sent via internal client e.g. from
        # the sharder or in probe tests
        record_type = req.headers.get('X-Backend-Record-Type', '').lower()
        if not record_type:
            record_type = 'auto'
            req.headers['X-Backend-Record-Type'] = 'auto'
            params['states'] = 'listing'
        req.params = params

        memcache = cache_from_env(req.environ, True)
        if (req.method == 'GET'
                and get_param(req, 'states') == 'listing'
                and record_type != 'object'):
            may_get_listing_shards = True
            info = _get_info_from_caches(self.app, req.environ,
                                         self.account_name,
                                         self.container_name)
        else:
            info = None
            may_get_listing_shards = False

        sr_cache_state = None
        if (may_get_listing_shards and
                self.app.recheck_listing_shard_ranges > 0
                and memcache
                and not config_true_value(
                    req.headers.get('x-backend-include-deleted', False))):
            # This GET might be served from cache or might populate cache.
            # 'x-backend-include-deleted' is not usually expected in requests
            # to the proxy (it is used from sharder to container servers) but
            # it is included in the conditions just in case because we don't
            # cache deleted shard ranges.
            resp, sr_cache_state = self._GET_using_cache(req, info)
        else:
            resp = self._GETorHEAD_from_backend(req)
            if may_get_listing_shards and (
                    not self.app.recheck_listing_shard_ranges or not memcache):
                sr_cache_state = 'disabled'

        resp_record_type = resp.headers.get('X-Backend-Record-Type', '')
        if sr_cache_state:
            self._record_shard_listing_cache_metrics(
                sr_cache_state, resp, resp_record_type, info)

        if all((req.method == "GET", record_type == 'auto',
               resp_record_type.lower() == 'shard')):
            resp = self._get_from_shards(req, resp)

        if not config_true_value(
                resp.headers.get('X-Backend-Cached-Results')):
            # Cache container metadata. We just made a request to a storage
            # node and got up-to-date information for the container.
            resp.headers['X-Backend-Recheck-Container-Existence'] = str(
                self.app.recheck_container_existence)
            set_info_cache(self.app, req.environ, self.account_name,
                           self.container_name, resp)
        if 'swift.authorize' in req.environ:
            req.acl = resp.headers.get('x-container-read')
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                # Don't cache this. It doesn't reflect the state of the
                # container, just that the user can't access it.
                return aresp
        if not req.environ.get('swift_owner', False):
            for key in self.app.swift_owner_headers:
                if key in resp.headers:
                    del resp.headers[key]
        # Expose sharding state in reseller requests
        if req.environ.get('reseller_request', False):
            resp.headers['X-Container-Sharding'] = config_true_value(
                resp.headers.get(get_sys_meta_prefix('container') + 'Sharding',
                                 'False'))
        return resp

    def _get_from_shards(self, req, resp):
        # Construct listing using shards described by the response body.
        # The history of containers that have returned shard ranges is
        # maintained in the request environ so that loops can be avoided by
        # forcing an object listing if the same container is visited again.
        # This can happen in at least two scenarios:
        #   1. a container has filled a gap in its shard ranges with a
        #      shard range pointing to itself
        #   2. a root container returns a (stale) shard range pointing to a
        #      shard that has shrunk into the root, in which case the shrunken
        #      shard may return the root's shard range.
        shard_listing_history = req.environ.setdefault(
            'swift.shard_listing_history', [])
        policy_key = 'X-Backend-Storage-Policy-Index'
        if not (shard_listing_history or policy_key in req.headers):
            # We're handling the original request to the root container: set
            # the root policy index in the request, unless it is already set,
            # so that shards will return listings for that policy index.
            # Note: we only get here if the root responded with shard ranges,
            # or if the shard ranges were cached and the cached root container
            # info has sharding_state==sharded; in both cases we can assume
            # that the response is "modern enough" to include
            # 'X-Backend-Storage-Policy-Index'.
            req.headers[policy_key] = resp.headers[policy_key]
        shard_listing_history.append((self.account_name, self.container_name))
        shard_ranges = [ShardRange.from_dict(data)
                        for data in json.loads(resp.body)]
        self.logger.debug('GET listing from %s shards for: %s',
                          len(shard_ranges), req.path_qs)
        if not shard_ranges:
            # can't find ranges or there was a problem getting the ranges. So
            # return what we have.
            return resp

        objects = []
        req_limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
        params = req.params.copy()
        params.pop('states', None)
        req.headers.pop('X-Backend-Record-Type', None)
        reverse = config_true_value(params.get('reverse'))
        marker = wsgi_to_str(params.get('marker'))
        end_marker = wsgi_to_str(params.get('end_marker'))
        prefix = wsgi_to_str(params.get('prefix'))

        limit = req_limit
        all_resp_status = []
        for i, shard_range in enumerate(shard_ranges):
            params['limit'] = limit
            # Always set marker to ensure that object names less than or equal
            # to those already in the listing are not fetched; if the listing
            # is empty then the original request marker, if any, is used. This
            # allows misplaced objects below the expected shard range to be
            # included in the listing.
            last_name = ''
            last_name_was_subdir = False
            if objects:
                last_name_was_subdir = 'subdir' in objects[-1]
                if last_name_was_subdir:
                    last_name = objects[-1]['subdir']
                else:
                    last_name = objects[-1]['name']

                if six.PY2:
                    last_name = last_name.encode('utf8')
                params['marker'] = str_to_wsgi(last_name)
            elif marker:
                params['marker'] = str_to_wsgi(marker)
            else:
                params['marker'] = ''
            # Always set end_marker to ensure that misplaced objects beyond the
            # expected shard range are not fetched. This prevents a misplaced
            # object obscuring correctly placed objects in the next shard
            # range.
            if end_marker and end_marker in shard_range:
                params['end_marker'] = str_to_wsgi(end_marker)
            elif reverse:
                params['end_marker'] = str_to_wsgi(shard_range.lower_str)
            else:
                params['end_marker'] = str_to_wsgi(shard_range.end_marker)

            headers = {}
            if ((shard_range.account, shard_range.container) in
                    shard_listing_history):
                # directed back to same container - force GET of objects
                headers['X-Backend-Record-Type'] = 'object'
            if config_true_value(req.headers.get('x-newest', False)):
                headers['X-Newest'] = 'true'

            if prefix:
                if prefix > shard_range:
                    continue
                try:
                    just_past = prefix[:-1] + chr(ord(prefix[-1]) + 1)
                except ValueError:
                    pass
                else:
                    if just_past < shard_range:
                        continue

            if last_name_was_subdir and str(
                shard_range.lower if reverse else shard_range.upper
            ).startswith(last_name):
                continue

            self.logger.debug(
                'Getting listing part %d from shard %s %s with %s',
                i, shard_range, shard_range.name, headers)
            objs, shard_resp = self._get_container_listing(
                req, shard_range.account, shard_range.container,
                headers=headers, params=params)
            all_resp_status.append(shard_resp.status_int)

            sharding_state = shard_resp.headers.get('x-backend-sharding-state',
                                                    'unknown')

            if objs is None:
                # give up if any non-success response from shard containers
                self.logger.error(
                    'Aborting listing from shards due to bad response: %r'
                    % all_resp_status)
                return HTTPServiceUnavailable(request=req)
            shard_policy = shard_resp.headers.get(
                'X-Backend-Record-Storage-Policy-Index',
                shard_resp.headers[policy_key]
            )
            if shard_policy != req.headers[policy_key]:
                self.logger.error(
                    'Aborting listing from shards due to bad shard policy '
                    'index: %s (expected %s)',
                    shard_policy, req.headers[policy_key])
                return HTTPServiceUnavailable(request=req)
            self.logger.debug(
                'Found %d objects in shard (state=%s), total = %d',
                len(objs), sharding_state, len(objs) + len(objects))

            if not objs:
                # tolerate empty shard containers
                continue

            objects.extend(objs)
            limit -= len(objs)

            if limit <= 0:
                break
            last_name = objects[-1].get('name',
                                        objects[-1].get('subdir', u''))
            if six.PY2:
                last_name = last_name.encode('utf8')
            if end_marker and reverse and end_marker >= last_name:
                break
            if end_marker and not reverse and end_marker <= last_name:
                break

        resp.body = json.dumps(objects).encode('ascii')
        constrained = any(req.params.get(constraint) for constraint in (
            'marker', 'end_marker', 'path', 'prefix', 'delimiter'))
        if not constrained and len(objects) < req_limit:
            self.logger.debug('Setting object count to %s' % len(objects))
            # prefer the actual listing stats over the potentially outdated
            # root stats. This condition is only likely when a sharded
            # container is shrinking or in tests; typically a sharded container
            # will have more than CONTAINER_LISTING_LIMIT objects so any
            # unconstrained listing will be capped by the limit and total
            # object stats cannot therefore be inferred from the listing.
            resp.headers['X-Container-Object-Count'] = len(objects)
            resp.headers['X-Container-Bytes-Used'] = sum(
                [o['bytes'] for o in objects])
        return resp

    @public
    @delay_denial
    @cors_validation
    def GET(self, req):
        """Handler for HTTP GET requests."""
        # early checks for request validity
        validate_container_params(req)
        return self.GETorHEAD(req)

    @public
    @delay_denial
    @cors_validation
    def HEAD(self, req):
        """Handler for HTTP HEAD requests."""
        return self.GETorHEAD(req)

    @public
    @cors_validation
    def PUT(self, req):
        """HTTP PUT request handler."""
        error_response = \
            self.clean_acls(req) or check_metadata(req, 'container')
        if error_response:
            return error_response
        policy_index = self._convert_policy_to_index(req)
        if not req.environ.get('swift_owner'):
            for key in self.app.swift_owner_headers:
                req.headers.pop(key, None)
        if req.environ.get('reseller_request', False) and \
                'X-Container-Sharding' in req.headers:
            req.headers[get_sys_meta_prefix('container') + 'Sharding'] = \
                str(config_true_value(req.headers['X-Container-Sharding']))
        length_limit = self.get_name_length_limit()
        if len(self.container_name) > length_limit:
            body = 'Container name length of %d longer than %d' % (
                len(self.container_name), length_limit)
            resp = HTTPBadRequest(request=req, body=body)
            return resp
        account_partition, accounts, container_count = \
            self.account_info(self.account_name, req)
        if not accounts and self.app.account_autocreate:
            if not self.autocreate_account(req, self.account_name):
                return HTTPServiceUnavailable(request=req)
            account_partition, accounts, container_count = \
                self.account_info(self.account_name, req)
        if not accounts:
            return HTTPNotFound(request=req)
        if 0 < self.app.max_containers_per_account <= container_count and \
                self.account_name not in self.app.max_containers_whitelist:
            container_info = \
                self.container_info(self.account_name, self.container_name,
                                    req)
            if not is_success(container_info.get('status')):
                body = 'Reached container limit of %s' % (
                    self.app.max_containers_per_account, )
                resp = HTTPForbidden(request=req, body=body)
                return resp
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        headers = self._backend_requests(req, len(containers),
                                         account_partition, accounts,
                                         policy_index)
        resp = self.make_requests(
            req, self.app.container_ring,
            container_partition, 'PUT', req.swift_entity_path, headers)
        self._clear_container_info_cache(req)
        return resp

    @public
    @cors_validation
    def POST(self, req):
        """HTTP POST request handler."""
        error_response = \
            self.clean_acls(req) or check_metadata(req, 'container')
        if error_response:
            return error_response
        if not req.environ.get('swift_owner'):
            for key in self.app.swift_owner_headers:
                req.headers.pop(key, None)
        if req.environ.get('reseller_request', False) and \
                'X-Container-Sharding' in req.headers:
            req.headers[get_sys_meta_prefix('container') + 'Sharding'] = \
                str(config_true_value(req.headers['X-Container-Sharding']))
        account_partition, accounts, container_count = \
            self.account_info(self.account_name, req)
        if not accounts:
            return HTTPNotFound(request=req)
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        headers = self.generate_request_headers(req, transfer=True)
        self._clear_container_info_cache(req)
        resp = self.make_requests(
            req, self.app.container_ring, container_partition, 'POST',
            req.swift_entity_path, [headers] * len(containers))
        return resp

    @public
    @cors_validation
    def DELETE(self, req):
        """HTTP DELETE request handler."""
        account_partition, accounts, container_count = \
            self.account_info(self.account_name, req)
        if not accounts:
            return HTTPNotFound(request=req)
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        headers = self._backend_requests(req, len(containers),
                                         account_partition, accounts)
        self._clear_container_info_cache(req)
        resp = self.make_requests(
            req, self.app.container_ring, container_partition, 'DELETE',
            req.swift_entity_path, headers)
        # Indicates no server had the container
        if resp.status_int == HTTP_ACCEPTED:
            return HTTPNotFound(request=req)
        return resp

    @private
    def UPDATE(self, req):
        """HTTP UPDATE request handler.

        Method to perform bulk operations on container DBs,
        similar to a merge_items REPLICATE request.

        Not client facing; internal clients or middlewares must include
        ``X-Backend-Allow-Private-Methods: true`` header to access.
        """
        container_partition, containers = self.app.container_ring.get_nodes(
            self.account_name, self.container_name)
        # Since this isn't client facing, expect callers to supply an index
        policy_index = req.headers['X-Backend-Storage-Policy-Index']
        headers = self._backend_requests(
            req, len(containers), account_partition=None, accounts=[],
            policy_index=policy_index)
        return self.make_requests(
            req, self.app.container_ring, container_partition, 'UPDATE',
            req.swift_entity_path, headers, body=req.body)

    def _backend_requests(self, req, n_outgoing, account_partition, accounts,
                          policy_index=None):
        additional = {'X-Timestamp': Timestamp.now().internal}
        if policy_index is None:
            additional['X-Backend-Storage-Policy-Default'] = \
                int(POLICIES.default)
        else:
            additional['X-Backend-Storage-Policy-Index'] = str(policy_index)
        headers = [self.generate_request_headers(req, transfer=True,
                                                 additional=additional)
                   for _junk in range(n_outgoing)]

        for i, account in enumerate(accounts):
            i = i % len(headers)

            headers[i]['X-Account-Partition'] = account_partition
            headers[i]['X-Account-Host'] = csv_append(
                headers[i].get('X-Account-Host'),
                '%(ip)s:%(port)s' % account)
            headers[i]['X-Account-Device'] = csv_append(
                headers[i].get('X-Account-Device'),
                account['device'])

        return headers