as the title suggests, I'm trying to convert a notebook in keras 3, but idk why it's not working properly.
This is the original notebook (tell me if it's better to put here the code), I'm trying to do the same task but with car fronts; at the end you can find my code.
I managed to make it work with keras 2, now converting it to keras 3, but in kaggle seems really slow: I tried disabling GPU and it gets worse, so I know that it's using the GPU but CPU is always 100% and I can't figure why. My updated code is 5 times slower then the original one, while using old keras it has the same speed of the original code.
How can I do better? There is something that I'm missing? I followed keras official guide to convert keras 2 to keras 3, I'm thinking about learning pytorch but my model should run on constraint hardware, so I want to use tf lite to convert the model automatically (I'll use tf as backend for keras 3)
Furthermore, the model receives 5 tensors without using data = data[0], because data[0] is the list of batches (anchors, positive and negative) while data1 and data[2] are empty. This problem is in train_step and test_step, how can I manage that problem in a cleaner way?
My updated code for keras 3 (I prefer to copy this so it doesn't changes, I'll make a static notebook with a run if you prefer to see the correctness of code but the 20 s/it or more as speed):
import os
import zipfile
import random
import math
import pandas as pd
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import tensorflow as tf
import keras
from keras.models import Model
from keras.layers import Layer, Flatten, Dense,\
Dropout, BatchNormalization, Input
from keras.metrics import Mean, CosineSimilarity
from keras.optimizers import Adam
from keras.utils import plot_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.applications.efficientnet import EfficientNetB7, preprocess_input
os.environ["KERAS_BACKEND"] = "tensorflow"
#!conda install -y gdown
if not(os.path.isdir('/kaggle/working/confirmed_fronts')):
!gdown
!unzip -qq Confirmed_fronts.zip
!rm -rf /kaggle/working/Confirmed_fronts.zip
%cd /kaggle/working/confirmed_fronts
!find . -type f | awk -F '/' '{print $4}' | awk -F '\\$\\$' 'BEGIN{print "Maker name,Model name,Registration year,Color,Genmodel ID,Adv ID,Image index"}1 {print $1 "," $2 "," $3 "," $4 "," $5 "," $6 "," $7}' > car_fronts.csv
df2 = pd.read_csv('car_fronts.csv', delimiter=',')
df2 = df2.dropna(how='all',axis=0)
df2['Registration year'] = df2['Registration year'].astype(int)
df2['Adv ID'] = df2['Adv ID'].astype(int)
df2
df2vc = df2[['Maker name', 'Model name', 'Registration year', 'Color']].value_counts()
df2vc
df2vcFiltered = df2vc[df2vc >= 5]
df2vcFiltered
def triplets_generator(max_triplets=10, sample_lim=len(df2vcFiltered), val_set_perc=0.2, test_set_perc=0.1, print_paths=False, high_filter=False):
images_tr = []
images_val = []
images_te = []
for row in range(sample_lim):
images_row = []
index = df2vcFiltered.index[row]
for i in range(max_triplets):
random_choice1 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
path1 = str(random_choice1['Maker name'].iloc[0]) + '/' + str(random_choice1['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice1['Maker name'].iloc[0]) + '$$' + str(random_choice1['Model name'].iloc[0]) + '$$' + str(random_choice1['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice1['Color'].iloc[0]) + '$$' + str(random_choice1['Genmodel ID'].iloc[0]) + '$$' + str(random_choice1['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice1['Image index'].iloc[0])
while True: #TODO better way to write it? Do while not implemented
random_choice2 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
if random_choice1.index != random_choice2.index:
break
path2 = str(random_choice2['Maker name'].iloc[0]) + '/' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice2['Maker name'].iloc[0]) + '$$' + str(random_choice2['Model name'].iloc[0]) + '$$' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice2['Color'].iloc[0]) + '$$' + str(random_choice2['Genmodel ID'].iloc[0]) + '$$' + str(random_choice2['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice2['Image index'].iloc[0])
random_choice3 = df2[(df2['Maker name'] != index[0]) | (df2['Model name'] != index[1]) | (abs(df2['Registration year'] - index[2]) >= 10) | (df2['Color'] != index[3])].sample()
path3 = str(random_choice3['Maker name'].iloc[0]) + '/' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice3['Maker name'].iloc[0]) + '$$' + str(random_choice3['Model name'].iloc[0]) + '$$' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice3['Color'].iloc[0]) + '$$' + str(random_choice3['Genmodel ID'].iloc[0]) + '$$' + str(random_choice3['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice3['Image index'].iloc[0])
a = cv2.cvtColor(cv2.imread(path1), cv2.COLOR_BGR2RGB)
p = cv2.cvtColor(cv2.imread(path2), cv2.COLOR_BGR2RGB)
n = cv2.cvtColor(cv2.imread(path3), cv2.COLOR_BGR2RGB)
if high_filter: #TODO if only a single channel is used, then architecture has to be (_,_,1); is there a way to be channel size independent?
for img in [a,p,n]:
temp = 0.2989 * img[:,:,0] + 0.5870 * img[:,:,1] + 0.1140 * img[:,:,2]
temp = temp - ndimage.gaussian_filter(temp,3)
img[:,:,0] = temp
img[:,:,1] = temp
img[:,:,2] = temp
if print_paths:
print(path1)
print(path2)
print(path3)
images_row.append([a,p,n])
random.shuffle(images_row)
tr, val = train_test_split(images_row, shuffle=True, test_size=test_set_perc+val_set_perc)
val, te = train_test_split(val, shuffle=True, test_size=test_set_perc)
images_tr.extend(tr)
images_val.extend(val)
images_te.extend(te)
return images_tr, images_val, images_te
#TODO add random_state?
train, val, test = triplets_generator(sample_lim=200)
print(len(train))
print(len(train[0]))
def batch_generator(triplets, batch_size=32, augment=True):
total_triplets = len(triplets)
random.shuffle(triplets)
datagen = ImageDataGenerator(
rotation_range=10,
width_shift_range=0.05,
height_shift_range=0.05,
horizontal_flip=True,
zoom_range=0.2
)
for i in range(0, total_triplets, batch_size):
batch_triplets = triplets[i : i+batch_size]
anchor_batch = []
positive_batch = []
negative_batch = []
for triplet in batch_triplets:
anchor, positive, negative = triplet
anchor_image = anchor
positive_image = positive
negative_image = negative
if augment:
anchor_image = datagen.random_transform(anchor_image)
positive_image = datagen.random_transform(positive_image)
negative_image = datagen.random_transform(negative_image)
anchor_batch.append(anchor_image)
positive_batch.append(positive_image)
negative_batch.append(negative_image)
yield np.array(anchor_batch), np.array(positive_batch), np.array(negative_batch)
batch=6
print(len([next(batch_generator(train, batch))]))
print(len([next(batch_generator(train, batch))][0]))
print(len([next(batch_generator(train, batch))][0][0]))
def print_triplets(triplets):
a, p, n = triplets
for i in range(len(a)):
fig, axarr = plt.subplots(1,3, figsize=(10, 4))
axarr[0].imshow(a[i])
axarr[0].title.set_text('Anchor')
axarr[1].imshow(p[i])
axarr[1].title.set_text('Positive')
axarr[2].imshow(n[i])
axarr[2].title.set_text('Negative')
plt.show()
print_triplets([next(batch_generator(train, 6))][0])
def get_embedding(input_shape, num_layers_to_unfreeze=25):
base_model = EfficientNetB7(weights='imagenet',
input_shape=input_shape,
include_top=False,
pooling='avg')
for i in range(len(base_model.layers)-num_layers_to_unfreeze):
base_model.layers[i].trainable = False
embedding = keras.models.Sequential([
base_model,
Flatten(),
Dense(512, activation='relu'),
BatchNormalization(),
Dropout(0.3),
Dense(256, activation='relu'),
BatchNormalization(),
Dropout(0.3),
Dense(128, activation='relu'),
BatchNormalization(),
Dense(128)
], name='Embedding')
return embedding
input_shape = (300, 300, 3)
embedding = get_embedding(input_shape)
embedding.summary()
@keras.saving.register_keras_serializable()
class DistanceLayer(Layer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@tf.function
def call(self, anchor, positive, negative):
ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
return ap_distance, an_distance
anchor_input = Input(name='anchor', shape=input_shape)
positive_input = Input(name='positive', shape=input_shape)
negative_input = Input(name='negative', shape=input_shape)
distances = DistanceLayer()(
embedding(preprocess_input(anchor_input)),
embedding(preprocess_input(positive_input)),
embedding(preprocess_input(negative_input))
)
siamese_net = Model(
inputs=[anchor_input,
positive_input,
negative_input],
outputs=distances
)
@keras.saving.register_keras_serializable()
class SiameseModel(Model):
def __init__(self, siamese_net, margin=0.5):
super().__init__()
self.siamese_net = siamese_net
self.margin = margin
self.loss_tracker = Mean(name='loss')
self.accuracy_tracker = Mean(name='accuracy')
@tf.function
def call(self, inputs):
return self.siamese_net(inputs) #WTF
def train_step(self, *args, **kwargs):
if keras.backend.backend() == "jax": #TODO
return self._jax_train_step(*args, **kwargs)
elif keras.backend.backend() == "tensorflow":
#########################
print('TF')
#########################
return self._tensorflow_train_step(*args, **kwargs)
elif keras.backend.backend() == "torch": #TODO
return self._torch_train_step(*args, **kwargs)
def _tensorflow_train_step(self, data):
#########################
print(data)
print(len(data))
#########################
data = data[0]
with tf.GradientTape() as tape:
loss = self._compute_loss(data)#[0])
gradients = tape.gradient(loss, self.siamese_net.trainable_weights)
self.optimizer.apply_gradients(
zip(gradients, self.siamese_net.trainable_weights)
)
self.loss_tracker.update_state(loss)
accuracy = self._compute_accuracy(data)#[0]) #WTF
self.accuracy_tracker.update_state(accuracy)
return {'loss': self.loss_tracker.result(),
'accuracy': self.accuracy_tracker.result()}
def test_step(self, data):
data = data[0] #WTF
loss = self._compute_loss(data)#[0]) #WTF
self.loss_tracker.update_state(loss)
accuracy = self._compute_accuracy(data)
self.accuracy_tracker.update_state(accuracy)
return {'loss': self.loss_tracker.result(),
'accuracy': self.accuracy_tracker.result()}
def _compute_loss(self, data):
ap_distance, an_distance = self.siamese_net(data)
loss = ap_distance - an_distance
loss = tf.maximum(loss + self.margin, .0)
return loss
def _compute_accuracy(self, data):
ap_distance, an_distance = self.siamese_net(data)
accuracy = tf.reduce_mean(tf.cast(ap_distance < an_distance,
tf.float32))
return accuracy
@property
def metrics(self):
return [self.loss_tracker, self.accuracy_tracker]
def get_config(self):
base_config = super().get_config()
config = {
'siamese_net': keras.saving.serialize_keras_object(self.siamese_net),
'margin': keras.saving.serialize_keras_object(self.margin),
'loss_tracker': keras.saving.serialize_keras_object(self.loss_tracker),
'accuracy_tracker': keras.saving.serialize_keras_object(self.accuracy_tracker),
}
return {**base_config, **config}
@classmethod
def from_config(cls, config):
config['siamese_net'] = keras.saving.deserialize_keras_object(config.pop('siamese_net'))
config['margin'] = keras.saving.deserialize_keras_object(config.pop('margin'))
config['loss_tracker'] = keras.saving.deserialize_keras_object(config.pop('loss_tracker'))
config['accuracy_tracker'] = keras.saving.deserialize_keras_object(config.pop('accuracy_tracker'))
return cls(**config)
def train_model(model,
train_triplets,
epochs,
batch_size,
val_triplets,
patience,
delta=0.0001):
best_val_accuracy = 0
best_val_loss = float('inf')
temp_patience = patience
history = {
'loss': [],
'val_loss': [],
'accuracy': [],
'val_accuracy': []
}
train_steps_per_epoch = math.ceil(len(train_triplets) / batch_size)
val_steps_per_epoch = math.ceil(len(val_triplets) / batch_size)
for epoch in range(epochs):
print(f'Epoch {epoch+1}/{epochs}')
train_loss = 0.
train_accuracy = 0.
val_loss = 0.
val_accuracy = 0.
with tqdm(total=train_steps_per_epoch, desc='Training') as pbar:
for batch in batch_generator(train_triplets, batch_size=batch_size):
loss, accuracy = model.train_on_batch(batch)
train_loss += loss
train_accuracy += accuracy
pbar.update()
pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})
with tqdm(total=val_steps_per_epoch, desc='Validation') as pbar:
for batch in batch_generator(val_triplets, batch_size=batch_size):
loss, accuracy = model.test_on_batch(batch)
val_loss += loss
val_accuracy += accuracy
pbar.update()
pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})
train_loss /= train_steps_per_epoch
train_accuracy /= train_steps_per_epoch
val_loss /= val_steps_per_epoch
val_accuracy /= val_steps_per_epoch
history['loss'].append(train_loss)
history['accuracy'].append(train_accuracy)
history['val_loss'].append(val_loss)
history['val_accuracy'].append(val_accuracy)
print(f'\nTrain Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')
if val_accuracy > best_val_accuracy:
best_val_accuracy = val_accuracy
model.layers[0].layers[3].save_weights('best_model.weights.h5')
if val_loss - best_val_loss > delta:
temp_patience -= 1
if temp_patience == 0:
print('Early stopping: Validation loss did not improve.')
break
else:
best_val_loss = val_loss
temp_patience = patience
return model, history
siamese_model = SiameseModel(siamese_net)
siamese_model.jit_compile = False
siamese_modelpile(optimizer=Adam(0.00001))
siamese_model, history = train_model(siamese_model,
train_triplets=train,
epochs=200,
batch_size=64,
val_triplets=val,
patience=3)
as the title suggests, I'm trying to convert a notebook in keras 3, but idk why it's not working properly.
This is the original notebook (tell me if it's better to put here the code), I'm trying to do the same task but with car fronts; at the end you can find my code.
I managed to make it work with keras 2, now converting it to keras 3, but in kaggle seems really slow: I tried disabling GPU and it gets worse, so I know that it's using the GPU but CPU is always 100% and I can't figure why. My updated code is 5 times slower then the original one, while using old keras it has the same speed of the original code.
How can I do better? There is something that I'm missing? I followed keras official guide to convert keras 2 to keras 3, I'm thinking about learning pytorch but my model should run on constraint hardware, so I want to use tf lite to convert the model automatically (I'll use tf as backend for keras 3)
Furthermore, the model receives 5 tensors without using data = data[0], because data[0] is the list of batches (anchors, positive and negative) while data1 and data[2] are empty. This problem is in train_step and test_step, how can I manage that problem in a cleaner way?
My updated code for keras 3 (I prefer to copy this so it doesn't changes, I'll make a static notebook with a run if you prefer to see the correctness of code but the 20 s/it or more as speed):
import os
import zipfile
import random
import math
import pandas as pd
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import tensorflow as tf
import keras
from keras.models import Model
from keras.layers import Layer, Flatten, Dense,\
Dropout, BatchNormalization, Input
from keras.metrics import Mean, CosineSimilarity
from keras.optimizers import Adam
from keras.utils import plot_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.applications.efficientnet import EfficientNetB7, preprocess_input
os.environ["KERAS_BACKEND"] = "tensorflow"
#!conda install -y gdown
if not(os.path.isdir('/kaggle/working/confirmed_fronts')):
!gdown https://drive.google.com/uc?id=1e1ajCFntWRVeCluTvSJ6gV-2ew6F0ys5
!unzip -qq Confirmed_fronts.zip
!rm -rf /kaggle/working/Confirmed_fronts.zip
%cd /kaggle/working/confirmed_fronts
!find . -type f | awk -F '/' '{print $4}' | awk -F '\\$\\$' 'BEGIN{print "Maker name,Model name,Registration year,Color,Genmodel ID,Adv ID,Image index"}1 {print $1 "," $2 "," $3 "," $4 "," $5 "," $6 "," $7}' > car_fronts.csv
df2 = pd.read_csv('car_fronts.csv', delimiter=',')
df2 = df2.dropna(how='all',axis=0)
df2['Registration year'] = df2['Registration year'].astype(int)
df2['Adv ID'] = df2['Adv ID'].astype(int)
df2
df2vc = df2[['Maker name', 'Model name', 'Registration year', 'Color']].value_counts()
df2vc
df2vcFiltered = df2vc[df2vc >= 5]
df2vcFiltered
def triplets_generator(max_triplets=10, sample_lim=len(df2vcFiltered), val_set_perc=0.2, test_set_perc=0.1, print_paths=False, high_filter=False):
images_tr = []
images_val = []
images_te = []
for row in range(sample_lim):
images_row = []
index = df2vcFiltered.index[row]
for i in range(max_triplets):
random_choice1 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
path1 = str(random_choice1['Maker name'].iloc[0]) + '/' + str(random_choice1['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice1['Maker name'].iloc[0]) + '$$' + str(random_choice1['Model name'].iloc[0]) + '$$' + str(random_choice1['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice1['Color'].iloc[0]) + '$$' + str(random_choice1['Genmodel ID'].iloc[0]) + '$$' + str(random_choice1['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice1['Image index'].iloc[0])
while True: #TODO better way to write it? Do while not implemented
random_choice2 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
if random_choice1.index != random_choice2.index:
break
path2 = str(random_choice2['Maker name'].iloc[0]) + '/' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice2['Maker name'].iloc[0]) + '$$' + str(random_choice2['Model name'].iloc[0]) + '$$' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice2['Color'].iloc[0]) + '$$' + str(random_choice2['Genmodel ID'].iloc[0]) + '$$' + str(random_choice2['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice2['Image index'].iloc[0])
random_choice3 = df2[(df2['Maker name'] != index[0]) | (df2['Model name'] != index[1]) | (abs(df2['Registration year'] - index[2]) >= 10) | (df2['Color'] != index[3])].sample()
path3 = str(random_choice3['Maker name'].iloc[0]) + '/' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice3['Maker name'].iloc[0]) + '$$' + str(random_choice3['Model name'].iloc[0]) + '$$' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice3['Color'].iloc[0]) + '$$' + str(random_choice3['Genmodel ID'].iloc[0]) + '$$' + str(random_choice3['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice3['Image index'].iloc[0])
a = cv2.cvtColor(cv2.imread(path1), cv2.COLOR_BGR2RGB)
p = cv2.cvtColor(cv2.imread(path2), cv2.COLOR_BGR2RGB)
n = cv2.cvtColor(cv2.imread(path3), cv2.COLOR_BGR2RGB)
if high_filter: #TODO if only a single channel is used, then architecture has to be (_,_,1); is there a way to be channel size independent?
for img in [a,p,n]:
temp = 0.2989 * img[:,:,0] + 0.5870 * img[:,:,1] + 0.1140 * img[:,:,2]
temp = temp - ndimage.gaussian_filter(temp,3)
img[:,:,0] = temp
img[:,:,1] = temp
img[:,:,2] = temp
if print_paths:
print(path1)
print(path2)
print(path3)
images_row.append([a,p,n])
random.shuffle(images_row)
tr, val = train_test_split(images_row, shuffle=True, test_size=test_set_perc+val_set_perc)
val, te = train_test_split(val, shuffle=True, test_size=test_set_perc)
images_tr.extend(tr)
images_val.extend(val)
images_te.extend(te)
return images_tr, images_val, images_te
#TODO add random_state?
train, val, test = triplets_generator(sample_lim=200)
print(len(train))
print(len(train[0]))
def batch_generator(triplets, batch_size=32, augment=True):
total_triplets = len(triplets)
random.shuffle(triplets)
datagen = ImageDataGenerator(
rotation_range=10,
width_shift_range=0.05,
height_shift_range=0.05,
horizontal_flip=True,
zoom_range=0.2
)
for i in range(0, total_triplets, batch_size):
batch_triplets = triplets[i : i+batch_size]
anchor_batch = []
positive_batch = []
negative_batch = []
for triplet in batch_triplets:
anchor, positive, negative = triplet
anchor_image = anchor
positive_image = positive
negative_image = negative
if augment:
anchor_image = datagen.random_transform(anchor_image)
positive_image = datagen.random_transform(positive_image)
negative_image = datagen.random_transform(negative_image)
anchor_batch.append(anchor_image)
positive_batch.append(positive_image)
negative_batch.append(negative_image)
yield np.array(anchor_batch), np.array(positive_batch), np.array(negative_batch)
batch=6
print(len([next(batch_generator(train, batch))]))
print(len([next(batch_generator(train, batch))][0]))
print(len([next(batch_generator(train, batch))][0][0]))
def print_triplets(triplets):
a, p, n = triplets
for i in range(len(a)):
fig, axarr = plt.subplots(1,3, figsize=(10, 4))
axarr[0].imshow(a[i])
axarr[0].title.set_text('Anchor')
axarr[1].imshow(p[i])
axarr[1].title.set_text('Positive')
axarr[2].imshow(n[i])
axarr[2].title.set_text('Negative')
plt.show()
print_triplets([next(batch_generator(train, 6))][0])
def get_embedding(input_shape, num_layers_to_unfreeze=25):
base_model = EfficientNetB7(weights='imagenet',
input_shape=input_shape,
include_top=False,
pooling='avg')
for i in range(len(base_model.layers)-num_layers_to_unfreeze):
base_model.layers[i].trainable = False
embedding = keras.models.Sequential([
base_model,
Flatten(),
Dense(512, activation='relu'),
BatchNormalization(),
Dropout(0.3),
Dense(256, activation='relu'),
BatchNormalization(),
Dropout(0.3),
Dense(128, activation='relu'),
BatchNormalization(),
Dense(128)
], name='Embedding')
return embedding
input_shape = (300, 300, 3)
embedding = get_embedding(input_shape)
embedding.summary()
@keras.saving.register_keras_serializable()
class DistanceLayer(Layer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@tf.function
def call(self, anchor, positive, negative):
ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
return ap_distance, an_distance
anchor_input = Input(name='anchor', shape=input_shape)
positive_input = Input(name='positive', shape=input_shape)
negative_input = Input(name='negative', shape=input_shape)
distances = DistanceLayer()(
embedding(preprocess_input(anchor_input)),
embedding(preprocess_input(positive_input)),
embedding(preprocess_input(negative_input))
)
siamese_net = Model(
inputs=[anchor_input,
positive_input,
negative_input],
outputs=distances
)
@keras.saving.register_keras_serializable()
class SiameseModel(Model):
def __init__(self, siamese_net, margin=0.5):
super().__init__()
self.siamese_net = siamese_net
self.margin = margin
self.loss_tracker = Mean(name='loss')
self.accuracy_tracker = Mean(name='accuracy')
@tf.function
def call(self, inputs):
return self.siamese_net(inputs) #WTF
def train_step(self, *args, **kwargs):
if keras.backend.backend() == "jax": #TODO
return self._jax_train_step(*args, **kwargs)
elif keras.backend.backend() == "tensorflow":
#########################
print('TF')
#########################
return self._tensorflow_train_step(*args, **kwargs)
elif keras.backend.backend() == "torch": #TODO
return self._torch_train_step(*args, **kwargs)
def _tensorflow_train_step(self, data):
#########################
print(data)
print(len(data))
#########################
data = data[0]
with tf.GradientTape() as tape:
loss = self._compute_loss(data)#[0])
gradients = tape.gradient(loss, self.siamese_net.trainable_weights)
self.optimizer.apply_gradients(
zip(gradients, self.siamese_net.trainable_weights)
)
self.loss_tracker.update_state(loss)
accuracy = self._compute_accuracy(data)#[0]) #WTF
self.accuracy_tracker.update_state(accuracy)
return {'loss': self.loss_tracker.result(),
'accuracy': self.accuracy_tracker.result()}
def test_step(self, data):
data = data[0] #WTF
loss = self._compute_loss(data)#[0]) #WTF
self.loss_tracker.update_state(loss)
accuracy = self._compute_accuracy(data)
self.accuracy_tracker.update_state(accuracy)
return {'loss': self.loss_tracker.result(),
'accuracy': self.accuracy_tracker.result()}
def _compute_loss(self, data):
ap_distance, an_distance = self.siamese_net(data)
loss = ap_distance - an_distance
loss = tf.maximum(loss + self.margin, .0)
return loss
def _compute_accuracy(self, data):
ap_distance, an_distance = self.siamese_net(data)
accuracy = tf.reduce_mean(tf.cast(ap_distance < an_distance,
tf.float32))
return accuracy
@property
def metrics(self):
return [self.loss_tracker, self.accuracy_tracker]
def get_config(self):
base_config = super().get_config()
config = {
'siamese_net': keras.saving.serialize_keras_object(self.siamese_net),
'margin': keras.saving.serialize_keras_object(self.margin),
'loss_tracker': keras.saving.serialize_keras_object(self.loss_tracker),
'accuracy_tracker': keras.saving.serialize_keras_object(self.accuracy_tracker),
}
return {**base_config, **config}
@classmethod
def from_config(cls, config):
config['siamese_net'] = keras.saving.deserialize_keras_object(config.pop('siamese_net'))
config['margin'] = keras.saving.deserialize_keras_object(config.pop('margin'))
config['loss_tracker'] = keras.saving.deserialize_keras_object(config.pop('loss_tracker'))
config['accuracy_tracker'] = keras.saving.deserialize_keras_object(config.pop('accuracy_tracker'))
return cls(**config)
def train_model(model,
train_triplets,
epochs,
batch_size,
val_triplets,
patience,
delta=0.0001):
best_val_accuracy = 0
best_val_loss = float('inf')
temp_patience = patience
history = {
'loss': [],
'val_loss': [],
'accuracy': [],
'val_accuracy': []
}
train_steps_per_epoch = math.ceil(len(train_triplets) / batch_size)
val_steps_per_epoch = math.ceil(len(val_triplets) / batch_size)
for epoch in range(epochs):
print(f'Epoch {epoch+1}/{epochs}')
train_loss = 0.
train_accuracy = 0.
val_loss = 0.
val_accuracy = 0.
with tqdm(total=train_steps_per_epoch, desc='Training') as pbar:
for batch in batch_generator(train_triplets, batch_size=batch_size):
loss, accuracy = model.train_on_batch(batch)
train_loss += loss
train_accuracy += accuracy
pbar.update()
pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})
with tqdm(total=val_steps_per_epoch, desc='Validation') as pbar:
for batch in batch_generator(val_triplets, batch_size=batch_size):
loss, accuracy = model.test_on_batch(batch)
val_loss += loss
val_accuracy += accuracy
pbar.update()
pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})
train_loss /= train_steps_per_epoch
train_accuracy /= train_steps_per_epoch
val_loss /= val_steps_per_epoch
val_accuracy /= val_steps_per_epoch
history['loss'].append(train_loss)
history['accuracy'].append(train_accuracy)
history['val_loss'].append(val_loss)
history['val_accuracy'].append(val_accuracy)
print(f'\nTrain Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')
if val_accuracy > best_val_accuracy:
best_val_accuracy = val_accuracy
model.layers[0].layers[3].save_weights('best_model.weights.h5')
if val_loss - best_val_loss > delta:
temp_patience -= 1
if temp_patience == 0:
print('Early stopping: Validation loss did not improve.')
break
else:
best_val_loss = val_loss
temp_patience = patience
return model, history
siamese_model = SiameseModel(siamese_net)
siamese_model.jit_compile = False
siamese_model.compile(optimizer=Adam(0.00001))
siamese_model, history = train_model(siamese_model,
train_triplets=train,
epochs=200,
batch_size=64,
val_triplets=val,
patience=3)
After reviewing your attempt to convert Keras 2 to 3, I noticed some issues. For instance, DistanceLayer
has not been properly migrated to a Keras 3 compatible format. Additionally, the model's forward pass is executed twice while computing loss and metrics, which may impact training speed and overall efficiency. Also I would recommend using the fit method or the custom fit approach whenever possible, rather than custom_training, when working with Keras. I have updated your code from Keras 2 to 3. Here is the code. Some highlights:
tensorflow
and torch
as backend.tf.data
API to build dataset for siamese modelling.keras.layers.Random*
layers for augmentation.