import copy
from keras import Model
from sklearn.model_selection import KFold
from sklearn.metrics import f1_score
from statistics import mean
import numpy as np
import math
from metamorphic_relations.Results import Results
from metamorphic_relations.MR import MR
from metamorphic_relations.Info import Info
from metamorphic_relations.Data import Data
from metamorphic_relations.Transform import Transform
import logging
# Layout of every log line: level name, timestamp, then the message text.
LOG_FORMAT = '%(levelname) -s %(asctime)s %(message)s'
# The root logger is configured at import time (INFO and above, using LOG_FORMAT); the module's own
# logger is then used to announce that logging is active.
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info('logging started')
[docs]class MRModel:
"""
Creates an MRModel object
:param data: the data to be used with the model
:param model: the ML model
:param function transform_x: the transform of the x from the data representation to the expected input to the model
:param function transform_y: the transform of the y from the data representation to the expected output from the model
:param GMRs: list of Generic Metamorphic Relations (GMRs)
:param DSMRs: list of Domain Specific Metamorphic Relations (DSMRs)
"""
def __init__(self, data: Data, model: Model, transform_x=None, transform_y=None, GMRs: list[Transform] = None,
             DSMRs: list[Transform] = None):
    """
    Stores the data, model and MRs, saves the model's initial weights and transforms the test data

    :param data: the data to be used with the model
    :param model: the ML model
    :param function transform_x: the transform of the x from the data representation to the expected input to the model (default is the identity)
    :param function transform_y: the transform of the y from the data representation to the expected output from the model (default is y_1D_to_2D)
    :param GMRs: list of Generic Metamorphic Relations (GMRs), None is treated as an empty list
    :param DSMRs: list of Domain Specific Metamorphic Relations (DSMRs), None is treated as an empty list
    """
    import os  # local import: only needed to make sure the output directory exists

    # Both lists default to None, and None cannot be concatenated below, so fall back to empty lists
    GMRs = [] if GMRs is None else GMRs
    DSMRs = [] if DSMRs is None else DSMRs

    self.data = data
    self.model = model
    self.GMRs = MR(GMRs)
    self.DSMRs = MR(DSMRs)
    self.all_MRs = MR(GMRs + DSMRs)

    self.transform_x = transform_x if transform_x is not None else (lambda x: x)
    self.transform_y = transform_y if transform_y is not None else self.y_1D_to_2D

    # The randomly generated initial weights of the model are saved to file so the model can be quickly reset without
    # recompiling the whole model. The directory is created first so that saving does not fail on a fresh checkout.
    os.makedirs("Output", exist_ok=True)
    self.model.save_weights("Output/initial_weights.h5")

    # The test data is transformed once up front and written back to the data object
    test_x, test_y = self.transform_data(self.data.test_x, self.data.test_y)
    self.data.update_test(test_x, test_y)
def compare_MR_sets_counts(self, max_composite: int = 1, min_i: int = 4,
                           compare_sets: tuple[bool, bool, bool, bool] = (True, True, True, True)) -> tuple[Results, list[Model]]:
    """
    Trains the model on each selected set of MRs over a series of growing training subsets

    The subset sizes are 2**i for every i in range(min_i, ceil(log2(len(data.train_x)))).

    :param max_composite: determines the max number of MRs that can be applied consecutively to produce new training data
    :param min_i: the smallest set of data to test calculated as 2**min_i
    :param compare_sets: the sets of results to compare of ["original", "GMRs", "DSMRs", "all_MRs"]. E.g. [True, False, True, False] compares ["original", "DSMRs"]
    :return: a Results object and list of the best model for each set (in the same order as the input to compare_sets)
    :raises Exception: if compare_sets does not have four values or min_i is negative
    """
    if len(compare_sets) != 4:
        raise Exception("compare_sets must have four boolean values")
    if min_i < 0:
        raise Exception("min_i must be greater than or equal to 0")

    for mr_set in (self.GMRs, self.DSMRs, self.all_MRs):
        mr_set.update_composite(max_composite)

    upper_exponent = int(math.ceil(math.log2(len(self.data.train_x))))
    subset_sizes = [int(2 ** exponent) for exponent in range(min_i, upper_exponent)]

    # One entry per position of compare_sets: the Results attribute to fill and a factory for the MR set.
    # Factories keep the empty MR from being built unless the "original" run is actually requested.
    candidates = (
        ("original_results", lambda: MR([])),
        ("GMR_results", lambda: self.GMRs),
        ("DSMR_results", lambda: self.DSMRs),
        ("all_MR_results", lambda: self.all_MRs),
    )

    results = Results()
    models = []
    for position, (attribute, make_mr) in enumerate(candidates):
        if not compare_sets[position]:
            continue
        info, trained = self.get_results(make_mr(), i_vals=subset_sizes)
        setattr(results, attribute, info)
        models.append(trained)
    return results, models
