In questo tutorial vediamo come implementare un algoritmo di multi-task learning in TensorFlow, imparando a predire simultaneamente più aspetti da un'unica foto di un volto in input. Nel corso dell'implementazione introdurremo numerosi moduli avanzati di TF, tra cui le librerie tf.data
e tf.image
per elaborare cartelle di immagini, i custom estimator per definire il modello (una rete convolutiva), e la nuovissima Head API (al momento in cui scrivo ancora in developer's preview) per la logica del multi-task learning.
Tutto il codice di questo post è disponibile online sul nostro repository GitHub. L'articolo è stato aggiornato dopo la sua pubblicazione iniziale.
(Brevissima) introduzione al multi-task learning
Chiunque abbia seguito almeno un tutorial introduttivo di deep learning ha familiarità con problemi di classificazione di immagini. In questi problemi, di solito siamo interessati a predire un singolo aspetto dell'immagine, come la presenza o meno di un determinato oggetto. In molti casi, però, le informazioni da estrarre potrebbero essere molteplici: prendendo come esempio un'applicazione di videosorveglianza, oltre a riconoscere una persona potrebbe essere utile capire se si tratta di un uomo o di una donna, la sua età, e così via. In queste situazioni si parla di multi-task learning (MTL), in opposizione al più classico single-task learning. Anche se potremmo affrontare ciascun problema separatamente, come vedremo tra pochissimo le reti neurali si prestano molto bene a considerare tutti i task simultaneamente, sia computazionalmente che logicamente.
Questo post non vuole essere un'introduzione approfondita al MTL, che è un campo molto vasto. Se siete interessati, rimandiamo a questo articolo introduttivo di Sebastian Ruder, pubblicato circa un anno fa.
Contenuto del tutorial
Come esempio pratico di MTL, reimplementeremo una parte dell'articolo Facial Landmark Detection by Deep Multi-task Learning, presentato all'European Conference on Computer Vision qualche anno fa.
Il dataset su cui andremo a lavorare è composto da circa 13000 immagini di volti e, per ciascuno di questi volti siamo interessati a predire 14 attributi:
- Landmark detection: le coordinate di occhi, naso ed estremi laterali della bocca (problemi di regressione);
- Pose classification: sesso, se stanno sorridendo o meno, presenza di occhiali, orientamento del volto (problemi di classificazione).
Per semplicità ne predirremo solo due, anche se il codice è molto facile da estendere per aggiungere tutti gli attributi mancanti. Procederemo in tre parti:
-
Prima di tutto vedremo come importare le cartelle del dataset e manipolare le immagini sfruttando i moduli ad alto livello tf.data e tf.image presenti in TF.
-
Come secondo passo, vedremo come reimplementare la rete convolutiva descritta nell'articolo con i custom estimator di TensorFlow. Gli estimator sono una delle interfacce ad alto livello di TF, che supportano nativamente un grande numero di funzionalità, tra cui allenamento su GPU e checkpointing. Se avete già seguito il tutorial di base su questo argomento presente sul sito ufficiale, potete opzionalmente saltare questa parte.
- Le Head API sono uno strumento ancora in developer's preview che semplificano la scrittura dei custom estimator, implementando al loro interno la maggior parte della logica relativa a ciascun problema affrontabile in pratica (es., classificazione). Vedremo come usarle per riscrivere il codice di prima ed estenderlo al caso di multi-task learning.
Cominciamo!
Requisiti / ottenere il dataset
Per seguire al meglio questo articolo vi consiglio di aggiornare la versione di TensorFlow in vostro possesso, preferibilmente alla 1.6 o superiore. Per scaricare i dati, useremo anche i moduli tqdm e requests:
pip install tensorflow tqdm requests --upgrade
Per scaricare il dataset potete eseguire questo codice (con un piccolo ringraziamento a Stack Overflow):
from tqdm import tqdm
import requests
url = "http://mmlab.ie.cuhk.edu.hk/projects/TCDCN/data/MTFL.zip"
response = requests.get(url, stream=True)
with open("MTFL", "wb") as handle:
for data in tqdm(response.iter_content(), unit=' KB'):
handle.write(data)
Decomprimendo il file vi dovreste ritrovare tre cartelle (all'interno delle quali trovate le immagini che compongono l'intero dataset), oltre a tre file di testo con le annotazioni delle immagini e la descrizione del dataset stesso.
Parte 1a - Caricare i file in Pandas
Cominciamo caricando i file di testo con Pandas:
import pandas as pd
train_data = pd.read_csv('training.txt', sep=' ', \
header=None, skipinitialspace=True, nrows=10000)
test_data = pd.read_csv('testing.txt', sep=' ', \
header=None, skipinitialspace=True, nrows=2995)
Il formato di ciascun file è abbastanza immediato:
print(train_data.iloc[0])
# 0 lfw_5590\Aaron_Eckhart_0001.jpg
# 1 107.25
# 2 147.75
# ...
# 13 2
# 14 3
# Name: 0, dtype: object
La prima riga rappresenta il path dell'immagine, e per ciascuna immagine abbiamo 14 caratteristiche da predire, di cui dieci sono coordinate spaziali (es., le coordinate dell'occhio sinistro), e le altre sono attributi categorici (es., se la persona sta sorridendo o meno). Il formato del path dell'immagine potrebbe dare alcuni problemi su piattaforme Unix, quindi rimpiazziamo rapidamente i backslash prima di continuare:
train_data.iloc[:, 0] = \
train_data.iloc[:, 0].apply(lambda s: s.replace('\\', '/'))
test_data.iloc[:, 0] = \
test_data.iloc[:, 0].apply(lambda s: s.replace('\\', '/'))
Parte 1b - Caricare e processare le immagini con tf.data
Nelle ultime versioni, la maniera consigliata di caricare dati all'interno di TensorFlow è l'utilizzo dei Dataset, che permettono di gestire grandi moli di dati (eventualmente in modo distribuito) oltre a processare nativamente immagini (tramite il pacchetto tf.image
), testo, e molto altro. In questa sezione vediamo nel dettaglio come costruire una funzione per importare i nostri dati, e rimandiamo alla guida ufficiale per più informazioni su molte altre funzionalità di tf.data
.
Ovviamente, il primo passo è importare tutti i dati da Pandas all'interno di TF (per ora consideriamo solo i dati di training):
filenames = tf.constant(train_data.iloc[:, 0].tolist())
label = tf.constant(train_data.iloc[:, 1:].values)
Se non siete familiari con i Dataset, il loro utilizzo è abbastanza semplice. Dataset
è in realtà una classe astratta che permette di caricare dati da numerose sorgenti; una volta caricati, possiamo poi instanziare un iteratore che ci permette di processarli in sequenza:
Caricare dataset partendo da tensori TF dovrebbe essere abbastanza familiare se avete seguito le guide di partenza. Come ripasso, carichiamo i tensori, dividiamo il dataset in mini-batch di 64 elementi, creiamo un iteratore e stampiamo il primo elemento su schermo:
# Crea il dataset
dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
# Crea l'iteratore
it = dataset.batch(64).make_one_shot_iterator().get_next()
# Richiama il primo mini-batch
with tf.Session() as sess:
(imgs, labels) = sess.run(it)
print(imgs[0])
# Prints: b'lfw_5590/Aaron_Eckhart_0001.jpg'
Ovviamente, quello che vogliamo processare sono le immagini, e non i loro path. Non solo quello, ma il caricamento delle immagini in questo caso non è banale, in quanto oltre ad avere dimensioni diverse, possono presentarsi sia in bianco e nero che a colori. Fortunatamente, TensorFlow ha una soluzione (quasi) pronta per questa situazione!
Come prima cosa, possiamo usare il pacchetto tf.image
per gestire il caricamento di una singola immagine (la funzione è adattata da qui):
def _parse_function(filename, label):
image_string = tf.read_file(filename)
image_decoded = tf.image.decode_jpeg(image_string, channels=3)
image_resized = tf.image.resize_images(image_decoded, [40, 40])
image_shape = tf.cast(tf.shape(image_decoded), tf.float32)
label = tf.concat([label[0:5] / image_shape[0], label[5:10] / image_shape[1], label[10:]], axis=0)
return {"x": image_resized}, label
La funzione gestisce quasi tutti i problemi descritti prima:
tf.image.decode_jpeg
prende il file binario e lo decodifica come immagine. Il parametrochannels=3
ci assicura di avere 3 canali in output, anche nel caso di immagini in bianco e nero.resize_images
ridimensiona tutte le immagini al formato desiderato (qui seguiamo l'articolo ed usiamo immagini 40x40).- Normalizziamo tutte le coordinate tra 0 ed 1 rispetto alle dimensioni dell'immagine stessa.
- In output ritorniamo un dizionario
{"x": image_resized}
invece cheimage_resized
, semplicemente perché è il formato richiesto dagli Estimators in TF.
Possiamo applicare la funzione sull'intero dataset, immagine per immagine, sfruttando la funzione map
del Dataset:
dataset = dataset.map(_parse_function)
A questo punto possiamo scrivere l'intera funzione di input, aggiungendo un minimo di logica condizionale per gestire lo shuffling e la ripetizione del dataset in fase di training:
def input_fn(data, is_eval=False):
# Path delle immagini
filenames = tf.constant(data.iloc[:, 0].tolist())
# Etichette delle immagini
labels = tf.constant(data.iloc[:, 1:].values.astype(np.float32))
# Costruisco il dataset
dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(_parse_function)
# Logica di training / testing
if is_eval:
dataset = dataset.batch(64)
else:
dataset = dataset.repeat().shuffle(1000).batch(64)
# Costruisco l'iteratore
return dataset.make_one_shot_iterator().get_next()
Testiamo la funzione stampando la prima immagine di training:
import matplotlib.pyplot as plt
with tf.Session() as sess:
(imgs, _) = sess.run(input_fn(train_data, True))
plt.imshow(imgs["x"][0] / 255)
Parte 2 - Single-task estimator
Passiamo ora alla parte più interessante del tutorial, e cominciamo predicendo un singolo attributo dell'immagine, sviluppando il modello con i custom estimator di TensorFlow.
Se non siete familiari con le reti convolutive o con i custom estimator, questo è un buon momento per ripassare.
Come modello, seguiamo quello dell'articolo, una rete convolutiva abbastanza semplice:
L'utilità delle reti convolutive per il multi-task learning sono evidenti già dalla figura: la prima parte della rete permette di estrarre delle feature dall'immagine condivise per ciascun task (100 in questo caso), a partire dalle quali vengono poi allenati una serie di modelli, uno per ciascun task. Allenando tutti questi modelli simultaneamente, possiamo ottenere feature che sono più robuste, oltre ovviamente a risparmiare un'enormità di tempo computazionale.
Per cominciare definiamo la parte condivisa dell'architettura sfruttando i layers di TF:
def extract_features(features):
# Input Layer
input_layer = tf.reshape(features["x"], [-1, 40, 40, 3])
# Primo layer convolutivo
conv1 = tf.layers.conv2d(inputs=input_layer, filters=16, kernel_size=[5, 5], padding="same", activation=tf.nn.relu)
pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
# Secondo layer convolutivo
conv2 = tf.layers.conv2d(inputs=pool1, filters=48, kernel_size=[3, 3], padding="same", activation=tf.nn.relu)
pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
# Terzo layer convolutivo
conv3 = tf.layers.conv2d(inputs=pool2, filters=64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu)
pool3 = tf.layers.max_pooling2d(inputs=conv3, pool_size=[2, 2], strides=2)
# Quarto layer convolutivo
conv4 = tf.layers.conv2d(inputs=pool3, filters=64, kernel_size=[2, 2], padding="same", activation=tf.nn.relu)
# Dense Layer
flat = tf.reshape(conv4, [-1, 5 * 5 * 64])
dense = tf.layers.dense(inputs=flat, units=100, activation=tf.nn.relu)
return dense
Per il momento rimaniamo sul single-task learning, e definiamo un custom estimator per predire la posizione del naso. Una volta definito il modello sopra, il codice dell'estimator è preso quasi completamente dalla guida di TF:
def single_task_cnn_model_fn(features, labels, mode):
# Estrazione delle feature
dense = extract_features(features)
# Predizioni
predictions = tf.layers.dense(inputs=dense, units=2)
outputs = {
"predictions": predictions
}
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode=mode, predictions=outputs)
# Funzione costo (errore quadratico medio)
loss = tf.losses.mean_squared_error(labels=labels[:, 2:8:5], predictions=predictions)
# Ottimizzazione
if mode == tf.estimator.ModeKeys.TRAIN:
optimizer = tf.train.AdamOptimizer()
train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
# Valutazione del modello
eval_metric_ops = {
"rmse": tf.metrics.root_mean_squared_error(
labels=labels[:, 2:8:5], predictions=outputs["predictions"])}
return tf.estimator.EstimatorSpec(
mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)
(Poiché le coordinate dell'occhio sinistro corrispondono rispettivamente alla terza ed ottava etichetta, nel codice estraiamo le colonne corrispondenti con labels[:, 2:8:5]
.)
Il motivo per cui il codice è abbastanza lungo è che la funzione deve gestire l'intera logica del modello, dalla predizione fino all'allenamento (vedremo tra pochissimo come semplificarlo). Per ora alleniamo il nostro modello per un po' di iterazioni:
single_task_classifier.train(input_fn=lambda: input_fn(train_data), steps=1000)
Il modello ottiene già così un errore di circa 0.17 in media sul test set:
Parte 3a - Semplificare il codice con le Head API
Prima di passare al multi-task learning, introduciamo le Head API, una funzionalità ancora in developer's preview che semplifica moltissimo la scrittura di un custom estimator.
Attenzione a non confondere gli oggetti in tf.contrib.estimator
con tf.contrib.learn.Head
, un'implementazione più vecchia ora deprecata.
L'idea di base nasce dall'osservazione che buona parte dell'implementazione di single_task_cnn_model_fn
è relativamente standard una volta scelto il nostro task (es., errore quadratico medio come funzione costo per problemi di regressione). Le Head API permettono di scindere la definizione del modello dall'implementazione della logica del task.
Molto più facile vederlo in pratica che descriverlo! Riscriviamo il nostro modello di prima, questa volta usando le nuove API:
def single_head_cnn_model_fn(features, labels, mode):
# Estrazione delle feature
dense = extract_features(features)
# Predizioni
predictions = tf.layers.dense(inputs=dense, units=2)
# Ottimizzatore
optimizer = tf.train.AdamOptimizer()
# Modello finale
regression_head = tf.contrib.estimator.regression_head(label_dimension=2)
return regression_head.create_estimator_spec(features, mode, predictions, labels[:, 2:8:5], \
lambda x: optimizer.minimize(x, global_step = tf.train.get_or_create_global_step()))
Una volta definito il modo in cui calcolare le predizioni, e quale ottimizzatore usare, buona parte della logica di prima è ora implementata direttamente nella regression_head
, che si occupa di gestire la creazione di tutte le strutture dati e gli oggetti da ritornare all'estimator.
Arriviamo quindi al cuore dell'articolo, vedendo come usare questa funzionalità per semplificare il multi-task learning.
Parte 3b - Multi-task learning (finalmente!)
Ritornando alla figura di prima che definisce il modello, possiamo pensare ad una rete per fare multi-task learning come una rete che ha una parte di modello condivisa (per estrarre le feature) e poi diverse "head", ciascuna associata ad un particolare task. Questo è nativamente gestito in TF con la multi_head
, che permette di associare una lista di head al nostro modello!
Matematicamente parlando, l'ottimizzazione avviene minimizzando la somma delle funzioni costo, una per ciascun task. Ad esempio, avendo due task ed una funzione costo $L_1$ per il primo task ed una funzione costo $L_2$ per il secondo, andremo a minimizzare:
$$ L = L_1 + L_2$$
Le due funzioni non sono però indipendenti, in quanto condividono una buona porzione della rete: questo contribuisce a velocizzare l'apprendimento, oltre a rendere le feature estratte più robuste e generiche. Per questo esempio consideriamo solo due task: la predizione della posizione del naso (come prima), unita alla predizione della posa del volto (profilo sinistro, sinistra, frontale, destra, profilo destro).
Come prima cosa dobbiamo modificare la funzione di input, che in questo caso dovrà ritornare un dizionario di etichette associate alle varie head:
def multihead_input_fn(data, is_eval=True):
features, labels = input_fn(data, is_eval=is_eval)
return features, {'head_nose': labels[:, 2:8:5], 'head_pose': tf.cast(labels[:, -1] - 1.0, tf.int32)}
A questo punto modifichiamo anche il modello, definendo una head appropriata per ciascun task:
def multi_head_cnn_model_fn(features, labels, mode):
dense = extract_features(features)
# Predizioni della rete (per ciascun task)
predictions_nose = tf.layers.dense(inputs=dense, units=2)
predictions_pose = tf.layers.dense(inputs=dense, units=5)
logits = {'head_nose': predictions_nose, 'head_pose': predictions_pose}
# Ottimizzatore
optimizer = tf.train.AdamOptimizer()
# Definiamo le due head
regression_head = tf.contrib.estimator.regression_head(name='head_nose', label_dimension=2)
classification_head = tf.contrib.estimator.multi_class_head(name='head_pose', n_classes=5)
multi_head = tf.contrib.estimator.multi_head([regression_head, classification_head])
return multi_head.create_estimator_spec(features, mode, logits, labels, optimizer)
Il resto è uguale a prima! La differenza è che sia la valutazione che la predizione saranno fatte separatamente per ciascun task, relativamente alle head che abbiamo definito:
multitask_classifier.train(input_fn=lambda: multihead_input_fn(train_data), steps=1000)
multitask_classifier.evaluate(input_fn=lambda: multihead_input_fn(test_data, is_eval=True))
# {'accuracy/head_pose': 0.6290484,
# 'average_loss/head_nose': 0.0408225,
# 'average_loss/head_pose': 1.1491545,
# 'global_step': 1000,
# 'loss': 78.43075,
# 'loss/head_nose': 5.2026973,
# 'loss/head_pose': 73.22804}
Come si vede, il modello ritorna sia la funzione costo combinata ($L$ sopra), sia quella calcolata separatamente sui due problemi, ed allo stesso modo abbiamo misure diverse di accuratezza per i task, adeguate al problema. Similmente se richiediamo delle predizioni:
p = list(multitask_classifier.predict(lambda: input_fn_predict(test_data)))
print(p[0])
# {
# ('head_nose', 'predictions'): array([0.19438197, 0.14960444], dtype=float32),
# ('head_pose', 'logits'): array([-1.1866168 , -0.17579822, ... , -0.49098715], dtype=float32),
# ('head_pose', 'probabilities'): array([0.04211354, 0.11572168, ... , 0.08443644], dtype=float32),
# ('head_pose', 'class_ids'): array([2]), ('head_pose', 'classes'): array([b'2'], dtype=object)
# }
Possiamo stampare entrambe le predizioni simultaneamente:
with tf.Session() as sess:
imgs = sess.run(input_fn_predict(test_data))
prediction_nose = p[1][(('head_nose', 'predictions'))]
prediction_pose = p[1][(('head_pose', 'class_ids'))]
plt.imshow(imgs["x"][1] / 255)
plt.scatter(prediction_nose[0] * 40, prediction_nose[1] * 40, 500, marker='x', color='red', linewidth=5)
plt.text(5, 3, 'Predicted pose: {}'.format(prediction_pose))
Questo conclude il tutorial! Ci sono molte cose che potete testare da qui: allenare il modello per più iterazioni, aggiungere nuovi task, pesare diversamente le due funzioni costo... Ancora una volta, vi invito a leggere il bellissimo articolo di Sebastien Ruder per scoprire di più su questo affascinante problema.
Ovviamente, essendo le head una feature ancora in developer's preview, è possibile che l'interfaccia si modificherà di qui al rilascio stabile.
Se questo articolo ti è piaciuto e vuoi tenerti aggiornato sulle nostre attività, ricordati che l'iscrizione all'Italian Association for Machine Learning è gratuita! Puoi seguirci anche su Facebook e su LinkedIn.