-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
392 lines (310 loc) · 14.3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import re
from pathlib import Path
from ast import literal_eval
import warnings
# Reference point for the run-time report printed at the very end of the script
start_time = time.perf_counter()

# Load the tables written by parse_tribometer.py into the ParsedFiles folder.
# Avg_Friction is stored as the text of a Python list, so it is parsed back with literal_eval.
master = pd.read_csv('ParsedFiles/Master.csv', converters={"Avg_Friction": literal_eval})
valid_master = pd.read_csv('ParsedFiles/ValidMaster.csv', converters={"Avg_Friction": literal_eval})
# TODO Rename first column to 'index'

# ------------------
# RUN CONFIGURATION
# ------------------
# The plotting modes below are checked in this order and only the first enabled one runs:
# plot_all -> plot_some -> plot_by_oa -> average_all_tests

# Plot every valid test in the master table?
plot_all = False

# Write the friction data out as .csv files?
export_csv = True
# When exporting, restrict the export to the tests flagged as having the median wear scar diameter?
export_median_only = False

# Plot only the tests listed in plot_test_list?
plot_some = False
plot_test_list = ['ValidTribometerLogsMay16/Sample 12 OA-10 - May 7/PAO4_OA-10_20N_20mms_test11_May13']
# Earlier selections, kept for reference:
# plot_test_list = ['TribometerLogs/Sample 9 OA-1 - Feb 19/PAO4+OA-1_20N_20mms_test3_Feb19',
#                   'TribometerLogs/Sample 9 OA-1 - Feb 19/PAO4+OA-1_20N_20mms_test4_Feb19',
#                   'TribometerLogs/Sample 9 OA-1 - Feb 19/PAO4+OA-1_20N_100mms_test6_Feb19']

# Plot all tests separately by OA concentration? (unaveraged)
plot_by_oa = False

# Average all tests that share a combination of OA concentration and force?
average_all_tests = False

# Parameter grid used to split the valid tests into groups
oa_conc_values = [0, 1, 10, 20]
force_values = [10, 20]

# One DataFrame subset of valid_master per (OAConc, Force) pair, keyed as "oa<conc>_force<force>"
oa_groups = {
    f"oa{oa_conc}_force{force}": valid_master.loc[
        (valid_master['OAConc'] == oa_conc) & (valid_master['Force'] == force)
    ]
    for oa_conc in oa_conc_values
    for force in force_values
}

# TODO IMPLEMENT
# For now this is only an alias of oa_groups (same dictionary, same DataFrames)
test_averages = oa_groups
def friction_avg(friction_column: pd.Series):
    """
    Calculate the average friction given many series
    Computes the average index-wise

    Series that are shorter than the longest one are extended by repeating their last value, which is the
    same padding convention the rest of this script uses before averaging.

    :param friction_column: A column of a dataframe containing the friction series data (one list-like of
        friction values per test)
    :return param_avg_friction: A numpy array containing the averaged values in each index; an empty array
        if friction_column has no rows
    :raises ValueError: if some (but not all) of the series are empty, since there is no last value to pad with
    """
    rows = [list(series) for series in friction_column]

    # Nothing to average: return an empty array rather than NaN plus a "mean of empty slice" RuntimeWarning
    if not rows:
        return np.array([], dtype=float)

    longest = max(len(row) for row in rows)
    if longest and any(len(row) == 0 for row in rows):
        raise ValueError('Cannot average friction data: at least one friction series is empty')

    # Bring every series to the same length so the rows form a rectangular array
    padded = [row + [row[-1]] * (longest - len(row)) for row in rows]

    param_avg_friction = np.mean(padded, axis=0)
    return param_avg_friction
# Find average friction across tests for a given combination of OA/force parameters
for one_param_data in test_averages.values():
    # A parameter combination with no tests has nothing to pad or average
    if one_param_data.empty:
        continue

    # Longest test (in cycles) of this group; it is the same for every row, so compute it once
    max_cycles = one_param_data['Cycles'].max()

    # The lists stored in the Avg_Friction column are modified in place
    for friction_data in one_param_data['Avg_Friction']:
        if len(friction_data) == 0:
            # No last element to repeat, so this series cannot be padded
            warnings.warn('Empty friction series found; it was left unpadded')
            continue

        # For friction series with a length less than the others, append the last element so all lists are
        # equal length for averaging
        last_value = friction_data[-1]
        while len(friction_data) < max_cycles:
            friction_data.append(last_value)

    # Find the average across all tests for a given combination of parameters
    # NOTE(review): the result is not stored anywhere yet (see the "TODO IMPLEMENT" above test_averages)
    friction_column = one_param_data['Avg_Friction']
    test_avg_friction = friction_avg(friction_column)