def compare_MR_sets(self, max_composite: int = 1, compare_sets: tuple[bool, bool, bool, bool] = (True, True, True, True)) -> tuple[Results, list[Model]]:
    """
    Trains the model once per selected set of MRs, each time with all the training data

    :param max_composite: default = 1, determines the max number of MRs that can be applied consecutively to produce new training data
    :param compare_sets: the sets of results to compare of ["original", "GMRs", "DSMRs", "all_MRs"]. E.g. [True, False, True, False] compares ["original", "DSMRs"]
    :return: a Results object and list with the model for each set (in the same order as the input to compare_sets)
    :raises Exception: if compare_sets does not have four values
    """
    if len(compare_sets) != 4:
        raise Exception("compare_sets must have four boolean values")

    for mr_set in (self.GMRs, self.DSMRs, self.all_MRs):
        mr_set.update_composite(max_composite)

    # Results attribute and MR-set factory for each position of compare_sets; the factory defers building
    # the empty MR until the "original" run is known to be wanted.
    plan = (
        ("original_results", lambda: MR([])),
        ("GMR_results", lambda: self.GMRs),
        ("DSMR_results", lambda: self.DSMRs),
        ("all_MR_results", lambda: self.all_MRs),
    )

    results = Results()
    models = []
    for position, (attribute, make_mr) in enumerate(plan):
        if compare_sets[position]:
            info, trained = self.get_results(make_mr())
            setattr(results, attribute, info)
            models.append(trained)
    return results, models
def compare_MR_counts(self, max_composite: int = 1, min_i: int = 4) -> tuple[Results, list[Model]]:
    """
    Trains the model on each MR individually using increasing proportions of the data

    The subset sizes are 2**i for every i in range(min_i, ceil(log2(len(data.train_x)))).

    :param max_composite: default = 1, determines the max number of MRs that can be applied consecutively to produce new training data
    :param min_i: the smallest set of data to test calculated as 2**min_i
    :return: a Results object and a list holding the best model for the original data followed by the best model for each individual MR (in the same order as results)
    :raises Exception: if min_i is negative
    """
    if min_i < 0:
        raise Exception("min_i must be greater than or equal to 0")

    self.GMRs.update_composite(max_composite)
    self.DSMRs.update_composite(max_composite)
    self.all_MRs.update_composite(max_composite)

    max_i = int(math.ceil(math.log2(len(self.data.train_x))))
    i_vals = [int(2 ** i) for i in range(min_i, max_i)]

    results = Results()
    models = []

    # get_results returns an (Info, Model) pair: only the Info belongs in the results, and the model is
    # collected first in the list, as compare_MRs does
    results.original_results, model = self.get_results(MR([]), i_vals=i_vals)
    models.append(model)

    # Without composition the GMRs are evaluated first and then the DSMRs; with composition the combined
    # set is evaluated instead
    if max_composite == 1:
        MR_sets = (self.GMRs, self.DSMRs)
    else:
        MR_sets = (self.all_MRs,)

    all_results = []
    for MR_set in MR_sets:
        for transforms, name in zip(MR_set.MR_list, MR_set.MR_list_names):
            res, model = self.get_results(i_vals=i_vals, MR_list=transforms)
            res.set_name(name)
            all_results.append(res)
            models.append(model)

    results.individual_results = all_results
    return results, models
