# Source code for preparenovonix.novonix_clean
import sys
import preparenovonix.novonix_variables as nv
from preparenovonix.novonix_io import replace_file
from preparenovonix.novonix_io import icolumn
# Marker found on the line that opens each test in a Novonix data file.
# The functions in this module search for it to count the tests and to
# locate where each one starts.
summary = "[Summary]"
def count_tests(infile):
    """
    Count the tests stored in a Novonix data file.

    Every test in the file is opened by a line containing the
    "[Summary]" marker, so the number of such lines is the number
    of tests.

    Parameters
    -----------
    infile : string
        Name of the input Novonix file

    Returns
    -------
    ntests : int
        Number of lines containing the marker, i.e. number of tests
        found in the file

    Examples
    ---------
    >>> from preparenovonix.novonix_clean import count_tests
    >>> count_tests('example_data/example_data.csv')
    2
    """
    with open(infile, "r") as ff:
        # A blank line can never contain the marker, so no separate
        # check for empty lines is needed.
        return sum(1 for row in ff if summary in row)
def header_data_columns(head_line, data_cols, header):
    """
    Append the column-name line to ``header``, padding it when the
    data has more columns than the header names.

    The number of comma-separated names in ``head_line`` is compared
    with the number of entries in ``data_cols``:

    * more data columns than names: dummy column names
      (``dum0``, ``dum1``, ...) are added to the right-stripped
      ``head_line``, followed by ``" \\n"``, and the result is appended;
    * fewer data columns than names: the program stops;
    * same number: ``head_line`` is appended unchanged.

    Parameters
    -----------
    head_line : string
        Header line with column names.
    data_cols : sequence
        Columns of the first line with data (only its length is used).
    header : list of strings
        Header. It is modified in place: one line is appended.

    Returns
    -------
    None

    Raises
    ------
    SystemExit
        If there are fewer data columns than header names.

    Examples
    ---------
    >>> from preparenovonix.novonix_clean import header_data_columns
    >>> header = ['# Example header']
    >>> header_data_columns("a,b",[1,2,3],header)
    >>> header[-1]
    'a,b,dum0 \\n'
    """
    # Difference between columns in the data and names in the header
    diff = len(data_cols) - len(head_line.split(","))

    if diff < 0:
        sys.exit(
            "STOP novonix_clean.header_data_columns \n"
            "REASON less data columns than header names \n"
        )

    if diff > 0:
        # One dummy name per unnamed data column
        dums = "".join(",dum" + str(idiff) for idiff in range(diff))
        header.append(head_line.rstrip() + dums + " \n")
    else:
        header.append(head_line)
def capacity_failed_tests(icapacity, ntests, infile):
    """
    Add up the last capacity value of each test that precedes the
    last one in a Novonix data file.

    The file is scanned for lines containing the "[Summary]" marker.
    Each time a marker after the first is met, the capacity column of
    the previous non-blank line (the final row of the preceding test)
    is added to the total. The scan stops at the marker of test
    number ``ntests``.

    Parameters
    -----------
    icapacity : int
        Column position for the capacity
    ntests : int
        Number of tests in the file
    infile : string
        Input file

    Returns
    -------
    last_capacity : float
        Sum of the last capacity measurements for each test
        before the last one in the file (0.0 if ``ntests`` is
        not larger than 1).

    Examples
    ---------
    >>> import preparenovonix.novonix_variables as nv
    >>> from preparenovonix.novonix_io import icolumn
    >>> from preparenovonix.novonix_clean import capacity_failed_tests
    >>> infile = "example_data/example_data.csv"
    >>> capacity_failed_tests(icolumn(infile, nv.col_c), 2, infile)
    0.4956497995
    """
    total = 0.0
    if ntests <= 1:
        # A single test means there is nothing to carry over
        return total

    seen = 0
    previous = " "
    with open(infile, "r") as ff:
        for row in ff:
            if not row.strip():
                continue  # Jump empty lines
            if summary in row:
                seen += 1
                if seen > 1:
                    # 'previous' is the last row of the test that just ended
                    total += float(previous.split(",")[icapacity])
                if seen == ntests:
                    break
            previous = row
    return total