# ----------------------------------
# PLOTTING FRICTION COEFFICIENT DATA
# ----------------------------------
# def rolling_avg_plotter(friction_test: pd.DataFrame):
# """
# Plots the estimated friction coefficient graph for a test using a rolling average
# Saves the plot as a .png file
# :param friction_test: The tests to be analyzed
# :return: None
# """
#
# # Calculate the moving average
# window_size = 500
#
# # Plot rolling values against indices
# plt.plot(friction_test['Tx'].index,
# friction_test['Tx'].rolling(window=window_size).mean(),
# label=f'Moving Average (Window={window_size})',
# linestyle='--')
#
# plt.ylim(0, 0.2)
# plt.title('Estimated Friction Coefficient - Moving Average (500)')
# plt.xlabel('Count')
# plt.ylabel('Estimated Friction Coefficient')
# plt.legend()
#
# plot_folder_name = 'Friction Coefficient Graphs/'
# subfolder_name = 'OA_' + str(friction_test['OAConc']) + '/'
# # TODO Create subfolders if they don't already exist
#
# plot_name = plot_folder_name + \
# subfolder_name + \
# 'PAO4+OA-' + \
# str(friction_test['OAConc']) + '%OA_' + \
# str(friction_test['Force']) + 'N_' + \
# str(friction_test['Speed']) + 'mms_Test' + \
# str(friction_test['TestNo']) + '_' + \
# str(friction_test['Date'])
# plt.savefig(plot_name + '.png', dpi=1000)
# plt.clf()
#
# return None
def _individual_plot_names(friction_test: pd.Series):
    """
    Build the output subfolder and file name (without extension) for the plot of one test
    :param friction_test: The test to be named; reads SampleNo, OAConc, Force, Speed and TestNo
    :return: A tuple (subfolder_name, test_name); subfolder_name ends with '/'
    """
    sample_no = friction_test['SampleNo']
    run_suffix = f'{friction_test["Force"]}N_{friction_test["Speed"]}mms_Test{friction_test["TestNo"]}'

    # For oil tests
    if sample_no in range(10, 15):
        oa_conc = friction_test['OAConc']
        return f'OA_{oa_conc}/', f'PAO4+OA{oa_conc}_{run_suffix}'

    # For Jackson's greases
    grease_names = {
        15: 'C20A-20',
        16: 'C20A-20_OA-2',
        17: 'TOCN-10_C20A-1_OA-7.5',
    }
    for grease_sample_no, grease_name in grease_names.items():
        if sample_no == grease_sample_no:
            return f'{grease_name}/', f'{grease_name}_{run_suffix}'

    # Any other sample used to leave the file name undefined and crash; give it a generic location instead
    warnings.warn(f'Unknown sample no. {sample_no}; saving the plot under a generic sample folder.')
    return f'Sample_{sample_no}/', f'Sample{sample_no}_{run_suffix}'


