#!/usr/bin/env python
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
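#
# processTests.py
#
# Post-processes broker performance test results: standardizes broker log
# timestamps, packages each test's csv/stat results into its own
# directory, slices the broker CPU/GC/log4j logs down to each test's run
# window and records min/max/avg CPU and memory statistics.
#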
import os
import re
import sys
import string
import shutil
from optparse import OptionParser
from datetime import datetime, timedelta
def showUsage():
    log("./processTests.py [-b|--broker-log-dir] <dir> [-t|--test-dir] <dir>")
ACCESS="Access"
MODIFY="Modify"
BROKER_LOG="broker.log"
BROKER_PID="broker.pid"
BROKER_CPU="broker_cpu.log"
BROKER_CPU_DATED="broker_cpu.log.dated"
BROKER_STATS="broker.stats"
BROKER_GC="gc.log"
GRAPH_DATA="graph.data"
_verbose = False
_debug = False
_brokerLogs = ""
def exitError(message):
log(message)
sys.exit(1)
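#
# Parse the command line options, validate the broker log and test
# directories, then pre-process the broker logs and process each test
# result found under the test directory.
#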
def main():
global _log, _verbose, _debug, _brokerLogs
    # Load the command line options
parser = OptionParser()
parser.add_option("-v", "--verbose", dest="verbose",
action="store_true", default=False, help="enable verbose output")
parser.add_option("-d", "--debug", dest="debug",
action="store_true", default=False, help="enable debug output")
parser.add_option("-b", "--broker-log-dir", dest="brokerLogs",
action="store", default=True, help="Broker Logs")
parser.add_option("-t", "--test-dir", dest="testDir",
action="store", default="", help="Test Results")
(options, args) = parser.parse_args()
_verbose = options.verbose
_debug = options.debug
testDir = options.testDir
_brokerLogs = options.brokerLogs
    if testDir == "" or _brokerLogs == "":
        log("Broker Log Dir and Test Dir are both required.")
        showUsage()
        sys.exit(1)
if not os.path.exists(testDir):
exitError("Test directory does not exist:" + testDir)
if not os.path.exists(_brokerLogs):
exitError("Broker log directory does not exist:" + _brokerLogs)
# Standardize the format of the broker logs
preProcessBrokerLogs(_brokerLogs)
# Get list of test results from test_dir
processTestResults(testDir)
#
# Process the log files we know of
#
def preProcessBrokerLogs(resultDir):
print "Pre Processing Broker Logs"
    # Pre-Process GC - no pre-processing required.
    # Process Log4j - no processing required, as the file is already time stamped.
# Pre-Process broker_cpu
processCPUUsage(resultDir)
#
# Process the broker CPU log file and create an output file of format:
# <datetime> <%CPU> <%MEM>
#
def processCPUUsage(resultDir):
logfile=resultDir+os.sep+BROKER_CPU
datedFile=resultDir+os.sep+BROKER_CPU_DATED
start = extractTime(ACCESS, logfile+".stat")
pid = getPID(BROKER_PID)
topRate = getFirstLine(_brokerLogs+os.sep+"top.rate")
#
    # Calculate the time offset to add per line of process output
#
if topRate.find(".") == -1:
seconds = topRate
millis = 0
else:
split = topRate.split('.')
seconds = split[0]
        # Convert the fractional part to milliseconds
millis = float("0."+split[1]) * 1000
offset = timedelta(seconds=int(seconds),milliseconds=int(millis))
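    # e.g. a top.rate of "2.5" (a 2.5 second sample interval) gives
    # offset = timedelta(seconds=2, milliseconds=500)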
#
    # Process the CPU log file and make a file of format:
    # <datetime> <%CPU> <%MEM>
#
# Open log CPU file for reading
logfile = open(logfile, "r")
# Open the output file, erasing any existing version
    # Keep track of the min/max, sum and entry count.
minCPU=float(sys.maxint)
maxCPU=0.0
minMem=float(sys.maxint)
maxMem=0.0
entries=0
sumCPU=0.0
sumMem=0.0
output= open(datedFile, "w")
for line in logfile:
#
# Data format
# 0 1 2 3 4 5 6 7 8 9 10 11
# PID USER PR NI %CPU TIME+ %MEM VIRT RES SHR S COMMAND
# PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
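        # An illustrative data line for the second layout (values invented):
        #  2342 qpid  20   0 4321m 321m  12m S 42.0  8.1  1:23.45 QPBRKR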
#
        # The %CPU and %MEM column positions vary, probably based on the
        # OS/version of top, so auto-detect where they are.
#
# Index is 0 based for array usage.
index = 0
if line.find("PID") != -1:
for key in line.split(" "):
strippedKey = key.lstrip()
if len(strippedKey) > 0:
# Record the key index
if (strippedKey == "%CPU"):
cpuIndex=index
if (strippedKey == "%MEM"):
memIndex=index
# Increase count for next key
index = index + 1
# Find lines that contain our broker process
if line.find("QPBRKR") != -1:
# Split line on whitespace
data = line.split()
            # Write out the date time (ISO-8601 format)
output.write(str(start))
# Output the %CPU value
output.write(" "+str(data[cpuIndex]))
# Output the %MEM value
output.write(" "+str(data[memIndex]))
output.write('\n')
# Add the offset based on the logging rate
start = start + offset
            # Record entries
entries = entries + 1
# Record Metrics
# Record CPU data
cpu = float(data[cpuIndex])
if (cpu < minCPU):
minCPU = cpu
if (cpu > maxCPU):
maxCPU = cpu
sumCPU = sumCPU + cpu
# Record Mem data
mem = float(data[memIndex])
if (mem < minMem):
minMem = mem
if (mem > maxMem):
maxMem = mem
sumMem = sumMem + mem
#end for
# Close the files
    logfile.close()
    output.close()
# Output stats file
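    # The stats file is three lines, e.g. (values illustrative):
    #   #type:min/max/avg
    #   CPU:0.5/98.2/45.1
    #   MEM:1.2/8.4/7.9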
statFile=resultDir+os.sep+BROKER_CPU+".stats"
output= open(statFile, "w")
output.write("#type:min/max/avg")
output.write('\n')
output.write("CPU:"+str(minCPU)+"/"+str(maxCPU)+"/"+str(float(sumCPU)/float(entries)))
output.write('\n')
output.write("MEM:"+str(minMem)+"/"+str(maxMem)+"/"+str(float(sumMem)/float(entries)))
output.write('\n')
    output.close()
log("Pre Process of CPU Log file '"+BROKER_CPU+"' complete")
#
# Given a known process type, get the recorded PID.
#
def getPID(process):
return getFirstLine(_brokerLogs+os.sep+process)
#
# Get the first line of the file without EOL chars.
# NOTE: this will load the entire file into memory to do it.
#
def getFirstLine(fileName):
f = open(fileName,"r")
line = f.read().splitlines()[0]
    f.close()
return line
#
# Walk the directory given and process all csv test results
#
def processTestResults(resultDir):
for root, dirs, files in os.walk(resultDir, topdown=False):
if len(files) == 0:
exitError("Test result directory is empty:" + resultDir)
for file in files:
if file.endswith(".csv"):
                processTestResult(root, file)
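#
# Process a single csv test result; the result can only be processed if
# its accompanying .stat file is present.
#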
def processTestResult(root, resultFile):
    # Open the stat file and extract the test times. We determine:
    #  - the start time from the 'Access' value
    #  - the end time from the 'Modify' value ('Change' would also work)
statFile=root+os.sep+resultFile+".stat"
if not os.path.exists(statFile):
log("Unable to process : Unable to open stat file:" + statFile)
return
createResultSetPackage(root, resultFile)
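#
# Extract the datetime recorded against the given field ('Access' or
# 'Modify') in the stat file.
#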
def extractTime(field, statFile):
    stats = open(statFile, "r")
    dt = None
    for line in stats:
        if line.startswith(field):
            if line.find("(") == -1:
                dt = lineToDate(" ".join(line.split()[1:]))
                #
                # TODO We need to handle time zone issues, as I'm sure we
                # will have issues with the log4j matching.
    stats.close()
    return dt
#
# Given a text line in ISO format convert it to a date object
#
def lineToDate(line):
#2009-06-22 17:04:44,320
#2009-06-22 17:04:44.320
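    # As written, the fractional digits are passed straight through as
    # microseconds, e.g. "2009-06-22 17:04:44,320" ->
    # datetime(2009, 6, 22, 17, 4, 44, 320)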
    pattern = re.compile(r'(?P<year>^[0-9][0-9][0-9][0-9])-(?P<month>[0-9][0-9])-(?P<day>[0-9][0-9]) (?P<hour>[0-9][0-9]):(?P<minute>[0-9][0-9]):(?P<seconds>[0-9][0-9])')
m = pattern.match(line)
if m:
year = int(m.group('year'))
month = int(m.group('month'))
day = int(m.group('day'))
hour = int(m.group('hour'))
minute = int(m.group('minute'))
seconds = int(m.group('seconds'))
        pattern = re.compile(r'(?P<year>^[0-9][0-9][0-9][0-9])-(?P<month>[0-9][0-9])-(?P<day>[0-9][0-9]) (?P<hour>[0-9][0-9]):(?P<minute>[0-9][0-9]):(?P<seconds>[0-9][0-9])[.,](?P<micro>[0-9]+)')
m = pattern.match(line)
micro = None
if m:
micro = m.group('micro')
if micro == None:
micro = 0
# Correct issue where micros are actually nanos
if int(micro) > 999999:
micro = int(micro) / 1000
return datetime(year,month,day,hour,minute,seconds,int(micro))
    else:
        # Error, we shouldn't get here
        return None
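#
# Create a directory named after the test, move the csv and stat files
# into it, slice the broker logs to the test's start/end window and
# generate the graph and stat data for the test.
#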
def createResultSetPackage(root, resultFile):
# Get the Name of the test to make a directory with said name
testName = resultFile.split(".csv")[0]
resultDir = root+ os.sep + testName
log("Processing Result set for:"+ testName)
mkdir(resultDir)
# Move result file to new directory
shutil.move(root + os.sep + resultFile, resultDir)
# Move stat file to new directory
shutil.move(root + os.sep + resultFile + ".stat", resultDir)
statFile=resultDir + os.sep + resultFile + ".stat"
#
# Get start and end time for test run
#
start = extractTime(ACCESS, statFile)
end = extractTime(MODIFY, statFile)
sliceBrokerLogs(resultDir, start, end)
createGraphData(resultDir, testName)
createTestStatData(resultDir, testName)
log("Created Result Package for:"+ testName)
def sliceBrokerLogs(resultDir, start, end):
sliceCPULog(resultDir, start, end)
sliceLog4j(resultDir, start, end)
sliceGCLog(resultDir, start, end)
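#
# Slice the dated CPU log to the given window, writing the matching lines
# to the result directory and accumulating min/max/avg CPU and memory
# stats for the slice.
#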
def sliceCPULog(resultDir, start, end):
global _brokerLogs
logfilePath=_brokerLogs+os.sep+BROKER_CPU_DATED
cpuSliceFile=resultDir+os.sep+BROKER_CPU
    # Process the CPU log file and make a file of format:
    # <datetime> <%CPU> <%MEM>
#
# Open log CPU file for reading
logFile = open(logfilePath, "r")
# Open the output file, erasing any existing version
    # Keep track of the min/max, sum and entry count.
minCPU=float(sys.maxint)
maxCPU=0.0
minMem=float(sys.maxint)
maxMem=0.0
entries=0
sumCPU=0.0
sumMem=0.0
#
    # Create the output file
#
cpuslice = open(cpuSliceFile,"w")
for line in logFile:
data = line.split()
#
        # FIXME: remove the hard-coded time zone addition.
#
lineTime = lineToDate(" ".join(data[0:2])+" +0000")
if lineTime > start:
if lineTime < end:
                # Write the data through to the new file
                cpuslice.write(line)
# Perform stat processing for the min/max/avg
data = line.split()
#
                # Data format is:
                # <date> <time> <%CPU> <%MEM>