def compare_MRs(self, max_composite: int = 1) -> tuple[Results, list[Model]]:
    """
    Trains the model once on the original data and once per individual MR, always with all the training data

    :param max_composite: default = 1, determines the max number of MRs that can be applied consecutively to produce new training data
    :return: a Results object and a list of models: the model for the original data first, then one per individual MR (in the same order as results)
    """
    for mr_set in (self.GMRs, self.DSMRs, self.all_MRs):
        mr_set.update_composite(max_composite)

    results = Results()
    baseline_info, baseline_model = self.get_results(MR([]))
    results.original_results = baseline_info
    models = [baseline_model]

    # Without composition the GMRs are evaluated first and then the DSMRs; otherwise the combined set is used
    families = (self.GMRs, self.DSMRs) if max_composite == 1 else (self.all_MRs,)

    individual = []
    for family in families:
        for position, transforms in enumerate(family.MR_list):
            info, trained = self.get_results(MR_list=transforms)
            info.set_name(family.MR_list_names[position])
            individual.append(info)
            models.append(trained)

    results.individual_results = individual
    return results, models
def get_results(self, MR_obj: MR = None, MR_list: tuple[Transform, list] = None, i_vals: list[int] = None) -> tuple[Info, Model]:
    """
    Returns the results of training the data on the model with the MRs

    For every size in i_vals a training subset is taken, the MRs are applied to it, the model is trained
    and then scored on the test data.

    :param MR_obj: the MR object - this should be given if the MR tree should be used i.e. to get the results when using set of MRs
    :param MR_list: a list of transforms to be performed compositely - this should be used when getting results for an individual (potentially composite) MR
    :param i_vals: the intervals to get results for (default is a single run using len(data.train_x))
    :return: an Info object containing the results for this MR and a copy of the model with the highest test f1 (None if i_vals is empty)
    :raises Exception: unless exactly one of MR_obj and MR_list is given
    """
    if (MR_obj is None) == (MR_list is None):
        raise Exception("Exactly one of: MR_obj or MR_list must be provided")

    if i_vals is None:
        i_vals = [len(self.data.train_x)]

    results = []
    best_model = None
    best_test_f1 = 0.0

    for i in i_vals:
        # Takes the first sample of elements
        subset_x, subset_y = self.data.get_train_subset(i_max=i)

        # Applies the MRs to the subset, either through the MR tree or as one composite list of transforms
        if MR_list is None:
            MR_train_x, MR_train_y = MR_obj.perform_MRs_tree(subset_x, subset_y, self.data.max_y)
        else:
            MR_train_x, MR_train_y = MR.perform_MRs_list(MR_list, subset_x, subset_y, self.data.max_y)

        new_train_x, new_train_y = self.transform_data(MR_train_x, MR_train_y)
        logger.info("got new data")

        # Trains and tests the model given the collected data
        train_f1 = self.train_model(new_train_x, new_train_y)
        logger.info("trained")
        test_f1 = self.test_model()
        logger.info((i, len(new_train_x), train_f1, test_f1))
        results.append((i, len(new_train_x), train_f1, test_f1))

        # The best score must be recorded together with the model, otherwise every later run with a
        # non-zero score would replace a better earlier model. The first run is always kept so that a
        # model is returned even when every score is 0.
        if best_model is None or test_f1 > best_test_f1:
            best_test_f1 = test_f1
            best_model = copy.deepcopy(self.model)

    set_result = Info.list_to_info(results)
    return set_result, best_model
