s4585375 VQVAE OASIS #475

Open
wants to merge 13 commits into topic-recognition
8 changes: 8 additions & 0 deletions recognition/45853757-VQVAE/README.MD
@@ -0,0 +1,8 @@
# Implementing a VQ-VAE for the OASIS dataset
VQ-VAEs are a variant of the variational autoencoder (VAE) that replaces the continuous latent space with a discrete codebook learned through vector quantisation (VQ), which improves the quality and compactness of the learned representations.

My VQ-VAE model is based on the Keras example at: https://keras.io/examples/generative/vq_vae/
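
For reference, the objective minimised during training (as in the Keras example above) combines a reconstruction term with a codebook term and a commitment term, where $\mathrm{sg}[\cdot]$ is the stop-gradient operator, $z_e(x)$ is the encoder output, $e$ is the selected codebook vector and $\beta = 0.25$:

$$
\mathcal{L} = \lVert x - \hat{x} \rVert_2^2 + \lVert \mathrm{sg}[z_e(x)] - e \rVert_2^2 + \beta \, \lVert z_e(x) - \mathrm{sg}[e] \rVert_2^2
$$

In this implementation the reconstruction term is additionally divided by the variance of the training data.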

The OASIS brain MRI dataset was downloaded from Blackboard; preprocessing consists of normalising each image's pixel values to the range [0, 1].

The results were inconclusive: my GPU could not handle this many data samples in memory and I was unable to find an adequate workaround.
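
## Usage
A minimal sketch of how the modules fit together (it assumes the hard-coded dataset paths in `dataset.py` point at the extracted OASIS folders); running `predict.py` performs these steps and also reports the mean SSIM between the test images and their reconstructions:

```python
from dataset import load_data
from train import train_vqvae

model = train_vqvae()                          # loads the data, builds and fits the VQ-VAE
_, _, testing_data, _ = load_data()            # only the test split is needed here
reconstructions = model.predict(testing_data)  # reconstruct the test images
```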
62 changes: 62 additions & 0 deletions recognition/45853757-VQVAE/dataset.py
@@ -0,0 +1,62 @@
import os
import numpy as np
from tensorflow.keras.utils import load_img, img_to_array


def preprocess_data(training_data, validation_data, testing_data):
"""
Normalises each data set and finds the variance of training data

Parameters:
training_data (list): list of arrays representing the images to train the model on
validation_data (list): list of arrays representing the images to validate the model on
testing_data (list): list of arrays representing the images to test the model on

Returns:
training_data (list): normalised list of arrays
validation_data (list): normalised list of arrays
testing_data (list): normalised list of arrays
variance (float): variance of the training data
"""
training_data = np.array(training_data)
training_data = training_data.astype('float16') / 255.

validation_data = np.array(validation_data)
validation_data = validation_data.astype('float16') / 255.

testing_data = np.array(testing_data)
testing_data = testing_data.astype('float16') / 255.

    # training_data has already been scaled to [0, 1] above, so take its variance directly
    # (accumulate in float32 since the images are stored as float16)
    variance = np.var(training_data, dtype='float32')

return training_data, validation_data, testing_data, variance


def load_data():
"""
Loads the data to be used for training, validating and testing the model.

Params: None

Returns:
Three normalised data sets of images for training,
validation and testing and the variance of the training dataset.
"""
# Initialise three empty lists for our data to be stored appropriately
training_data = []
validation_data = []
testing_data = []

# Create list pairs of the directories for the files with the images, and
# which list they should be sorted into
    location_and_data_category = [
        ["D:/keras_png_slices_data/keras_png_slices_train", training_data],
        ["D:/keras_png_slices_data/keras_png_slices_validate", validation_data],
        ["D:/keras_png_slices_data/keras_png_slices_test", testing_data]]

# Find and store each image in each file into the correct list
for dataset in location_and_data_category:
for file_name in os.listdir(dataset[0]):
dataset[1].append(img_to_array(load_img(os.path.join(dataset[0], file_name), color_mode="grayscale")))

return preprocess_data(training_data, validation_data, testing_data)
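
# The approach above loads every slice into host memory at once, which is what caused the
# GPU/memory problems noted in the README. A possible alternative (a sketch only, not used
# by the rest of this submission) is to stream the PNG slices lazily with a tf.data pipeline,
# decoding and normalising one batch at a time:
import tensorflow as tf

def make_lazy_dataset(directory, batch_size=32):
    """ Builds a tf.data pipeline that reads and normalises PNG slices on the fly """
    def _load_png(path):
        image = tf.io.read_file(path)
        image = tf.image.decode_png(image, channels=1)  # grayscale slice
        return tf.cast(image, tf.float32) / 255.        # normalise to [0, 1]

    files = tf.data.Dataset.list_files(os.path.join(directory, "*.png"), shuffle=True)
    return (files.map(_load_png, num_parallel_calls=tf.data.AUTOTUNE)
                 .batch(batch_size)
                 .prefetch(tf.data.AUTOTUNE))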
118 changes: 118 additions & 0 deletions recognition/45853757-VQVAE/modules.py
@@ -0,0 +1,118 @@
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
import numpy as np

length = 256   # side length of the input images (256 x 256)
depth = 16     # base number of convolutional filters
kernel = 3     # kernel size used by every convolution

def create_encoder(latent_dim=16):
""" Create a simple encoder sequential layer """
encoder = tf.keras.Sequential(name="encoder")
encoder.add(layers.Conv2D(depth, kernel, activation="relu", strides=2, padding="same", input_shape=(length, length, 1)))
encoder.add(layers.Conv2D(depth*2, kernel, activation="relu", strides=2, padding="same"))
encoder.add(layers.Conv2D(depth*4, kernel, activation="relu", strides=2, padding="same"))
encoder.add(layers.Conv2D(depth*8, kernel, activation="relu", strides=2, padding="same"))
encoder.add(layers.Conv2D(latent_dim, 1, padding="same"))
return encoder

def create_decoder():
    """ Create a simple decoder sequential layer that upsamples the latents back to 256 x 256 """
    decoder = tf.keras.Sequential(name="decoder")
    decoder.add(layers.Conv2DTranspose(depth*8, kernel, activation="relu", strides=2, padding="same"))
    decoder.add(layers.Conv2DTranspose(depth*4, kernel, activation="relu", strides=2, padding="same"))
    decoder.add(layers.Conv2DTranspose(depth*2, kernel, activation="relu", strides=2, padding="same"))
    decoder.add(layers.Conv2DTranspose(depth, kernel, activation="relu", strides=2, padding="same"))
    decoder.add(layers.Conv2D(1, kernel, padding="same"))
    return decoder


class VQLayer(layers.Layer):
def __init__(self, n_embeddings, embedding_dim, beta=0.25, **kwargs):
super().__init__(**kwargs)
self.embedding_dim = embedding_dim
self.n_embeddings = n_embeddings
self.beta = beta

# Initialise embeddings
w_init = tf.random_uniform_initializer()
self.embeddings = tf.Variable(
initial_value=w_init(shape=(self.embedding_dim, self.n_embeddings),
dtype="float32"), trainable=True, name="vqvae_embeddings"
)

    def call(self, x):
        # Record the input shape, then flatten the inputs keeping only the embedding dimension
        input_shape = tf.shape(x)
        flattened = tf.reshape(x, [-1, self.embedding_dim])

        # Perform quantisation, then reshape the quantised values back to the original shape
        encoding_indices = self.get_code_indices(flattened)
        encodings = tf.one_hot(encoding_indices, self.n_embeddings)
        quantised = tf.matmul(encodings, self.embeddings, transpose_b=True)
        quantised = tf.reshape(quantised, input_shape)

        # Add the vector quantisation losses to the layer: the commitment loss pulls the
        # encoder outputs towards the codebook, the codebook loss updates the embeddings
        commitment_loss = tf.reduce_mean((tf.stop_gradient(quantised) - x) ** 2)
        codebook_loss = tf.reduce_mean((quantised - tf.stop_gradient(x)) ** 2)
        self.add_loss(self.beta * commitment_loss + codebook_loss)

        # Straight-through estimator: pass the decoder gradients straight to the encoder
        return x + tf.stop_gradient(quantised - x)

    def get_code_indices(self, flattened_inputs):
        # Calculate the squared L2 distance between the inputs and the codebook vectors
        similarity = tf.matmul(flattened_inputs, self.embeddings)
        distances = (
            tf.reduce_sum(flattened_inputs ** 2, axis=1, keepdims=True)
            + tf.reduce_sum(self.embeddings ** 2, axis=0) - 2 * similarity
        )

        # Take the index of the nearest codebook vector for each input
        encoding_indices = tf.argmin(distances, axis=1)
        return encoding_indices


class VQVAEModel(tf.keras.Sequential):
    def __init__(self, variance, latent_dim, n_embeddings, **kwargs):
        super(VQVAEModel, self).__init__(**kwargs)
        self.variance = variance
        self.latent_dim = latent_dim
        self.n_embeddings = n_embeddings

        # Build our model: encode, quantise the latents, then decode
        self.add(create_encoder(latent_dim))
        self.add(VQLayer(n_embeddings, latent_dim, name="vector_quantiser"))
        self.add(create_decoder())

        # Track our losses
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.vq_loss_tracker = keras.metrics.Mean(name="vq_loss")

@property
def metrics(self):
return [self.total_loss_tracker, self.reconstruction_loss_tracker, self.vq_loss_tracker]

    def train_step(self, x):
        # Calculate the losses from the VQ-VAE
        with tf.GradientTape() as tape:
            reconstructions = self(x, training=True)
            reconstruction_loss = (tf.reduce_mean((x - reconstructions) ** 2) / self.variance)
            total_loss = reconstruction_loss + sum(self.losses)

        # Backpropagate the gradients
        grads = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

        # Update the loss trackers
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.vq_loss_tracker.update_state(sum(self.losses))

return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"vqvae_loss": self.vq_loss_tracker.result()
}
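
# A quick shape sanity check (a debugging sketch only, using the defaults above): a single
# 256 x 256 slice should pass through encoder -> quantiser -> decoder and come back out
# with the same spatial shape.
if __name__ == "__main__":
    dummy_batch = tf.zeros((1, length, length, 1))
    model = VQVAEModel(variance=1.0, latent_dim=16, n_embeddings=128)
    print(model(dummy_batch).shape)  # expected: (1, 256, 256, 1)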
33 changes: 33 additions & 0 deletions recognition/45853757-VQVAE/predict.py
@@ -0,0 +1,33 @@
from dataset import *
from modules import *
from train import *
import tensorflow as tf
import matplotlib.pyplot as plt

def calculate_ssim(original_data, predicted_data):
    """ Calculates and prints the mean SSIM over all image pairs in the two data sets """
    ssim = tf.image.ssim(tf.cast(original_data, tf.float32), tf.cast(predicted_data, tf.float32), max_val=1.0)
    print("Mean SSIM of data sets:", tf.reduce_mean(ssim).numpy())

def compare_predicted(original_data, predicted_data, index):
    """ Plots the original and reconstructed image at the given index side by side """
    fig = plt.figure()
    ax = fig.add_subplot(1, 2, 1)
    ax.imshow(original_data[index, :, :, 0], cmap="gray")
    ax.set_title("Original Image")

    ax = fig.add_subplot(1, 2, 2)
    ax.imshow(predicted_data[index, :, :, 0], cmap="gray")
    ax.set_title("Reconstructed Image")
    plt.show()

def plot_loss(model):
    """ Plots the total training loss recorded by the most recent call to fit """
    plt.plot(model.history.history["loss"], label="total loss")
    plt.show()

# Train the model, then reconstruct the test images and evaluate the reconstructions
training_data, validation_data, testing_data, data_variance = load_data()
model = train_vqvae()

predictions = model.predict(testing_data)

calculate_ssim(testing_data, predictions)
compare_predicted(testing_data, predictions, 8)
plot_loss(model)
17 changes: 17 additions & 0 deletions recognition/45853757-VQVAE/train.py
@@ -0,0 +1,17 @@
import tensorflow as tf
from modules import *
from dataset import *

batch_size = 64
epochs = 100

def train_vqvae():
    """ Loads the data, then builds, compiles and trains the VQ-VAE, returning the trained model """
    # Load our data
    training_data, validation_data, testing_data, data_variance = load_data()

    # Construct and train our model
    vqvae_model = VQVAEModel(variance=data_variance, latent_dim=16, n_embeddings=128)
    vqvae_model.compile(optimizer=keras.optimizers.Adam())

    # The custom train_step reconstructs its own input, so the images are passed without
    # separate targets (as in the Keras VQ-VAE example)
    vqvae_model.fit(training_data, epochs=epochs, batch_size=batch_size)
    vqvae_model.summary()

    return vqvae_model
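
# A sketch of saving and restoring the trained weights so prediction does not have to retrain
# the model on every run (the checkpoint path is just an example, not part of the original
# submission):
if __name__ == "__main__":
    model = train_vqvae()
    model.save_weights("vqvae_oasis_checkpoint")

    # The variance only affects the training loss scaling, so any value works for inference
    restored = VQVAEModel(variance=1.0, latent_dim=16, n_embeddings=128)
    restored(tf.zeros((1, 256, 256, 1)))  # build the model by calling it on a dummy batch
    restored.load_weights("vqvae_oasis_checkpoint")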