def cycle_avg_plotter(friction_test: pd.Series):
    """
    Plots the estimated friction coefficient graph for a test after averaging all values for each semi-cycle
    Saves the plot as a .png file under 'Friction Coefficient Graphs/Individual/' and clears the figure
    :param friction_test: The test to be analyzed (one row of the master table); reads Avg_Friction, Speed,
        Force, OAConc, TestNo and SampleNo
    :return: None
    """
    friction_values = friction_test['Avg_Friction']
    speed = friction_test['Speed']

    # Position on the viridis colour map (float in 0..1), chosen by sliding speed
    if speed in [10, 20]:
        color = 0.1
    elif speed == 100:
        color = 0.6
    else:
        print(f'Unknown speed for test no. {friction_test["TestNo"]}, sample no. {friction_test["SampleNo"]}.'
              f' Mapping with color = 1 by default.')
        # Must be the float 1.0: an int is interpreted by the colour map as a lookup-table index
        color = 1.0

    # Colour map to be used for graphs
    viridis = mpl.colormaps['viridis']

    # Plot friction coefficient graph, one point per cycle
    plt.plot(range(len(friction_values)),
             friction_values,
             linestyle='solid',
             linewidth=0.6,
             color=viridis(color),
             alpha=0.5,
             label=str(speed) + 'mm/s, ' + str(friction_test['Force']) + 'N, Test No. ' +
                   str(int(friction_test['TestNo']) + 1) + ', Sample No. ' + str(friction_test['SampleNo']))

    plt.ylim(0, 0.35)
    plt.title('Estimated Friction Coefficient Averaged \n Per Cycle for ' +
              str(friction_test['OAConc']) + '% OA, ' +
              str(friction_test['Force']) + 'N, and ' +
              str(speed) + 'mm/s')
    plt.xlabel('Cycle Number')
    plt.ylabel('Average Estimated Friction Coefficient')
    plt.legend(fontsize=6)
    plt.viridis()

    plot_folder_name = 'Friction Coefficient Graphs/Individual/'
    subfolder_name, test_name = _individual_plot_names(friction_test)

    # savefig does not create missing folders
    Path(plot_folder_name + subfolder_name).mkdir(parents=True, exist_ok=True)

    # TODO Eliminate TestNo and Date if friction_dataframe has only one row
    plot_name = plot_folder_name + subfolder_name + test_name + '.png'
    plt.savefig(plot_name, dpi=1000)

    # Clear the figure so the next test starts from an empty plot
    plt.clf()
def cycle_avg_plotter_by_oa(friction_dataframe: pd.DataFrame):
    """
    Plots the estimated friction coefficient graphs of all tests in friction_dataframe on one figure,
    one line per row, labelled by OA concentration and force.
    Saves the plot as a .png file and clears the figure
    :param friction_dataframe: The tests to be analyzed; reads Avg_Friction, OAConc, Force and Speed.
        All rows are expected to share one speed (a message is printed if they do not)
    :return: None
    """
    # Nothing to draw: the title and file name below need at least one row
    if friction_dataframe.empty:
        print('No tests to plot')
        return None

    # Specifying colors for different plots
    tab10 = mpl.colormaps['tab10']
    color_count = 10

    for index, (_, test) in enumerate(friction_dataframe.iterrows()):
        print("Plotting test no. ", index + 1)

        friction_values = test['Avg_Friction']

        # An integer selects that entry of the colour list; wrap around when there are more than 10 tests
        color = index % color_count

        plt.plot(range(len(friction_values)),
                 friction_values,
                 linestyle='-',
                 linewidth=0.8,
                 color=tab10(color),
                 alpha=0.5,
                 label=str(test['OAConc']) + '% OA, ' + str(test['Force']) + 'N')

    test_speed = friction_dataframe['Speed']
    first_speed = test_speed.iloc[0]
    if not all(x == first_speed for x in test_speed):
        print('Test speeds are not the same for each test to be plotted')

    plt.ylim(0, 0.35)
    plt.title('Estimated Friction Coefficient Averaged \n Per Cycle for ' +
              str(first_speed) + 'mm/s')
    plt.xlabel('Cycle Number')
    plt.ylabel('Average Estimated Friction Coefficient')
    plt.legend(fontsize=6)
    plt.viridis()

    plot_folder_name = 'Friction Coefficient Graphs/'
    # `test` is the last row plotted above, so the figure is filed under that row's OA concentration.
    # NOTE(review): the folder name contains a space after 'OA_' (unlike the individual plots) - confirm
    # whether that is intended before changing it, since it decides where existing plots are found.
    subfolder_name = f'OA_ {str(test["OAConc"])}/'

    # savefig does not create missing folders
    Path(plot_folder_name + subfolder_name).mkdir(parents=True, exist_ok=True)

    # TODO Eliminate TestNo and Date if friction_dataframe has only one row
    plot_name = plot_folder_name + subfolder_name + \
        'AveragedByOAandForce_' + \
        str(first_speed) + 'mms'
    plt.savefig(plot_name + '.png', dpi=1000)

    # Clear the figure so the next group starts from an empty plot
    plt.clf()
# ---------------------
# SELECTED PLOTTING MODE
# ---------------------
# Only the first enabled mode runs: plot_all, plot_some, plot_by_oa, average_all_tests
if plot_all:
    # One figure per valid test; iterrows does not depend on master having a default 0..n-1 index
    for _, test in master.iterrows():
        if test['Validity']:
            print(f"Plotting test no. {test['TestNo']}, sample no. {test['SampleNo']}")
            cycle_avg_plotter(test)

