tensorflow - From keras 2 to keras 3 conversion of notebook - Stack Overflow


As the title suggests, I'm trying to convert a notebook to Keras 3, but I don't know why it's not working properly.

This is the original notebook (tell me if it would be better to post its code here). I'm trying to do the same task, but with car fronts; my code is at the end.

I got it working with Keras 2 and am now converting it to Keras 3, but on Kaggle it is really slow. Disabling the GPU makes it even slower, so I know the GPU is being used, yet the CPU is always at 100% and I can't figure out why. My updated code is 5 times slower than the original, while with the old Keras it runs at the same speed as the original code.

How can I do better? Is there something I'm missing? I followed the official Keras guide for migrating from Keras 2 to Keras 3. I'm thinking about learning PyTorch, but my model has to run on constrained hardware, so I want to use TF Lite to convert the model automatically (I'll use TensorFlow as the backend for Keras 3).

Furthermore, unless I use data = data[0], the model receives 5 tensors, because data[0] is the list of batches (anchor, positive and negative) while data[1] and data[2] are empty. This problem occurs in train_step and test_step; how can I handle it in a cleaner way?
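
The layout described here (the triplet in data[0], nothing in data[1] and data[2]) looks like the (x, y, sample_weight) convention Keras uses when it hands a batch to train_step. Below is a minimal pure-Python sketch of unpacking under that assumption. unpack_batch is a hypothetical helper written for illustration, not a Keras function; Keras itself provides keras.utils.unpack_x_y_sample_weight for this purpose. The strings are stand-ins for the image arrays.

```python
def unpack_batch(data):
    """Split a batch into (x, y, sample_weight), filling missing parts with None.

    Hypothetical helper for illustration only; it is not part of Keras.
    """
    if isinstance(data, list):
        data = tuple(data)
    if not isinstance(data, tuple):
        # A bare object is treated as x alone.
        return data, None, None
    if len(data) == 1:
        return data[0], None, None
    if len(data) == 2:
        return data[0], data[1], None
    if len(data) == 3:
        return data[0], data[1], data[2]
    raise ValueError(f"Expected at most 3 elements, got {len(data)}")


# Stand-ins for the three image batches yielded by batch_generator.
triplet = ("anchors", "positives", "negatives")

# A batch wrapped the way the question describes it:
# data[0] holds the triplet, data[1] and data[2] are empty.
x, y, sample_weight = unpack_batch((triplet, None, None))
anchor, positive, negative = x
```

Naming the three parts makes the intent visible inside train_step and test_step, instead of an unexplained data[0]. Note that a bare 3-tuple would be read as x, y and sample_weight, which is why the triplet has to stay wrapped as a single x.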


My updated code for Keras 3 (I prefer to copy it here so that it doesn't change; I can make a static notebook with a run if you'd like to verify that the code is correct, apart from the speed of 20 s/it or more):

import os

# KERAS_BACKEND is read when keras is first imported, so set it before that import.
os.environ["KERAS_BACKEND"] = "tensorflow"

import zipfile

import random
import math
import pandas as pd

import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
from tqdm import tqdm

from sklearn.model_selection import train_test_split

import tensorflow as tf
import keras
from keras.models import Model
from keras.layers import Layer, Flatten, Dense,\
                                    Dropout, BatchNormalization, Input
from keras.metrics import Mean, CosineSimilarity
from keras.optimizers import Adam
from keras.utils import plot_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.applications.efficientnet import EfficientNetB7, preprocess_input


#!conda install -y gdown
if not(os.path.isdir('/kaggle/working/confirmed_fronts')):
    !gdown https://drive.google.com/uc?id=1e1ajCFntWRVeCluTvSJ6gV-2ew6F0ys5

    !unzip -qq Confirmed_fronts.zip

    !rm -rf /kaggle/working/Confirmed_fronts.zip


%cd /kaggle/working/confirmed_fronts

!find . -type f | awk -F '/' '{print $4}' | awk -F '\\$\\$' 'BEGIN{print "Maker name,Model name,Registration year,Color,Genmodel ID,Adv ID,Image index"}1 {print $1 "," $2 "," $3 "," $4 "," $5 "," $6 "," $7}' > car_fronts.csv


df2 = pd.read_csv('car_fronts.csv', delimiter=',')

df2 = df2.dropna(how='all',axis=0) 

df2['Registration year'] = df2['Registration year'].astype(int)
df2['Adv ID'] = df2['Adv ID'].astype(int)

df2


df2vc = df2[['Maker name', 'Model name', 'Registration year', 'Color']].value_counts()

df2vc


df2vcFiltered = df2vc[df2vc >= 5]
df2vcFiltered


def triplets_generator(max_triplets=10, sample_lim=len(df2vcFiltered), val_set_perc=0.2, test_set_perc=0.1, print_paths=False, high_filter=False):
        
    images_tr  = []
    images_val = []
    images_te  = []

    for row in range(sample_lim):
        images_row = []
        
        index = df2vcFiltered.index[row]
 
        for i in range(max_triplets):
            random_choice1 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
            path1 = str(random_choice1['Maker name'].iloc[0]) + '/' + str(random_choice1['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice1['Maker name'].iloc[0]) + '$$' + str(random_choice1['Model name'].iloc[0]) + '$$' +  str(random_choice1['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice1['Color'].iloc[0]) + '$$' + str(random_choice1['Genmodel ID'].iloc[0]) + '$$' + str(random_choice1['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice1['Image index'].iloc[0])
            
            while True:                  #TODO better way to write it? Do while not implemented
                random_choice2 = df2[(df2['Maker name'] == index[0]) & (df2['Model name'] == index[1]) & (df2['Registration year'] == index[2]) & (df2['Color'] == index[3])].sample()
                if random_choice1.index != random_choice2.index:
                    break
            path2 = str(random_choice2['Maker name'].iloc[0]) + '/' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice2['Maker name'].iloc[0]) + '$$' + str(random_choice2['Model name'].iloc[0]) + '$$' + str(random_choice2['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice2['Color'].iloc[0]) + '$$' + str(random_choice2['Genmodel ID'].iloc[0]) + '$$' + str(random_choice2['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice2['Image index'].iloc[0])
            
            random_choice3 = df2[(df2['Maker name'] != index[0]) | (df2['Model name'] != index[1]) | (abs(df2['Registration year'] - index[2]) >= 10) | (df2['Color'] != index[3])].sample()
            path3 = str(random_choice3['Maker name'].iloc[0]) + '/' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '/' + str(random_choice3['Maker name'].iloc[0]) + '$$' + str(random_choice3['Model name'].iloc[0]) + '$$' + str(random_choice3['Registration year'].iloc[0].astype(int)) + '$$' + str(random_choice3['Color'].iloc[0]) + '$$' + str(random_choice3['Genmodel ID'].iloc[0]) + '$$' + str(random_choice3['Adv ID'].iloc[0].astype(int)) + '$$' + str(random_choice3['Image index'].iloc[0])
    
            a = cv2.cvtColor(cv2.imread(path1), cv2.COLOR_BGR2RGB)
            p = cv2.cvtColor(cv2.imread(path2), cv2.COLOR_BGR2RGB)
            n = cv2.cvtColor(cv2.imread(path3), cv2.COLOR_BGR2RGB)
            
            if high_filter:              #TODO if only a single channel is used, then architecture has to be (_,_,1); is there a way to be channel size independent?
                for img in [a,p,n]:
                    temp = 0.2989 * img[:,:,0] + 0.5870 * img[:,:,1] + 0.1140 * img[:,:,2]
                    temp = temp - ndimage.gaussian_filter(temp,3)
                    img[:,:,0] = temp
                    img[:,:,1] = temp
                    img[:,:,2] = temp
              
            if print_paths:
                print(path1)
                print(path2)
                print(path3)

            images_row.append([a,p,n])

        random.shuffle(images_row)

        tr, val = train_test_split(images_row, shuffle=True, test_size=test_set_perc+val_set_perc)
        val, te = train_test_split(val, shuffle=True, test_size=test_set_perc)

        images_tr.extend(tr)
        images_val.extend(val)
        images_te.extend(te)
    
    return images_tr, images_val, images_te
    
#TODO add random_state?


train, val, test = triplets_generator(sample_lim=200)
print(len(train))
print(len(train[0]))


def batch_generator(triplets, batch_size=32, augment=True):
    total_triplets = len(triplets)
    random.shuffle(triplets)
    
    datagen = ImageDataGenerator(
        rotation_range=10,  
        width_shift_range=0.05, 
        height_shift_range=0.05,   
        horizontal_flip=True,
        zoom_range=0.2
    )
    
    for i in range(0, total_triplets, batch_size):
        batch_triplets = triplets[i : i+batch_size]

        anchor_batch = []
        positive_batch = []
        negative_batch = []

        for triplet in batch_triplets:
            anchor, positive, negative = triplet
            
            anchor_image = anchor
            positive_image = positive
            negative_image = negative
                
            if augment:
                anchor_image = datagen.random_transform(anchor_image)
                positive_image = datagen.random_transform(positive_image)
                negative_image = datagen.random_transform(negative_image)

            anchor_batch.append(anchor_image)
            positive_batch.append(positive_image)
            negative_batch.append(negative_image)

        yield np.array(anchor_batch), np.array(positive_batch), np.array(negative_batch)


batch=6
print(len([next(batch_generator(train, batch))]))
print(len([next(batch_generator(train, batch))][0]))
print(len([next(batch_generator(train, batch))][0][0]))


def print_triplets(triplets):
    a, p, n = triplets

    for i in range(len(a)):
        
        fig, axarr = plt.subplots(1,3, figsize=(10, 4))
        
        axarr[0].imshow(a[i])
        axarr[0].title.set_text('Anchor')
        axarr[1].imshow(p[i])
        axarr[1].title.set_text('Positive')
        axarr[2].imshow(n[i])
        axarr[2].title.set_text('Negative')
        plt.show()


print_triplets([next(batch_generator(train, 6))][0])


def get_embedding(input_shape, num_layers_to_unfreeze=25):
    base_model = EfficientNetB7(weights='imagenet',
                                input_shape=input_shape,
                                include_top=False,
                                pooling='avg')

    for i in range(len(base_model.layers)-num_layers_to_unfreeze):
        base_model.layers[i].trainable = False

    embedding = keras.models.Sequential([
        base_model,
        Flatten(),
        Dense(512, activation='relu'),
        BatchNormalization(),
        Dropout(0.3),
        Dense(256, activation='relu'),
        BatchNormalization(),
        Dropout(0.3),
        Dense(128, activation='relu'),
        BatchNormalization(),
        Dense(128)
    ], name='Embedding')

    return embedding


input_shape = (300, 300, 3)

embedding = get_embedding(input_shape)
embedding.summary()


@keras.saving.register_keras_serializable()
class DistanceLayer(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @tf.function
    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return ap_distance, an_distance

anchor_input = Input(name='anchor', shape=input_shape)
positive_input = Input(name='positive', shape=input_shape)
negative_input = Input(name='negative', shape=input_shape)

distances = DistanceLayer()(
    embedding(preprocess_input(anchor_input)),
    embedding(preprocess_input(positive_input)),
    embedding(preprocess_input(negative_input))
)

siamese_net = Model(
    inputs=[anchor_input,
            positive_input,
            negative_input],
    outputs=distances
)


@keras.saving.register_keras_serializable()
class SiameseModel(Model):
    def __init__(self, siamese_net, margin=0.5):
        super().__init__()
        self.siamese_net = siamese_net
        self.margin = margin
        self.loss_tracker = Mean(name='loss')
        self.accuracy_tracker = Mean(name='accuracy')

    @tf.function
    def call(self, inputs):
        return self.siamese_net(inputs)          #WTF

    def train_step(self, *args, **kwargs):
        if keras.backend.backend() == "jax":                   #TODO
            return self._jax_train_step(*args, **kwargs)
        elif keras.backend.backend() == "tensorflow":
            #########################
            print('TF')
            #########################
            return self._tensorflow_train_step(*args, **kwargs)
        elif keras.backend.backend() == "torch":               #TODO
            return self._torch_train_step(*args, **kwargs)
    
    def _tensorflow_train_step(self, data):
        #########################
        print(data)
        print(len(data))
        #########################
        data = data[0]
        
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)#[0])

        gradients = tape.gradient(loss, self.siamese_net.trainable_weights)

        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_net.trainable_weights)
        )

        self.loss_tracker.update_state(loss)

        accuracy = self._compute_accuracy(data)#[0])               #WTF
        self.accuracy_tracker.update_state(accuracy)

        return {'loss': self.loss_tracker.result(),
                'accuracy': self.accuracy_tracker.result()}

    def test_step(self, data):
        data = data[0]    #WTF
        loss = self._compute_loss(data)#[0])                       #WTF

        self.loss_tracker.update_state(loss)

        accuracy = self._compute_accuracy(data)
        self.accuracy_tracker.update_state(accuracy)

        return {'loss': self.loss_tracker.result(),
                'accuracy': self.accuracy_tracker.result()}

    def _compute_loss(self, data):
        ap_distance, an_distance = self.siamese_net(data)

        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, .0)
        return loss

    def _compute_accuracy(self, data):
        ap_distance, an_distance = self.siamese_net(data)
        accuracy = tf.reduce_mean(tf.cast(ap_distance < an_distance,
                                          tf.float32))
        return accuracy

    @property
    def metrics(self):
        return [self.loss_tracker, self.accuracy_tracker]

    def get_config(self):
        base_config = super().get_config()
        config = {
            'siamese_net': keras.saving.serialize_keras_object(self.siamese_net),
            'margin': keras.saving.serialize_keras_object(self.margin),
            'loss_tracker': keras.saving.serialize_keras_object(self.loss_tracker),
            'accuracy_tracker': keras.saving.serialize_keras_object(self.accuracy_tracker),
        }
        return {**base_config, **config}

    @classmethod
    def from_config(cls, config):
        config['siamese_net'] = keras.saving.deserialize_keras_object(config.pop('siamese_net'))
        config['margin'] = keras.saving.deserialize_keras_object(config.pop('margin'))
        config['loss_tracker'] = keras.saving.deserialize_keras_object(config.pop('loss_tracker'))
        config['accuracy_tracker'] = keras.saving.deserialize_keras_object(config.pop('accuracy_tracker'))
        return cls(**config)


def train_model(model,
                train_triplets,
                epochs,
                batch_size,
                val_triplets,
                patience,
                delta=0.0001):

    best_val_accuracy = 0
    best_val_loss = float('inf')
    temp_patience = patience
    history = {
        'loss': [],
        'val_loss': [],
        'accuracy': [],
        'val_accuracy': []
    }

    train_steps_per_epoch = math.ceil(len(train_triplets) / batch_size)
    val_steps_per_epoch = math.ceil(len(val_triplets) / batch_size)

    for epoch in range(epochs):
        print(f'Epoch {epoch+1}/{epochs}')
        train_loss = 0.
        train_accuracy = 0.
        val_loss = 0.
        val_accuracy = 0.

        with tqdm(total=train_steps_per_epoch, desc='Training') as pbar:
            for batch in batch_generator(train_triplets, batch_size=batch_size):
                
                loss, accuracy = model.train_on_batch(batch)
                train_loss += loss
                train_accuracy += accuracy

                pbar.update()
                pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})

        with tqdm(total=val_steps_per_epoch, desc='Validation') as pbar:
            for batch in batch_generator(val_triplets, batch_size=batch_size):
                loss, accuracy = model.test_on_batch(batch)
                val_loss += loss
                val_accuracy += accuracy

                pbar.update()
                pbar.set_postfix({'Loss': loss, 'Accuracy': accuracy})

        train_loss /= train_steps_per_epoch
        train_accuracy /= train_steps_per_epoch
        val_loss /= val_steps_per_epoch
        val_accuracy /= val_steps_per_epoch

        history['loss'].append(train_loss)
        history['accuracy'].append(train_accuracy)
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_accuracy)

        print(f'\nTrain Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            model.layers[0].layers[3].save_weights('best_model.weights.h5')

        if val_loss - best_val_loss > delta:
            temp_patience -= 1
            if temp_patience == 0:
                print('Early stopping: Validation loss did not improve.')
                break
        else:
            best_val_loss = val_loss
            temp_patience = patience

    return model, history


siamese_model = SiameseModel(siamese_net)
siamese_model.jit_compile = False
siamese_model.compile(optimizer=Adam(0.00001))

siamese_model, history = train_model(siamese_model,
                                     train_triplets=train,
                                     epochs=200,
                                     batch_size=64,
                                     val_triplets=val,
                                     patience=3)
asked Feb 1 at 12:32 by Rob99

1 Answer

After reviewing your attempt to convert from Keras 2 to Keras 3, I noticed some issues. For instance, DistanceLayer has not been properly migrated to a Keras 3 compatible format. Additionally, the model's forward pass is executed twice per step, once to compute the loss and once to compute the metrics, which may hurt training speed and overall efficiency. I would also recommend using the fit method or the custom fit approach whenever possible when working with Keras, rather than a custom training loop. I have updated your code from Keras 2 to Keras 3. Here is the code. Some highlights:

  • Supports TensorFlow and PyTorch as backends.
  • Uses the tf.data API to build the dataset for the Siamese model.
  • Uses keras.layers.Random* layers for augmentation.
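
To illustrate the point about the doubled forward pass: in the question, _compute_loss and _compute_accuracy each call self.siamese_net(data), although both values depend only on the two distances. The sketch below is plain Python with made-up distance values, not the updated code itself. The function name triplet_loss_and_accuracy is hypothetical; the margin of 0.5 is the default from the question. It derives both quantities from one set of distances, so the network would only need to run once per step.

```python
def triplet_loss_and_accuracy(ap_distance, an_distance, margin=0.5):
    """Compute per-triplet losses and batch accuracy from one set of distances.

    ap_distance: anchor-positive distances, one per triplet.
    an_distance: anchor-negative distances, one per triplet.
    Hypothetical helper for illustration; the values are plain floats.
    """
    pairs = list(zip(ap_distance, an_distance))
    # Triplet loss: max(d(a, p) - d(a, n) + margin, 0) for each triplet.
    losses = [max(ap - an + margin, 0.0) for ap, an in pairs]
    # A triplet counts as correct when the positive is closer than the negative.
    correct = [1.0 if ap < an else 0.0 for ap, an in pairs]
    accuracy = sum(correct) / len(correct)
    return losses, accuracy


# Made-up distances standing in for a single forward pass of the network.
ap = [0.25, 1.0, 0.5, 2.0]
an = [1.0, 0.5, 0.75, 1.0]

losses, accuracy = triplet_loss_and_accuracy(ap, an)
print(losses)    # [0.0, 1.0, 0.25, 1.5]
print(accuracy)  # 0.5
```

In a train_step this would mean calling the network once, keeping ap_distance and an_distance, and passing them to both the loss and the accuracy computation.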