Data Augmentation for Tabular Data
For training deep learning models, the more data the better. For image data, the deep learning community has come up with plenty of tricks to get more out of a given dataset: by rotating, flipping, blurring etc. an image, we can create additional input data and thereby improve our model.
For tabular data, however, only few such techniques exist. In this blogpost I want to show you how to build a variational autoencoder and use it for data augmentation: I will create fake data sampled from the learned distribution of the underlying real data.
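For contrast, this is what augmentation typically looks like on the image side. A minimal sketch using torchvision (an extra dependency that is not used anywhere else in this post):
from torchvision import transforms

# each transform produces a slightly different version of the same image
image_augment = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),   # flipping
    transforms.RandomRotation(degrees=15),    # rotating
    transforms.GaussianBlur(kernel_size=3),   # blurring
    transforms.ToTensor(),
])
# applied to a PIL image, every call yields a new training example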
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
DATA_PATH = 'data/wine.csv'
df_base = pd.read_csv(DATA_PATH, sep=',')
df_base.head()
cols = df_base.columns
def load_and_standardize_data(path):
    # read in from csv
    df = pd.read_csv(path, sep=',')
    # replace nan with -99
    df = df.fillna(-99)
    df = df.values.reshape(-1, df.shape[1]).astype('float32')
    # randomly split into train and test
    X_train, X_test = train_test_split(df, test_size=0.3, random_state=42)
    # standardize values (fit on train only to avoid leakage)
    scaler = preprocessing.StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    return X_train, X_test, scaler
from torch.utils.data import Dataset, DataLoader

class DataBuilder(Dataset):
    def __init__(self, path, train=True):
        self.X_train, self.X_test, self.standardizer = load_and_standardize_data(path)
        if train:
            self.x = torch.from_numpy(self.X_train)
        else:
            self.x = torch.from_numpy(self.X_test)
        self.len = self.x.shape[0]
        del self.X_train
        del self.X_test

    def __getitem__(self, index):
        return self.x[index]

    def __len__(self):
        return self.len
traindata_set = DataBuilder(DATA_PATH, train=True)
testdata_set = DataBuilder(DATA_PATH, train=False)
trainloader = DataLoader(dataset=traindata_set, batch_size=1024)
testloader = DataLoader(dataset=testdata_set, batch_size=1024)
type(trainloader.dataset.x), type(testloader.dataset.x)
trainloader.dataset.x.shape, testloader.dataset.x.shape
trainloader.dataset.x
class Autoencoder(nn.Module):
    def __init__(self, D_in, H=50, H2=12, latent_dim=3):
        super(Autoencoder, self).__init__()
        # Encoder
        self.linear1 = nn.Linear(D_in, H)
        self.lin_bn1 = nn.BatchNorm1d(num_features=H)
        self.linear2 = nn.Linear(H, H2)
        self.lin_bn2 = nn.BatchNorm1d(num_features=H2)
        self.linear3 = nn.Linear(H2, H2)
        self.lin_bn3 = nn.BatchNorm1d(num_features=H2)
        # Latent vectors mu and sigma
        self.fc1 = nn.Linear(H2, latent_dim)
        self.bn1 = nn.BatchNorm1d(num_features=latent_dim)
        self.fc21 = nn.Linear(latent_dim, latent_dim)
        self.fc22 = nn.Linear(latent_dim, latent_dim)
        # Sampling vector
        self.fc3 = nn.Linear(latent_dim, latent_dim)
        self.fc_bn3 = nn.BatchNorm1d(latent_dim)
        self.fc4 = nn.Linear(latent_dim, H2)
        self.fc_bn4 = nn.BatchNorm1d(H2)
        # Decoder
        self.linear4 = nn.Linear(H2, H2)
        self.lin_bn4 = nn.BatchNorm1d(num_features=H2)
        self.linear5 = nn.Linear(H2, H)
        self.lin_bn5 = nn.BatchNorm1d(num_features=H)
        self.linear6 = nn.Linear(H, D_in)
        self.lin_bn6 = nn.BatchNorm1d(num_features=D_in)
        self.relu = nn.ReLU()

    def encode(self, x):
        lin1 = self.relu(self.lin_bn1(self.linear1(x)))
        lin2 = self.relu(self.lin_bn2(self.linear2(lin1)))
        lin3 = self.relu(self.lin_bn3(self.linear3(lin2)))
        fc1 = F.relu(self.bn1(self.fc1(lin3)))
        r1 = self.fc21(fc1)  # mu
        r2 = self.fc22(fc1)  # logvar
        return r1, r2

    def reparameterize(self, mu, logvar):
        if self.training:
            std = logvar.mul(0.5).exp_()
            eps = torch.randn_like(std)  # replaces the deprecated Variable idiom
            return eps.mul(std).add_(mu)
        else:
            return mu

    def decode(self, z):
        fc3 = self.relu(self.fc_bn3(self.fc3(z)))
        fc4 = self.relu(self.fc_bn4(self.fc4(fc3)))
        lin4 = self.relu(self.lin_bn4(self.linear4(fc4)))
        lin5 = self.relu(self.lin_bn5(self.linear5(lin4)))
        return self.lin_bn6(self.linear6(lin5))

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
class customLoss(nn.Module):
    def __init__(self):
        super(customLoss, self).__init__()
        self.mse_loss = nn.MSELoss(reduction="sum")

    def forward(self, x_recon, x, mu, logvar):
        loss_MSE = self.mse_loss(x_recon, x)
        loss_KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        return loss_MSE + loss_KLD
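For reference, loss_KLD above is the closed-form KL divergence between the learned Gaussian posterior and a standard normal prior. Written out (with logvar standing for log sigma squared), the total loss the code computes is:

$$ \text{loss} \;=\; \underbrace{\lVert x - \hat{x} \rVert_2^2}_{\text{loss\_MSE}} \; \underbrace{-\; \tfrac{1}{2} \sum_{j} \left( 1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2 \right)}_{\text{loss\_KLD}} $$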
If you want to better understand the variational autoencoder technique, look here.
To make this Autoencoder class easier to follow, let me briefly walk through it. It is a variational autoencoder (VAE) with two hidden layers, which by default have 50 and 12 activations (you can change this). The number of latent factors is set to 3 (you can change that, too). So we first expand our initial 14 variables to 50 activations, then condense them to 12, and finally to 3. From these 3 latent factors we then sample to recreate the original 14 values: we inflate the 3 latent factors back to 12, then 50, and finally 14 activations (we decode the latent factors, so to speak). We compare this reconstructed batch (recon_batch) with the original batch, compute our loss, and adjust the weights and biases via the gradients (our optimizer here will be Adam).
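As a quick sanity check of that shape flow, we can push a dummy batch through a freshly initialized model (the batch of 16 random rows is arbitrary, purely for illustration):
# dummy batch: 16 rows with 14 columns, encoded to 3 latent factors and back
dummy = torch.randn(16, 14)
vae = Autoencoder(D_in=14)  # H=50, H2=12, latent_dim=3 by default
vae.eval()                  # eval mode: reparameterize returns mu directly
recon, mu, logvar = vae(dummy)
print(recon.shape, mu.shape, logvar.shape)
# torch.Size([16, 14]) torch.Size([16, 3]) torch.Size([16, 3])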
D_in = traindata_set.x.shape[1]
H = 50
H2 = 12
model = Autoencoder(D_in, H, H2).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_mse = customLoss()
epochs = 1500
log_interval = 50
train_losses = []
test_losses = []
def train(epoch):
    model.train()
    train_loss = 0
    for batch_idx, data in enumerate(trainloader):
        data = data.to(device)
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        loss = loss_mse(recon_batch, data, mu, logvar)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    if epoch % 200 == 0:
        print('====> Epoch: {} Average training loss: {:.4f}'.format(
            epoch, train_loss / len(trainloader.dataset)))
    train_losses.append(train_loss / len(trainloader.dataset))
def test(epoch):
    model.eval()  # use running batch-norm stats and mu instead of a random sample
    with torch.no_grad():
        test_loss = 0
        for batch_idx, data in enumerate(testloader):
            data = data.to(device)
            recon_batch, mu, logvar = model(data)
            loss = loss_mse(recon_batch, data, mu, logvar)
            test_loss += loss.item()
        if epoch % 200 == 0:
            print('====> Epoch: {} Average test loss: {:.4f}'.format(
                epoch, test_loss / len(testloader.dataset)))
        test_losses.append(test_loss / len(testloader.dataset))
for epoch in range(1, epochs + 1):
train(epoch)
test(epoch)
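Since train_losses and test_losses were collected along the way, a quick plot makes the convergence easy to inspect (matplotlib is an extra import, not part of the setup above):
import matplotlib.pyplot as plt

# one entry per epoch, average loss per sample
plt.plot(train_losses, label='train')
plt.plot(test_losses, label='test')
plt.xlabel('epoch')
plt.ylabel('average loss per sample')
plt.legend()
plt.show()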
We were able to reduce the training and test loss quite a bit. Let's have a look at how the reconstructed results actually compare to the real ones:
with torch.no_grad():
    model.eval()
    for batch_idx, data in enumerate(testloader):
        data = data.to(device)
        recon_batch, mu, logvar = model(data)

scaler = trainloader.dataset.standardizer
# inverse_transform expects a 2-D array, hence the reshape
recon_row = scaler.inverse_transform(recon_batch[0].cpu().numpy().reshape(1, -1))[0]
real_row = scaler.inverse_transform(testloader.dataset.x[0].cpu().numpy().reshape(1, -1))[0]
df = pd.DataFrame(np.stack((recon_row, real_row)), columns=cols)
df
Not too bad, right? (The first row is the reconstructed one, the second the real row from the data.) However, what we really want is to build such a row without the real input: right now we fed the model complete rows with their 14 columns, condensed them to 3 latent factors, just to blow them up again to the corresponding 14 columns. What I want instead is to create these 14 values by giving the model only 3 latent factors as input. Let's have a look at these latent variables.
sigma = torch.exp(logvar/2)
mu[1], sigma[1]
mu represents the mean of each of our latent factors, and logvar the log of their variance (so sigma = exp(logvar/2) gives the standard deviation). Each of these has a distribution of its own. We have 54 cases in our test data, so we get 3 x 54 different mu and logvar values. We can have a look at the distribution of each of the 3 latent variables:
mu.mean(axis=0), sigma.mean(axis=0)
All of the latent variables have a mean around zero, but the last latent factor has a wider standard deviation, so when we sample from each of these latent variables, its values will vary much more than those of the other two. I assume a normal distribution for all latent factors.
# sample z from q
no_samples = 20
q = torch.distributions.Normal(mu.mean(axis=0), sigma.mean(axis=0))
z = q.rsample(sample_shape=torch.Size([no_samples]))
z.shape
z[:5]
With these three latent factors we can now create fake data for our dataset and see what it looks like:
with torch.no_grad():
    model.eval()
    pred = model.decode(z).cpu().numpy()
pred[1]
fake_data = scaler.inverse_transform(pred)
fake_data.shape
df_fake = pd.DataFrame(fake_data, columns = cols)
df_fake['Wine'] = np.round(df_fake['Wine']).astype(int)
df_fake['Wine'] = np.where(df_fake['Wine']<1, 1, df_fake['Wine'])
df_fake.head(10)
For comparison the real data:
df_base.sample(10)
df_base.groupby('Wine').mean()
df_fake.groupby('Wine').mean()
That looks pretty convincing if you ask me.
To sum up, we built a variational autoencoder and trained it on our training set, checking on the test set, which the autoencoder never saw during training, that the loss kept improving. We then calculated the mean and standard deviation of our latent factors given the test data, sampled from this distribution, and fed the samples back into the decoder to create fake data. With this approach I am now able to create as much fake data, derived from the underlying distribution, as I want. And I think the results look promising.
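If you want to reuse this, the whole generation step fits into a small helper. This is just a sketch; generate_fake is a name I made up, and it assumes the trained model, the fitted scaler, and the mu/sigma tensors from the cells above:
# hypothetical helper wrapping the sampling + decoding steps from above
def generate_fake(model, scaler, mu, sigma, no_samples=20):
    model.eval()
    q = torch.distributions.Normal(mu.mean(axis=0), sigma.mean(axis=0))
    z = q.rsample(sample_shape=torch.Size([no_samples]))
    with torch.no_grad():
        pred = model.decode(z).cpu().numpy()
    return pd.DataFrame(scaler.inverse_transform(pred), columns=cols)

df_more_fake = generate_fake(model, scaler, mu, sigma, no_samples=100)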
You can take this approach, for example, to create data for under-represented classes in highly skewed datasets instead of just weighting them higher. The re-weighting approach might cause the algorithm to find relations where there are none, only because a few then-overrepresented data points happen to share this relation by chance. With the approach shown here, the learned distribution takes into account the high variance these features have and will therefore hopefully keep the algorithm from drawing such false conclusions. A rough sketch of that idea follows below.
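Everything in this sketch is an assumption and not part of the code above: it encodes only the rows of an under-represented class (Wine == 3 chosen purely for illustration), takes mu and sigma from those rows, and samples from that class-specific latent distribution:
# hypothetical sketch: oversample one class by sampling from the latent
# distribution of only that class's rows
minority = df_base[df_base['Wine'] == 3].values.astype('float32')
minority_std = torch.from_numpy(scaler.transform(minority)).to(device)

model.eval()
with torch.no_grad():
    mu_min, logvar_min = model.encode(minority_std)
sigma_min = torch.exp(logvar_min / 2)

q_min = torch.distributions.Normal(mu_min.mean(axis=0), sigma_min.mean(axis=0))
z_min = q_min.rsample(sample_shape=torch.Size([50]))
with torch.no_grad():
    fake_minority = scaler.inverse_transform(model.decode(z_min).cpu().numpy())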
Stay tuned for the next blogpost, where I will apply this approach to exactly that use case.