def train_model(self, train_x: np.array, train_y: np.array, k: int = 5) -> float:
    """
    Cross-validates the model over k folds and leaves it loaded with the weights of the best scoring fold

    :param train_x: the x data
    :param train_y: the y data
    :param k: the number of folds for k fold validation
    :return: the mean of the macro f1 scores measured on the held-out part of each fold
    :raises Exception: if train_x and train_y differ in length or k is not positive
    """
    if len(train_x) != len(train_y):
        raise Exception("Training data and labels must have the same length")
    if k <= 0:
        raise Exception("k must be a positive integer")

    # Classic deep learning classification
    # https://machinelearningmastery.com/tensorflow-tutorial-deep-learning-with-tf-keras/
    splitter = KFold(n_splits=k, shuffle=True)
    fold_scores = []

    # Each fold trains on k-1 parts and is scored on the remaining part. Averaging these scores gives a
    # better estimate of performance than taking out a single validation set.
    for fold, (fit_index, held_out_index) in enumerate(splitter.split(train_x)):
        fit_x, held_out_x = train_x[fit_index], train_x[held_out_index]
        fit_y, held_out_y = train_y[fit_index], train_y[held_out_index]

        # Every fold starts again from the saved initial weights
        self.model.load_weights("Output/initial_weights.h5")
        self.model.fit(fit_x, fit_y, epochs=20, batch_size=100, verbose=0)

        # The fold's trained model is written to its own file so the best one can be restored afterwards
        self.model.save("Output/train_weights" + str(fold) + ".h5")
        fold_scores.append(self.test_model(held_out_x, held_out_y))

    # The model is not retrained here: the weights of the highest scoring fold are loaded back in
    best_fold = np.argmax(np.array(fold_scores))
    self.model.load_weights("Output/train_weights" + str(best_fold) + ".h5")

    return mean(fold_scores)
def test_model(self, test_x: np.array = None, test_y: np.array = None) -> float:
    """
    Tests the model to find the macro f1 score

    :param test_x: the x data (default uses the data.test_x)
    :param test_y: the y data (default uses the data.test_y)
    :return: the macro f1 score
    :raises Exception: if test_x and test_y differ in length
    """
    if test_x is None:
        test_x = self.data.test_x
    if test_y is None:
        test_y = self.data.test_y

    if len(test_x) != len(test_y):
        raise Exception("Testing data and labels must have the same length")

    # Both the labels and the model output are 2D, so each is reduced to one class index per sample
    y_true = self.y_2D_to_1D(test_y)
    y_pred = self.y_2D_to_1D(self.model.predict(test_x, verbose=0))

    # f1_score expects the ground truth first and the predictions second
    return f1_score(y_true, y_pred, average='macro')
[docs] @staticmethod
def concat(x: np.array, y: np.array, new_x: np.array, new_y: np.array) -> tuple[np.array, np.array]:
"""
Concatenates 2 arrays
:param x: x data
:param y: y data
:param new_x: data to add to x
:param new_y: data to add to y
:return: (x + new_x, y + new_y)
"""
x = np.concatenate((x, new_x))
y = np.concatenate((y, new_y))
return x, y
[docs] @staticmethod
def y_2D_to_1D(y: np.array) -> np.array:
"""
Reshapes a 2D array to a 1D array containing the index of the highest value.
E.g. [[0, 1], [1, 0]] -> [1, 0]
:param y: the original array
:return: the 1D array
"""
if len(y.shape) != 2:
raise Exception("y must have two dimensions")
new_y = np.zeros(y.shape[0])
for i in range(y.shape[0]):
new_y[i] = np.argmax(y[i])
return new_y
[docs] @staticmethod
def y_1D_to_2D(y: np.array, max_y: int) -> np.array:
"""
Reshapes a 1D array to a 2D array containing the index of the highest value.
E.g. [1, 0] -> [[0, 1], [1, 0]]
:param y: the original array
:param max_y: the largest possible value in the array
:return: the 2D array
"""
if len(y.shape) != 1:
raise Exception("y must have one dimensions")
new_y = np.zeros((y.shape[0], max_y))
# Reshapes a 1D array to a 2D array containing the 1 at the index of the y value
# E.g. [1, 0] -> [[0, 1], [1, 0]]
for i in range(len(y)):
if y[i] >= max_y:
raise Exception("max_y must be larger than all values in y")
new_y[i][y[i]] = 1
return new_y