elif plot_some:
    for plot_test in plot_test_list:
        one_var_master = master.loc[master['Filename'] == plot_test]

        # Exactly one row must match the requested file name
        if len(one_var_master) != 1:
            warnings.warn('Invalid selection of master dataframe')
            continue

        # Turn one_var_master from a DataFrame with one row to a Series
        test = one_var_master.iloc[0]
        print(f"Plotting test no. {test['TestNo']}, sample no. {test['SampleNo']}")
        cycle_avg_plotter(test)

elif plot_by_oa:
    # cycle_avg_plotter handles a single test (one row), so each group is plotted row by row;
    # passing the whole DataFrame makes its scalar comparisons on 'Speed' fail
    for oa_dataset in oa_groups.values():
        for _, test in oa_dataset.iterrows():
            cycle_avg_plotter(test)

elif average_all_tests:
    # ---------------
    # ORGANIZING DATA
    # ---------------

    # Create the dictionary of averaged tests by each combination of OA/force, split by speed
    avg_groups = {}
    speed_values = [20, 100]

    for speed in speed_values:
        key = f"speed{speed}"
        averaged_rows = []

        for param in oa_groups.values():
            same_speed = param.loc[param["Speed"] == speed]

            # No test was run at this speed for this OA/force combination
            if same_speed.empty:
                continue

            # Calculate the average friction across tests for a given combination of parameters
            param_avg_friction = friction_avg(same_speed["Avg_Friction"])

            # Use the first row as the template for the averaged test, without the per-test fields
            averaged_row = same_speed.iloc[0] \
                .drop(["SampleNo", "TestNo", "Date", "Filename"]) \
                .to_dict()
            # Store the averaged series in the Avg_Friction field (Series.replace substitutes values,
            # not fields, so it cannot be used for this)
            averaged_row['Avg_Friction'] = list(param_avg_friction)
            averaged_rows.append(averaged_row)

        # One row per OA/force combination for this speed
        avg_groups[key] = pd.DataFrame(averaged_rows)

    # --------
    # PLOTTING
    # --------
    for avg_group in avg_groups.values():
        cycle_avg_plotter_by_oa(avg_group)
if export_csv:
    print('Creating friction .csv file')

    csv_path = 'Friction Coefficient CSV/'
    csv_name = 'FrictionCoefficientData.csv'

    # to_csv does not create missing folders
    Path(csv_path).mkdir(parents=True, exist_ok=True)

    # The DataFrame to be exported — only median tests are included if export_median_only is True
    csv_df = master.loc[master['Validity']]
    if export_median_only:
        # Take the mask from the already filtered frame so that mask and frame share one index
        csv_df = csv_df.loc[csv_df['Median']]

    # Build friction_df: one column per test (headed by its Filename), one row per cycle.
    # Each list under Avg_Friction is expanded into a row, the table is transposed, and tests shorter than
    # the longest one are forward-filled with their last value.
    friction_df = (
        csv_df.set_index('Filename')['Avg_Friction']
        .apply(pd.Series)
        .T
        .ffill()
    )

    # Rename column names
    def renamer(df: str, sep='/'):
        """
        Removes everything in a column header before the last instance of sep
        :param df: The column header (a string; the parameter name is kept for compatibility)
        :param sep: The separator to split on
        :return renamed: The text after the last sep, or the whole header if it does not contain sep
        """
        renamed = df.rsplit(sep, 1)[-1]
        return renamed

    friction_df = friction_df.rename(renamer, axis=1)

    # Also get rid of the date (everything after the last instance of '_') in column names
    friction_df.columns = friction_df.columns.str.replace(r"_(?!.*_).*", "", regex=True)

    friction_df.to_csv(csv_path + csv_name, index_label='CycleNumber')

    # Export AverageFrictionCoefficientData.csv: a single row with the mean of every test's column
    means = friction_df.mean()
    avg_friction_df = pd.DataFrame([means], columns=friction_df.columns)
    avg_friction_csv_name = 'AverageFrictionCoefficientData.csv'
    avg_friction_df.to_csv(csv_path + avg_friction_csv_name)

# TODO IMPORT LOGGING
# ENABLE FULL TRACEBACK WHAT IS GOING ON??

# Print how long the program took to execute
end_time = time.perf_counter()
print(f'Code executed in {end_time - start_time} seconds.')