As the title suggests, I'm trying to convert a notebook to Keras 3, but I don't know why it isn't working properly.
This is the original notebook (tell me if it's better to put the code here). I'm trying to do the same task but with car fronts; you can find my code at the end.
I managed to make it work with Keras 2 and am now converting it to Keras 3, but on Kaggle it seems really slow. I tried disabling the GPU and it gets worse, so I know it's using the GPU, but the CPU is always at 100% and I can't figure out why. My updated code is 5 times slower than the original, while with the old Keras it runs at the same speed as the original code.
How can I do better? Is there something I'm missing? I followed the official Keras guide for converting Keras 2 code to Keras 3. I'm thinking about learning PyTorch, but my model should run on constrained hardware, so I want to use TF Lite to convert the model automatically (I'll use TensorFlow as the backend for Keras 3).
Furthermore, the model receives 5 tensors unless I use data = data[0], because data[0] is the list of batches (anchors, positives and negatives) while data[1] and data[2] are empty. This problem occurs in both train_step and test_step; how can I handle it in a cleaner way?
My updated code for Keras 3 (I prefer to copy it here so it doesn't change; I'll make a static notebook with a run if you'd prefer to check the correctness of the code, but the speed is 20 s/it or more):
import os
import zipfile
import random
import math
import pandas as pd
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
from tqdm import tqdm
from sklearn.model_selection import train_test_split
# The backend must be selected before keras is imported; setting it afterwards has no effect.
os.environ["KERAS_BACKEND"] = "tensorflow"
import tensorflow as tf
import keras
from keras.models import Model
from keras.layers import Layer, Flatten, Dense,\
                                    Dropout, BatchNormalization, Input
from keras.metrics import Mean, CosineSimilarity
from keras.optimizers import Adam
from keras.utils import plot_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.applications.efficientnet import EfficientNetB7, preprocess_input
#!conda install -y gdown
if not(os.path.isdir('/kaggle/working/confirmed_fronts')):
    !gdown https://drive.google.com/uc?id=1e1ajCFntWRVeCluTvSJ6gV-2ew6F0ys5
    !unzip -qq Confirmed_fronts.zip
    !rm -rf /kaggle/working/Confirmed_fronts.zip
%cd /kaggle/working/confirmed_fronts
!find . -type f | awk -F '/' '{print $4}' | awk -F '\\$\\$' 'BEGIN{print "Maker name,Model name,Registration year,Color,Genmodel ID,Adv ID,Image index"}1 {print $1 "," $2 "," $3 "," $4 "," $5 "," $6 "," $7}' > car_fronts.csv
df2 = pd.read_csv('car_fronts.csv', delimiter=',')
df2 = df2.dropna(how='all',axis=0) 
df2['Registration year'] = df2['Registration year'].astype(int)
df2['Adv ID'] = df2['Adv ID'].astype(int)
df2
df2vc = df2[['Maker name', 'Model name', 'Registration year', 'Color']].value_counts()
df2vc
df2vcFiltered = df2vc[df2vc >= 5]
df2vcFiltered
def triplets_generator(max_triplets=10, sample_lim=len(df2vcFiltered), val_set_perc=0.2, test_set_perc=0.1, print_paths=False, high_filter=False):
        
    images_tr  = []
    images_val = []
    images_te  = []
    for row in range(sample_lim):
        images_row = []
        
        index = df2vcFiltered.index[row]
 
        for i in range(max_triplets):
            random_choice1 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
            path1 = str(random_choice1['Maker name'].iloc[0]) + '/' + str(random_choice1['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice1['Maker name'].iloc[0]) + '$$' + str(random_choice1['Model name'].iloc[0]) + '$$' +  str(random_choice1['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice1['Color'].iloc[0]) + '$$' + str(random_choice1['Genmodel ID'].iloc[0]) + '$$' + str(random_choice1['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice1['Image index'].iloc[0])
            
            while True:                  #TODO better way to write it? Do while not implemented
                random_choice2 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
                if random_choice1.index != random_choice2.index:
                    break
            path2 = str(random_choice2['Maker name'].iloc[0]) + '/' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice2['Maker name'].iloc[0]) + '$$' + str(random_choice2['Model name'].iloc[0]) + '$$' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice2['Color'].iloc[0]) + '$$' + str(random_choice2['Genmodel ID'].iloc[0]) + '$$' + str(random_choice2['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice2['Image index'].iloc[0])
            
            random_choice3 = df2[(df2['Maker name'] != index[0]) | (df2['Model name'] != index[1]) | (abs(df2['Registration year'] - index[2]) >= 10) | (df2['Color'] != index[3])].sample()
            path3 = str(random_choice3['Maker name'].iloc[0]) + '/' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice3['Maker name'].iloc[0]) + '$$' + str(random_choice3['Model name'].iloc[0]) + '$$' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice3['Color'].iloc[0]) + '$$' + str(random_choice3['Genmodel ID'].iloc[0]) + '$$' + str(random_choice3['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice3['Image index'].iloc[0])
    
            a = cv2.cvtColor(cv2.imread(path1), cv2.COLOR_BGR2RGB)
            p = cv2.cvtColor(cv2.imread(path2), cv2.COLOR_BGR2RGB)
            n = cv2.cvtColor(cv2.imread(path3), cv2.COLOR_BGR2RGB)
            
            if high_filter:              #TODO if only a single channel is used, then architecture has to be (_,_,1); is there a way to be channel size independent?
                for img in [a,p,n]:
                    temp = 0.2989 * img[:,:,0] + 0.5870 * img[:,:,1] + 0.1140 * img[:,:,2]
                    temp = temp - ndimage.gaussian_filter(temp,3)
                    img[:,:,0] = temp
                    img[:,:,1] = temp
                    img[:,:,2] = temp
              
            if print_paths:
                print(path1)
                print(path2)
                print(path3)
            images_row.append([a,p,n])
        random.shuffle(images_row)
        tr, val = train_test_split(images_row, shuffle=True, test_size=test_set_perc+val_set_perc)
        val, te = train_test_split(val, shuffle=True, test_size=test_set_perc)
        images_tr.extend(tr)
        images_val.extend(val)
        images_te.extend(te)
    
    return images_tr, images_val, images_te
    
#TODO add random_state?
train, val, test = triplets_generator(sample_lim=200)
print(len(train))
print(len(train[0]))
def batch_generator(triplets, batch_size=32, augment=True):
    total_triplets = len(triplets)
    random.shuffle(triplets)
    
    datagen = ImageDataGenerator(
        rotation_range=10,  
        width_shift_range=0.05, 
        height_shift_range=0.05,   
        horizontal_flip=True,
        zoom_range=0.2
    )
    
    for i in range(0, total_triplets, batch_size):
        batch_triplets = triplets[i : i+batch_size]
        anchor_batch = []
        positive_batch = []
        negative_batch = []
        for triplet in batch_triplets:
            anchor, positive, negative = triplet
            
            anchor_image = anchor
            positive_image = positive
            negative_image = negative
                
            if augment:
                anchor_image = datagen.random_transform(anchor_image)
                positive_image = datagen.random_transform(positive_image)
                negative_image = datagen.random_transform(negative_image)
            anchor_batch.append(anchor_image)
            positive_batch.append(positive_image)
            negative_batch.append(negative_image)
        yield np.array(anchor_batch), np.array(positive_batch), np.array(negative_batch)
batch=6
print(len([next(batch_generator(train, batch))]))
print(len([next(batch_generator(train, batch))][0]))
print(len([next(batch_generator(train, batch))][0][0]))
def print_triplets(triplets):
    a, p, n = triplets
    for i in range(len(a)):
        
        fig, axarr = plt.subplots(1,3, figsize=(10, 4))
        
        axarr[0].imshow(a[i])
        axarr[0].title.set_text('Anchor')
        axarr[1].imshow(p[i])
        axarr[1].title.set_text('Positive')
        axarr[2].imshow(n[i])
        axarr[2].title.set_text('Negative')
        plt.show()
print_triplets([next(batch_generator(train, 6))][0])
def get_embedding(input_shape, num_layers_to_unfreeze=25):
    base_model = EfficientNetB7(weights='imagenet',
                                input_shape=input_shape,
                                include_top=False,
                                pooling='avg')
    for i in range(len(base_model.layers)-num_layers_to_unfreeze):
        base_model.layers[i].trainable = False
    embedding = keras.models.Sequential([
        base_model,
        Flatten(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.3),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.3),
        Dense(128, activation='relu'),
        BatchNormalization(),
        Dense(128)
    ], name='Embedding')
    return embedding
input_shape = (300, 300, 3)
embedding = get_embedding(input_shape)
embedding.summary()
@keras.saving.register_keras_serializable()
class DistanceLayer(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    @tf.function
    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return ap_distance, an_distance
anchor_input = Input(name='anchor', shape=input_shape)
positive_input = Input(name='positive', shape=input_shape)
negative_input = Input(name='negative', shape=input_shape)
distances = DistanceLayer()(
    embedding(preprocess_input(anchor_input)),
    embedding(preprocess_input(positive_input)),
    embedding(preprocess_input(negative_input))
)
siamese_net = Model(
    inputs=[anchor_input,
            positive_input,
            negative_input],
    outputs=distances
)
@keras.saving.register_keras_serializable()
class SiameseModel(Model):
    def __init__(self, siamese_net, margin=0.5):
        super().__init__()
        self.siamese_net = siamese_net
        self.margin = margin
        self.loss_tracker = Mean(name='loss')
        self.accuracy_tracker = Mean(name='accuracy')
    @tf.function
    def call(self, inputs):
        return self.siamese_net(inputs)          #WTF
    def train_step(self, *args, **kwargs):
        if keras.backend.backend() == "jax":                   #TODO
            return self._jax_train_step(*args, **kwargs)
        elif keras.backend.backend() == "tensorflow":
            #########################
            print('TF')
            #########################
            return self._tensorflow_train_step(*args, **kwargs)
        elif keras.backend.backend() == "torch":               #TODO
            return self._torch_train_step(*args, **kwargs)
    
    def _tensorflow_train_step(self, data):
        #########################
        print(data)
        print(len(data))
        #########################
        data = data[0]
        
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)#[0])
        gradients = tape.gradient(loss, self.siamese_net.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_net.trainable_weights)
        )
        self.loss_tracker.update_state(loss)
        accuracy = self._compute_accuracy(data)#[0])               #WTF
        self.accuracy_tracker.update_state(accuracy)
        return {'loss': self.loss_tracker.result(),
                'accuracy': self.accuracy_tracker.result()}
    def test_step(self, data):
        data = data[0]    #WTF
        loss = self._compute_loss(data)#[0])                       #WTF
        self.loss_tracker.update_state(loss)
        accuracy = self._compute_accuracy(data)
        self.accuracy_tracker.update_state(accuracy)
        return {'loss': self.loss_tracker.result(),
                'accuracy': self.accuracy_tracker.result()}
    def _compute_loss(self, data):
        ap_distance, an_distance = self.siamese_net(data)
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, .0)
        return loss
    def _compute_accuracy(self, data):
        ap_distance, an_distance = self.siamese_net(data)
        accuracy = tf.reduce_mean(tf.cast(ap_distance < an_distance,
                                          tf.float32))
        return accuracy
    @property
    def metrics(self):
        return [self.loss_tracker, self.accuracy_tracker]
    def get_config(self):
        base_config = super().get_config()
        config = {
            'siamese_net': keras.saving.serialize_keras_object(self.siamese_net),
            'margin': keras.saving.serialize_keras_object(self.margin),
            'loss_tracker': keras.saving.serialize_keras_object(self.loss_tracker),
            'accuracy_tracker': keras.saving.serialize_keras_object(self.accuracy_tracker),
        }
        return {**base_config, **config}
    @classmethod
    def from_config(cls, config):
        config['siamese_net'] = keras.saving.deserialize_keras_object(config.pop('siamese_net'))
        config['margin'] = keras.saving.deserialize_keras_object(config.pop('margin'))
        config['loss_tracker'] = keras.saving.deserialize_keras_object(config.pop('loss_tracker'))
        config['accuracy_tracker'] = keras.saving.deserialize_keras_object(config.pop('accuracy_tracker'))
        return cls(**config)
def train_model(model,
                train_triplets,
                epochs,
                batch_size,
                val_triplets,
                patience,
                delta=0.0001):
    best_val_accuracy = 0
    best_val_loss = float('inf')
    temp_patience = patience
    history = {
        'loss': [],
        'val_loss': [],
        'accuracy': [],
        'val_accuracy': []
    }
    train_steps_per_epoch = math.ceil(len(train_triplets) / batch_size)
    val_steps_per_epoch = math.ceil(len(val_triplets) / batch_size)
    for epoch in range(epochs):
        print(f'Epoch {epoch+1}/{epochs}')
        train_loss = 0.
        train_accuracy = 0.
        val_loss = 0.
        val_accuracy = 0.
        with tqdm(total=train_steps_per_epoch, desc='Training') as pbar:
            for batch in batch_generator(train_triplets, batch_size=batch_size):
                
                loss, accuracy = model.train_on_batch(batch)
                train_loss += loss
                train_accuracy += accuracy
                pbar.update()
                pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})
        with tqdm(total=val_steps_per_epoch, desc='Validation') as pbar:
            for batch in batch_generator(val_triplets, batch_size=batch_size):
                loss, accuracy = model.test_on_batch(batch)
                val_loss += loss
                val_accuracy += accuracy
                pbar.update()
                pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})
        train_loss /= train_steps_per_epoch
        train_accuracy /= train_steps_per_epoch
        val_loss /= val_steps_per_epoch
        val_accuracy /= val_steps_per_epoch
        history['loss'].append(train_loss)
        history['accuracy'].append(train_accuracy)
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_accuracy)
        print(f'\nTrain Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            model.layers[0].layers[3].save_weights('best_model.weights.h5')
        if val_loss - best_val_loss > delta:
            temp_patience -= 1
            if temp_patience == 0:
                print('Early stopping: Validation loss did not improve.')
                break
        else:
            best_val_loss = val_loss
            temp_patience = patience
    return model, history
siamese_model = SiameseModel(siamese_net)
siamese_model.jit_compile = False
siamese_model.compile(optimizer=Adam(0.00001))
siamese_model, history = train_model(siamese_model,
                                     train_triplets=train,
                                     epochs=200,
                                     batch_size=64,
                                     val_triplets=val,
                                     patience=3)
After reviewing your attempt to convert from Keras 2 to Keras 3, I noticed some issues. For instance, DistanceLayer has not been properly migrated to a Keras 3 compatible format. Additionally, the model's forward pass is executed twice, once when computing the loss and once when computing the metrics, which may hurt training speed and overall efficiency. I would also recommend using the fit method, or a customized fit (overriding train_step), whenever possible, rather than a fully custom training loop, when working with Keras. I have updated your code from Keras 2 to 3. Here is the code. Some highlights:
- tensorflow and torch as backends.
- tf.data API to build the dataset for siamese modelling.
- keras.layers.Random* layers for augmentation.
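
To make the point about the doubled forward pass concrete, here is a minimal, framework-free sketch in plain Python. It is not the updated Keras code; `CountingEmbedder`, `squared_distances` and `step` are hypothetical names made up for this illustration, and the "embedding" is just the identity function. The idea it shows is the one that matters in train_step: compute the anchor/positive and anchor/negative distances once, then derive both the loss and the accuracy from those same values.

```python
class CountingEmbedder:
    """Hypothetical stand-in for the embedding network that counts calls."""

    def __init__(self):
        self.calls = 0

    def __call__(self, batch):
        self.calls += 1
        return batch  # toy "embedding": identity on a list of vectors


def squared_distances(xs, ys):
    """Row-wise squared Euclidean distance between two batches of vectors."""
    return [sum((a - b) ** 2 for a, b in zip(x, y)) for x, y in zip(xs, ys)]


def step(embed, anchors, positives, negatives, margin=0.5):
    """Return (mean loss, accuracy) using one forward pass per branch."""
    ea, ep, en = embed(anchors), embed(positives), embed(negatives)
    ap = squared_distances(ea, ep)
    an = squared_distances(ea, en)
    # Both quantities below reuse ap/an instead of re-running the model.
    losses = [max(d_ap - d_an + margin, 0.0) for d_ap, d_an in zip(ap, an)]
    hits = [1.0 if d_ap < d_an else 0.0 for d_ap, d_an in zip(ap, an)]
    return sum(losses) / len(losses), sum(hits) / len(hits)


embed = CountingEmbedder()
anchors = [[0.0, 0.0], [1.0, 1.0]]
positives = [[0.0, 1.0], [1.0, 3.0]]
negatives = [[2.0, 0.0], [1.0, 2.0]]
loss, accuracy = step(embed, anchors, positives, negatives)
print(loss, accuracy, embed.calls)
```

In the question's code, `_compute_loss` and `_compute_accuracy` each call `self.siamese_net(data)`, so the equivalent counter would read 6 per step instead of 3. Calling the network once inside train_step and passing the resulting distances to both computations avoids that.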
