#!/usr/bin/env python # Copyright 2020 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Transform CBCM Takeout API Data (Python3).""" from __future__ import print_function from __future__ import unicode_literals import argparse import csv import json import sys import time import google_auth_httplib2 from httplib2 import Http from google.oauth2.service_account import Credentials from builtins import bytes from builtins import str from io import open def ComputeExtensionsList(extensions_list, data): """Computes list of machines that have an extension. This sample function processes the |data| retrieved from the Takeout API and calculates the list of machines that have installed each extension listed in the data. Args: extensions_list: the extension list dictionary to fill. data: the data fetched from the Takeout API. """ for device in data['browsers']: if 'browsers' not in device: continue for browser in device['browsers']: if 'profiles' not in browser: continue for profile in browser['profiles']: if 'extensions' not in profile: continue for extension in profile['extensions']: key = extension['extensionId'] if 'version' in extension: key = key + ' @ ' + extension['version'] if key not in extensions_list: current_extension = { 'name': extension.get('name', ''), 'permissions': extension.get('permissions', ''), 'installed': set(), 'disabled': set(), 'forced': set() } else: current_extension = extensions_list[key] machine_name = device['machineName'] current_extension['installed'].add(machine_name) if extension.get('installType', '') == 'ADMIN': current_extension['forced'].add(machine_name) if extension.get('disabled', False): current_extension['disabled'].add(machine_name) extensions_list[key] = current_extension def DictToList(data, key_name='id'): """Converts a dict into a list. The value of each member of |data| must also be a dict. The original key for the value will be inlined into the value, under the |key_name| key. Args: data: a dict where every value is a dict key_name: the name given to the key that is inlined into the dict's values Yields: The values from |data|, with each value's key inlined into the value. """ assert isinstance(data, dict), '|data| must be a dict' for key, value in data.items(): assert isinstance(value, dict), '|value| must contain dict items' value[key_name] = key yield value def Flatten(data, all_columns): """Flattens lists inside |data|, one level deep. This function will flatten each dictionary key in |data| into a single row so that it can be written to a CSV file. Args: data: the data to be flattened. all_columns: set of all columns that are found in the result (this will be filled by the function). Yields: A list of dict objects whose lists or sets have been flattened. """ SEPARATOR = ', ' # Max length of a cell in Excel is technically 32767 characters but if we get # too close to this limit Excel seems to create weird results when we open # the CSV file. To protect against this, give a little more buffer to the max # characters. MAX_CELL_LENGTH = 32700 for item in data: added_item = {} for prop, value in item.items(): # Non-container properties can be added directly. if not isinstance(value, (list, set)): added_item[prop] = value continue # Otherwise join the container together into a single cell. num_prop = 'num_' + prop added_item[num_prop] = len(value) # For long lists, the cell contents may go over MAX_CELL_LENGTH, so # split the list into chunks that will fit into MAX_CELL_LENGTH. flat_list = SEPARATOR.join(sorted(value)) overflow_prop_index = 0 while True: current_column = prop if overflow_prop_index: current_column = prop + '_' + str(overflow_prop_index) flat_list_len = len(flat_list) if flat_list_len > MAX_CELL_LENGTH: last_separator = flat_list.rfind(SEPARATOR, 0, MAX_CELL_LENGTH - flat_list_len) if last_separator != -1: added_item[current_column] = flat_list[0:last_separator] flat_list = flat_list[last_separator + 2:] overflow_prop_index = overflow_prop_index + 1 continue # Fall-through case where no more splitting is possible, this is the # lass cell to add for this list. added_item[current_column] = flat_list break assert isinstance(added_item[prop], (int, bool, str)), ('unexpected type for item: %s' % type(added_item[prop]).__name__) all_columns.update(added_item.keys()) yield added_item def ExtensionListAsCsv(extensions_list, csv_filename, sort_column='name'): """Saves an extensions list to a CSV file. Args: extensions_list: an extensions list as returned by ComputeExtensionsList csv_filename: the name of the CSV file to save sort_column: the name of the column by which to sort the data """ all_columns = set() flattened_list = list(Flatten(DictToList(extensions_list), all_columns)) desired_column_order = [ 'id', 'name', 'num_permissions', 'num_installed', 'num_disabled', 'num_forced', 'permissions', 'installed', 'disabled', 'forced' ] # Order the columns as desired. Columns other than those in # |desired_column_order| will be in an unspecified order after these columns. ordered_fieldnames = [] for c in desired_column_order: matching_columns = [] for f in all_columns: if f == c or f.startswith(c): matching_columns.append(f) ordered_fieldnames.extend(sorted(matching_columns)) ordered_fieldnames.extend( [x for x in desired_column_order if x not in ordered_fieldnames]) with open(csv_filename, mode='w', newline='', encoding='utf-8') as csv_file: writer = csv.DictWriter(csv_file, fieldnames=ordered_fieldnames) writer.writeheader() for row in sorted(flattened_list, key=lambda ext: ext[sort_column]): writer.writerow(row) def main(args): if not args.admin_email: print('admin_email must be specified.') sys.exit(1) if not args.service_account_key_path: print('service_account_key_path must be specified.') sys.exit(1) # Load the json format key that you downloaded from the Google API # Console when you created your service account. For p12 keys, use the # from_p12_keyfile method of ServiceAccountCredentials and specify the # service account email address, p12 keyfile, and scopes. service_credentials = Credentials.from_service_account_file( args.service_account_key_path, scopes=[ 'https://www.googleapis.com/auth/admin.directory.device.chromebrowsers.readonly' ], subject=args.admin_email) try: http = google_auth_httplib2.AuthorizedHttp(service_credentials, http=Http()) extensions_list = {} base_request_url = 'https://admin.googleapis.com/admin/directory/v1.1beta1/customer/my_customer/devices/chromebrowsers' request_parameters = '' browsers_processed = 0 while True: print('Making request to server ...') retrycount = 0 while retrycount < 5: response = http.request(base_request_url + '?' + request_parameters, 'GET')[1] if isinstance(response, bytes): response = response.decode('utf-8') data = json.loads(response) if 'browsers' not in data: print('Response error, retrying...') time.sleep(3) retrycount += 1 else: break browsers_in_data = len(data['browsers']) print('Request returned %s results, analyzing ...' % (browsers_in_data)) ComputeExtensionsList(extensions_list, data) browsers_processed += browsers_in_data if 'nextPageToken' not in data or not data['nextPageToken']: break print('%s browsers processed.' % (browsers_processed)) if (args.max_browsers_to_process is not None and args.max_browsers_to_process <= browsers_processed): print('Stopping at %s browsers processed.' % (browsers_processed)) break request_parameters = ('pageToken={}').format(data['nextPageToken']) finally: print('Analyze results ...') ExtensionListAsCsv(extensions_list, args.extension_list_csv) print("Results written to '%s'" % (args.extension_list_csv)) if __name__ == '__main__': parser = argparse.ArgumentParser(description='CBCM Extension Analyzer') parser.add_argument( '-k', '--service_account_key_path', metavar='FILENAME', required=True, help='The service account key file used to make API requests.') parser.add_argument( '-a', '--admin_email', required=True, help='The admin user used to make the API requests.') parser.add_argument( '-x', '--extension_list_csv', metavar='FILENAME', default='./extension_list.csv', help='Generate an extension list to the specified CSV ' 'file') parser.add_argument( '-m', '--max_browsers_to_process', type=int, help='Maximum number of browsers to process. (Must be > 0).') args = parser.parse_args() if (args.max_browsers_to_process is not None and args.max_browsers_to_process <= 0): print('max_browsers_to_process must be > 0.') parser.print_help() sys.exit(1) main(args)