#!/usr/bin/env python
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import re
import datetime
import sys
import string
from optparse import OptionParser
from datetime import datetime, timedelta
import shutil


def showUsage():
    log("./processTests.py [-b|--broker-log-dir] <dir> [-t|--test-dir] <dir>")
    sys.exit(1)


ACCESS = "Access"
MODIFY = "Modify"
BROKER_LOG = "broker.log"
BROKER_PID = "broker.pid"
BROKER_CPU = "broker_cpu.log"
BROKER_CPU_DATED = "broker_cpu.log.dated"
BROKER_STATS = "broker.stats"
BROKER_GC = "gc.log"
GRAPH_DATA = "graph.data"

_verbose = False
_debug = False
_brokerLogs = ""


def exitError(message):
    log(message)
    sys.exit(1)


def main():
    global _log, _verbose, _debug, _brokerLogs

    # Load the command line options
    parser = OptionParser()
    parser.add_option("-v", "--verbose", dest="verbose",
                      action="store_true", default=False,
                      help="enable verbose output")
    parser.add_option("-d", "--debug", dest="debug",
                      action="store_true", default=False,
                      help="enable debug output")
    parser.add_option("-b", "--broker-log-dir", dest="brokerLogs",
                      action="store", default="",
                      help="Broker Logs")
    parser.add_option("-t", "--test-dir", dest="testDir",
                      action="store", default="",
                      help="Test Results")

    (options, args) = parser.parse_args()

    _verbose = options.verbose
    _debug = options.debug
    testDir = options.testDir
    _brokerLogs = options.brokerLogs

    if testDir == "" or _brokerLogs == "":
        log("Broker Log Dir and Test Dir are both required.")
        showUsage()

    if not os.path.exists(testDir):
        exitError("Test directory does not exist:" + testDir)

    if not os.path.exists(_brokerLogs):
        exitError("Broker log directory does not exist:" + _brokerLogs)

    # Standardize the format of the broker logs
    preProcessBrokerLogs(_brokerLogs)

    # Get list of test results from test_dir
    processTestResults(testDir)


#
# Process the log files we know of
#
def preProcessBrokerLogs(resultDir):
    print "Pre Processing Broker Logs"
    # Pre-Process GC - no pre-processing required
    # Process Log4j - no processing required as the file is already time stamped.

    # Pre-Process broker_cpu
    processCPUUsage(resultDir)
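
# Illustrative `top` batch output that processCPUUsage() below has to cope
# with. The two header layouts are the ones the parser's auto-detection
# expects; the process line values are made up for illustration:
#
#     PID USER      PR  NI %CPU    TIME+ %MEM  VIRT  RES  SHR S COMMAND
#   12345 qpid      20   0 42.0  1:02.33  3.1  1.2g 256m  12m S QPBRKR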

#
# Process the broker CPU log file and create an output file of format:
# <datetime> <%CPU> <%MEM>
#
def processCPUUsage(resultDir):
    logfile = resultDir + os.sep + BROKER_CPU
    datedFile = resultDir + os.sep + BROKER_CPU_DATED

    start = extractTime(ACCESS, logfile + ".stat")

    pid = getPID(BROKER_PID)

    topRate = getFirstLine(_brokerLogs + os.sep + "top.rate")

    #
    # Calculate the addition required per process line of output
    #
    if topRate.find(".") == -1:
        seconds = topRate
        millis = 0
    else:
        split = topRate.split('.')
        seconds = split[0]
        # Convert the fractional seconds to milliseconds
        millis = float("0." + split[1]) * 1000

    offset = timedelta(seconds=int(seconds), milliseconds=int(millis))

    #
    # Process the CPU log file and make a file of format:
    # <datetime> <%CPU> <%MEM>
    #

    # Open the CPU log file for reading
    logfile = open(logfile, "r")

    # Keep track of the min/max, sum and entry count.
    minCPU = float(sys.maxint)
    maxCPU = 0.0
    minMem = float(sys.maxint)
    maxMem = 0.0
    entries = 0
    sumCPU = 0.0
    sumMem = 0.0

    # Open the output file, erasing any existing version
    output = open(datedFile, "w")

    for line in logfile:
        #
        # Data format
        #  0   1   2  3    4     5    6    7    8   9  10    11
        # PID USER PR NI  %CPU TIME+ %MEM VIRT RES SHR  S  COMMAND
        # PID USER PR NI  VIRT RES   SHR   S  %CPU %MEM TIME+ COMMAND
        #
        # %CPU and %MEM positions vary, probably based on the os/version
        # of top, so auto-detect where they are from the header line.
        #

        # Index is 0-based for array usage.
        index = 0
        if line.find("PID") != -1:
            for key in line.split(" "):
                strippedKey = key.lstrip()
                if len(strippedKey) > 0:
                    # Record the key index
                    if (strippedKey == "%CPU"):
                        cpuIndex = index
                    if (strippedKey == "%MEM"):
                        memIndex = index
                    # Increase count for next key
                    index = index + 1

        # Find lines that contain our broker process
        if line.find("QPBRKR") != -1:
            # Split line on whitespace
            data = line.split()

            # Write out the date time (ISO-8601 format)
            output.write(str(start))
            # Output the %CPU value
            output.write(" " + str(data[cpuIndex]))
            # Output the %MEM value
            output.write(" " + str(data[memIndex]))
            output.write('\n')

            # Add the offset based on the logging rate
            start = start + offset

            # Record entries
            entries = entries + 1

            #
            # Record Metrics
            #
            # Record CPU data
            cpu = float(data[cpuIndex])
            if (cpu < minCPU):
                minCPU = cpu
            if (cpu > maxCPU):
                maxCPU = cpu
            sumCPU = sumCPU + cpu

            # Record Mem data
            mem = float(data[memIndex])
            if (mem < minMem):
                minMem = mem
            if (mem > maxMem):
                maxMem = mem
            sumMem = sumMem + mem
    # end for

    # Close the files
    logfile.close()
    output.close()

    # Guard against an empty or unmatched log before computing averages
    if entries == 0:
        exitError("No QPBRKR entries found in CPU log: "
                  + resultDir + os.sep + BROKER_CPU)

    # Output stats file
    statFile = resultDir + os.sep + BROKER_CPU + ".stats"
    output = open(statFile, "w")
    output.write("#type:min/max/avg")
    output.write('\n')
    output.write("CPU:" + str(minCPU) + "/" + str(maxCPU) + "/" + str(float(sumCPU) / float(entries)))
    output.write('\n')
    output.write("MEM:" + str(minMem) + "/" + str(maxMem) + "/" + str(float(sumMem) / float(entries)))
    output.write('\n')
    output.close()

    log("Pre Process of CPU Log file '" + BROKER_CPU + "' complete")
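
# The broker_cpu.log.stats file written above takes this shape (the layout
# comes straight from the writes above; the values are illustrative):
#
#   #type:min/max/avg
#   CPU:0.0/42.0/12.5
#   MEM:1.2/3.1/2.4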

#
# Given a known process type, get the recorded PID.
#
def getPID(process):
    return getFirstLine(_brokerLogs + os.sep + process)


#
# Get the first line of the file without EOL chars.
# NOTE: this will load the entire file into memory to do it.
#
def getFirstLine(fileName):
    f = open(fileName, "r")
    line = f.read().splitlines()[0]
    f.close()
    return line


#
# Walk the directory given and process all csv test results
#
def processTestResults(resultDir):
    for root, dirs, files in os.walk(resultDir, topdown=False):
        if len(files) == 0:
            exitError("Test result directory is empty:" + resultDir)
        for file in files:
            if file.endswith(".csv"):
                processTestResult(root, file)


def processTestResult(root, resultFile):
    # Open stat file and extract test times, we determine:
    #  - start time based on the 'Access' value
    #  - end time based on the 'Modify' value; 'Change' would also work

    statFile = root + os.sep + resultFile + ".stat"

    if not os.path.exists(statFile):
        log("Unable to process : Unable to open stat file:" + statFile)
        return

    createResultSetPackage(root, resultFile)


def extractTime(field, statFile):
    stats = open(statFile, "r")
    dt = None
    for line in stats:
        if line.startswith(field):
            if line.find("(") == -1:
                dt = lineToDate(" ".join(line.split()[1:]))
                #
                # TODO We need to handle time zone issues, as I'm sure we
                # will have issues with the log4j matching.
                #
    stats.close()
    return dt


#
# Given a text line in ISO format, convert it to a date object
#
def lineToDate(line):
    # 2009-06-22 17:04:44,320
    # 2009-06-22 17:04:44.320
    pattern = re.compile(r'^(?P<year>[0-9]{4})-(?P<month>[0-9]{2})-(?P<day>[0-9]{2}) '
                         r'(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2}):(?P<seconds>[0-9]{2})')

    m = pattern.match(line)
    if m:
        year = int(m.group('year'))
        month = int(m.group('month'))
        day = int(m.group('day'))
        hour = int(m.group('hour'))
        minute = int(m.group('minute'))
        seconds = int(m.group('seconds'))

        # Match again including the fractional seconds: log4j uses ',' as
        # the separator, stat(1) uses '.'.
        pattern = re.compile(r'^(?P<year>[0-9]{4})-(?P<month>[0-9]{2})-(?P<day>[0-9]{2}) '
                             r'(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2}):(?P<seconds>[0-9]{2})'
                             r'[.,](?P<micro>[0-9]+)')
        m = pattern.match(line)

        micro = 0
        if m:
            # Scale the fractional part to microseconds whatever precision
            # was logged (log4j writes millis, stat writes nanos).
            micro = int(round(float("0." + m.group('micro')) * 1000000))

        return datetime(year, month, day, hour, minute, seconds, micro)
    else:
        # Error: we shouldn't get here
        return None
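
# Illustrative behaviour of lineToDate() (timestamps invented): log4j's
# comma-separated millis and stat(1)'s dot-separated nanos both normalise
# to microseconds:
#
#   lineToDate("2009-06-22 17:04:44,320")       -> datetime(2009, 6, 22, 17, 4, 44, 320000)
#   lineToDate("2009-06-22 17:04:44.320000000") -> datetime(2009, 6, 22, 17, 4, 44, 320000)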

def createResultSetPackage(root, resultFile):
    # Get the name of the test to make a directory with said name
    testName = resultFile.split(".csv")[0]
    resultDir = root + os.sep + testName

    log("Processing Result set for:" + testName)

    mkdir(resultDir)

    # Move result file to new directory
    shutil.move(root + os.sep + resultFile, resultDir)

    # Move stat file to new directory
    shutil.move(root + os.sep + resultFile + ".stat", resultDir)

    statFile = resultDir + os.sep + resultFile + ".stat"

    #
    # Get start and end time for test run
    #
    start = extractTime(ACCESS, statFile)
    end = extractTime(MODIFY, statFile)

    sliceBrokerLogs(resultDir, start, end)
    createGraphData(resultDir, testName)
    createTestStatData(resultDir, testName)

    log("Created Result Package for:" + testName)


def sliceBrokerLogs(resultDir, start, end):
    sliceCPULog(resultDir, start, end)
    sliceLog4j(resultDir, start, end)
    sliceGCLog(resultDir, start, end)


def sliceCPULog(resultDir, start, end):
    global _brokerLogs
    logfilePath = _brokerLogs + os.sep + BROKER_CPU_DATED
    cpuSliceFile = resultDir + os.sep + BROKER_CPU

    # Process the CPU log file and make a file of format:
    # <datetime> <%CPU> <%MEM>

    # Open the dated CPU log file for reading
    logFile = open(logfilePath, "r")

    # Keep track of the min/max, sum and entry count.
    minCPU = float(sys.maxint)
    maxCPU = 0.0
    minMem = float(sys.maxint)
    maxMem = 0.0
    entries = 0
    sumCPU = 0.0
    sumMem = 0.0

    #
    # Create the output file, erasing any existing version
    #
    cpuslice = open(cpuSliceFile, "w")

    for line in logFile:
        data = line.split()

        #
        # //fixme remove tz addition.
        #
        lineTime = lineToDate(" ".join(data[0:2]) + " +0000")

        if lineTime > start:
            if lineTime < end:
                # Write the data through to the new file
                cpuslice.write(line)

                # Perform stat processing for the min/max/avg
                data = line.split()

                #
                # Data format is
                #