def _shift_capacity(line, icapacity, offset):
    """Return the data row ``line`` with ``offset`` added to its capacity field.

    The line terminator is split off before the fields are separated and
    put back afterwards, so the row keeps its end of line even when the
    capacity is the last column.
    """
    body = line.rstrip("\r\n")
    ending = line[len(body):]
    fields = body.split(",")
    fields[icapacity] = str(float(fields[icapacity]) + offset)
    return ",".join(fields) + ending


def cleannovonix(infile):
    """
    Given a Novonix file remove blank lines, correct the header,
    remove failed tests if needed.

    Only the last test in the file (the last "[Summary]" section) is
    kept. Its header is copied without blank lines, with every
    "[...]" section line normalised to ``"[Name] \\n"``, and with dummy
    column names added if the data has more columns than the header.
    Data rows whose run time is smaller than that of the previously
    kept row are dropped, and blank data rows are skipped. If the file
    contains more than one test, the summed last capacity of the
    earlier tests is added to the capacity column of every kept row.

    The result is written to ``tmp.csv`` in the current working
    directory and then handed to ``replace_file`` together with
    ``infile``.

    Parameters
    -----------
    infile : string
        Name of the input Novonix file

    Notes
    -----
    This code returns a cleaned Novonix file: nothing is returned to
    the caller, the input file is replaced through ``replace_file``.
    The program stops (``SystemExit``) if the first data row has fewer
    columns than the header names.

    Examples
    ---------
    >>> from preparenovonix.novonix_clean import cleannovonix
    >>> cleannovonix('example_data/example_data.csv')
    """
    # Count the number of tests
    ntests = count_tests(infile)

    # Find the capacity and the run time columns
    icapacity = icolumn(infile, nv.col_c)
    iruntime = icolumn(infile, nv.col_t)

    # Capacity accumulated by the tests before the last one
    last_capacity = capacity_failed_tests(icapacity, ntests, infile)

    header = []
    tmp_file = "tmp.csv"
    with open(infile, "r") as ff:
        # Move to the "[Summary]" line that starts the last test
        itest = 0
        for line in ff:
            if summary in line:
                itest += 1
                if itest == ntests:
                    header.append(summary + " \n")
                    break

        # Copy the header, without blank lines, up to the [Data] line
        for line in ff:
            stripped = line.strip()
            if not stripped:
                continue
            if stripped[0] == "[":
                # Keep only the section name, dropping what follows "]"
                cleanhead = line.split("]")
                header.append(cleanhead[0] + "] \n")
                if cleanhead[0] == "[Data":
                    break
            else:
                header.append(line)

        # The first non-blank line after [Data] holds the column names
        for line in ff:
            if line.strip():
                break

        # The next non-blank line is the first data row
        line_data1 = ""
        for candidate in ff:
            if candidate.strip():
                line_data1 = candidate
                break

        # Check that the number of data columns matches the header;
        # this appends the (possibly extended) column-name line to header
        data = line_data1.split(",")
        header_data_columns(line, data, header)

        # Write the temporary file: header first, then the data
        with open(tmp_file, "w") as tf:
            tf.writelines(header)

            # The first data row is always kept; its run time is not
            # used as the starting point of the monotonic-time check.
            if ntests > 1:
                tf.write(_shift_capacity(line_data1, icapacity, last_capacity))
            else:
                tf.write(line_data1)

            # Rest of the data, jumping any line with time going backwards
            last_t = -1.0
            for line in ff:
                if not line.strip():
                    continue  # Jump empty lines
                runtime = float(line.split(",")[iruntime])
                if runtime < last_t:
                    continue
                last_t = runtime
                if ntests > 1:
                    # Carry over the capacity of the failed tests
                    tf.write(_shift_capacity(line, icapacity, last_capacity))
                else:
                    tf.write(line)

    # Replace the input file with the new one
    replace_file(tmp_file, infile, newbigger=False)