summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorankitagrawal <ankit11.agrawal@nttdata.com>2014-09-19 04:46:11 -0700
committerankitagrawal <ankit11.agrawal@nttdata.com>2014-09-24 23:52:46 -0700
commitebeca911fa291e258c2c0b1ef55a26ff5ac009d2 (patch)
treec75294c9329eb3ccf8fed27c6b2f9a14772ab252
parent7684d956476254d4a297e62d5a3debe27c461d7c (diff)
downloadpython-keystoneclient-ebeca911fa291e258c2c0b1ef55a26ff5ac009d2.tar.gz
Redact x-subject-token from response headers
When you invoke any OpenStack API of any of the OpenStack services e.g. glance, neutron, cinder, heat, ceilometer, nova, keystone then it logs readable x-subject-token at the debug log level in the respective log files. Simply redacting the x-subject-token in keystone client response header before logging it. SecurityImpact Closes-Bug: #1371355 Change-Id: Iac16c6358250677544761beea9f5c5d8ba29afac
-rw-r--r--keystoneclient/session.py22
-rw-r--r--keystoneclient/tests/test_session.py15
2 files changed, 25 insertions, 12 deletions
diff --git a/keystoneclient/session.py b/keystoneclient/session.py
index 3833304..a382cc7 100644
--- a/keystoneclient/session.py
+++ b/keystoneclient/session.py
@@ -116,6 +116,15 @@ class Session(object):
if user_agent is not None:
self.user_agent = user_agent
+ @classmethod
+ def process_header(cls, header):
+ """Redacts the secure headers to be logged."""
+ secure_headers = ('authorization', 'x-auth-token',
+ 'x-subject-token',)
+ if header[0].lower() in secure_headers:
+ return (header[0], 'TOKEN_REDACTED')
+ return header
+
@utils.positional()
def _http_log_request(self, url, method=None, data=None,
json=None, headers=None):
@@ -125,13 +134,6 @@ class Session(object):
# debug log.
return
- def process_header(header):
- secure_headers = ('authorization', 'x-auth-token',
- 'x-subject-token',)
- if header[0].lower() in secure_headers:
- return (header[0], 'TOKEN_REDACTED')
- return header
-
string_parts = ['REQ: curl -i']
# NOTE(jamielennox): None means let requests do its default validation
@@ -146,7 +148,8 @@ class Session(object):
if headers:
for header in six.iteritems(headers):
- string_parts.append('-H "%s: %s"' % process_header(header))
+ string_parts.append('-H "%s: %s"'
+ % Session.process_header(header))
if json:
data = jsonutils.dumps(json)
if data:
@@ -175,7 +178,8 @@ class Session(object):
if status_code:
string_parts.append('[%s]' % status_code)
if headers:
- string_parts.append('%s' % headers)
+ for header in six.iteritems(headers):
+ string_parts.append('%s: %s' % Session.process_header(header))
if text:
string_parts.append('\nRESP BODY: %s\n' % text)
diff --git a/keystoneclient/tests/test_session.py b/keystoneclient/tests/test_session.py
index 9a66801..4c5b460 100644
--- a/keystoneclient/tests/test_session.py
+++ b/keystoneclient/tests/test_session.py
@@ -138,6 +138,10 @@ class SessionTests(utils.TestCase):
session.get, self.TEST_URL)
def test_session_debug_output(self):
+ """Test request and response headers in debug logs
+
+ in order to redact secure headers while debug is true.
+ """
session = client_session.Session(verify=False)
headers = {'HEADERA': 'HEADERVALB'}
security_headers = {'Authorization': uuid.uuid4().hex,
@@ -145,10 +149,11 @@ class SessionTests(utils.TestCase):
'X-Subject-Token': uuid.uuid4().hex, }
body = 'BODYRESPONSE'
data = 'BODYDATA'
- self.stub_url('POST', text=body)
all_headers = dict(
itertools.chain(headers.items(), security_headers.items()))
- session.post(self.TEST_URL, headers=all_headers, data=data)
+ self.stub_url('POST', text=body, headers=all_headers)
+ resp = session.post(self.TEST_URL, headers=all_headers, data=data)
+ self.assertEqual(resp.status_code, 200)
self.assertIn('curl', self.logger.output)
self.assertIn('POST', self.logger.output)
@@ -159,8 +164,12 @@ class SessionTests(utils.TestCase):
for k, v in six.iteritems(headers):
self.assertIn(k, self.logger.output)
self.assertIn(v, self.logger.output)
+
+ # Assert that response headers contains actual values and
+ # only debug logs has been masked
for k, v in six.iteritems(security_headers):
- self.assertIn(k, self.logger.output)
+ self.assertIn('%s: TOKEN_REDACTED' % k, self.logger.output)
+ self.assertEqual(v, resp.headers[k])
self.assertNotIn(v, self.logger.output)
def test_connect_retries(self):