Time Series Classification with Convolutions
How a simple 1d convolutional neural net can find temporal patterns without any further feature engineering and achieve impressive results.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torch.utils.data import TensorDataset, DataLoader
For this blog post, I will use the Italy Power Demand dataset. The classification task is to distinguish days from October to March (inclusive) from days from April to September. The dataset can be found here: http://www.timeseriesclassification.com/description.php?Dataset=ItalyPowerDemand. The best accuracy reported on that page so far is 97%.
df_train = pd.read_csv('Data/ItalyPowerDemand_TRAIN.txt', header=None, sep=r'\s+')
df_test = pd.read_csv('Data/ItalyPowerDemand_TEST.txt', header=None, sep=r'\s+')
df_train.head()
len(df_train), len(df_test)
The first column holds the target; the remaining 24 columns hold one feature, the power demand over the course of one day in hourly values. The task then is to correctly classify whether a given day falls into the winter period (October to March) or the summer period (April to September).
x_train = df_train.iloc[:, 1:].values.reshape(-1, 1, 24)  # (N, channels=1, length=24), as expected by nn.Conv1d
x_test = df_test.iloc[:, 1:].values.reshape(-1, 1, 24)
y_train = df_train.iloc[:, 0].values - 1  # shift labels from {1, 2} to {0, 1}
y_test = df_test.iloc[:, 0].values - 1
x_train.shape, x_test.shape, y_train.shape, y_test.shape
df_train.iloc[0, 1:].plot.line(title=f'time series with class = {df_train.iloc[0, 0]}');
df_train.iloc[1, 1:].plot.line(title=f'time series with class = {df_train.iloc[1, 0]}');
df_train.iloc[2, 1:].plot.line(title=f'time series with class = {df_train.iloc[2, 0]}');
def create_datasets(train, test, train_target, test_target, valid_pct=0.1, seed=None):
    """Converts NumPy arrays into PyTorch datasets and splits off a validation set."""
    assert len(train) == len(train_target)
    idx = np.arange(len(train))
    trn_idx, val_idx = train_test_split(
        idx, test_size=valid_pct, random_state=seed)
    trn_ds = TensorDataset(
        torch.tensor(train[trn_idx]).float(),
        torch.tensor(train_target[trn_idx]).long())
    val_ds = TensorDataset(
        torch.tensor(train[val_idx]).float(),
        torch.tensor(train_target[val_idx]).long())
    tst_ds = TensorDataset(
        torch.tensor(test).float(),
        torch.tensor(test_target).long())
    return trn_ds, val_ds, tst_ds
def create_loaders(data, bs=128, jobs=0):
    """Wraps the datasets returned by the create_datasets function with data loaders."""
    trn_ds, val_ds, tst_ds = data
    trn_dl = DataLoader(trn_ds, batch_size=bs, shuffle=True, num_workers=jobs)
    val_dl = DataLoader(val_ds, batch_size=bs, shuffle=False, num_workers=jobs)
    tst_dl = DataLoader(tst_ds, batch_size=bs, shuffle=False, num_workers=jobs)
    return trn_dl, val_dl, tst_dl
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
datasets = create_datasets(x_train, x_test, y_train, y_test, seed=1234)
trn_sz = len(datasets[0])  # number of training examples, used later to normalise the epoch loss
trn_dl, val_dl, tst_dl = create_loaders(datasets, bs=256)
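As a quick sanity check, we can look at how many examples ended up in each split; with the default valid_pct=0.1 this should be roughly 60 training and 7 validation series, plus the untouched test set:
len(trn_dl.dataset), len(val_dl.dataset), len(tst_dl.dataset)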
class CustomConv1d(nn.Module):
    """Implements a 1-d convolution with 'batteries included'.

    The module (optionally) adds an activation function and a dropout layer
    right after the convolution layer.
    """
    def __init__(self, ni, no, kernel, stride, pad, drop=None,
                 activ=lambda: nn.ReLU(inplace=True)):
        super().__init__()
        assert drop is None or (0.0 < drop < 1.0)
        layers = [nn.Conv1d(ni, no, kernel, stride, pad)]
        if activ:
            layers.append(activ())
        if drop is not None:
            layers.append(nn.Dropout(drop))
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)
class Flatten(nn.Module):
    """Converts an N-dimensional tensor into a 'flat' one."""
    def __init__(self, keep_batch_dim=True):
        super().__init__()
        self.keep_batch_dim = keep_batch_dim

    def forward(self, x):
        if self.keep_batch_dim:
            return x.view(x.size(0), -1)
        return x.view(-1)
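A tiny illustrative check of what Flatten does to a batch of convolution outputs:
# a dummy batch of 4 examples with 32 channels of length 1
t = torch.randn(4, 32, 1)
Flatten()(t).shape                      # torch.Size([4, 32]) - batch dim kept
Flatten(keep_batch_dim=False)(t).shape  # torch.Size([128]) - fully flattened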
When it comes to convolutions, we can freely choose the output dimension (the number of channels). However, the length of the newly created tensor after the convolution is pre-defined and calculated as follows (this is the formula from the PyTorch Conv1d documentation):

$$L_{out} = \left\lfloor \frac{L_{in} + 2 \cdot \text{padding} - \text{dilation} \cdot (\text{kernel\_size} - 1) - 1}{\text{stride}} + 1 \right\rfloor$$
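To save ourselves the arithmetic, here is a small helper function (just a convenience wrapper of mine around the formula above, not part of PyTorch):
def conv1d_out_len(l_in, kernel_size, stride=1, padding=0, dilation=1):
    """Output length of nn.Conv1d according to the formula above."""
    return (l_in + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1

conv1d_out_len(24, kernel_size=23)  # -> 2, which MaxPool1d(2, stride=2) then reduces to 1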
After our convolutions we want the L-dimension to be equal to 1 so that we can flatten it. We start with a sequence of length 24. According to the formula we can calculate how L changes when setting kernel_size, stride, padding and dilation accordingly. Or we can check it manually:
x_raw, y_batch = next(iter(trn_dl))  # grab a single batch just to inspect the shapes
x_raw, y_batch = x_raw.to(device), y_batch.to(device)
x_raw.shape
If you want to, you can just fiddle around with the numbers; we need to end up at L = 1. After some manual grid searching, I came up with a model that uses only a single convolution whose kernel covers (almost) the whole timeframe, followed by a max-pooling layer.
raw_ni = x_train.shape[1]  # number of input channels (here: 1)
drop = 0.3
m = CustomConv1d(raw_ni, 32, 23, 1, 0, drop=drop).to(device)
output_ = m(x_raw)
print(output_.shape)  # (batch, 32, L=2)
# m = CustomConv1d(32, 64, 3, 2, 0, drop=drop)
# output_ = m(output_)
# print(output_.shape)
m = nn.MaxPool1d(2, stride=2)
output_ = m(output_)
print(output_.shape)  # (batch, 32, L=1)
And now we know how to set up our model.
class Classifier(nn.Module):
    def __init__(self, raw_ni, no, drop=.5):
        super().__init__()
        self.raw = nn.Sequential(
            CustomConv1d(raw_ni, 128, 23, 1, 0, drop=drop),
            # CustomConv1d( 32,  64, 3, 2, 0, drop=drop),
            # CustomConv1d( 64, 256, 3, 1, 0, drop=drop),
            # CustomConv1d(256, 256, 2, 1, 0, drop=drop),
            # CustomConv1d(256, 256, 2, 1, 0, drop=drop),
            nn.MaxPool1d(2, stride=2),
            Flatten(),
            nn.Dropout(drop), nn.Linear(128, 64), nn.ReLU(inplace=True),
            nn.Dropout(drop), nn.Linear( 64, 64), nn.ReLU(inplace=True))
        self.out = nn.Sequential(
            nn.Linear(64, 64), nn.ReLU(inplace=True), nn.Linear(64, no))

    def forward(self, t_raw):
        raw_out = self.raw(t_raw)
        out = self.out(raw_out)
        return out
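Before training, a quick illustrative forward pass with a dummy batch confirms that all the shapes line up:
# a dummy batch of 8 one-channel series of length 24
dummy = torch.randn(8, 1, 24)
Classifier(raw_ni=1, no=2)(dummy).shape  # torch.Size([8, 2]): one logit per class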
raw_feat = x_train.shape[1]
lr = 0.001
n_epochs = 500
iterations_per_epoch = len(trn_dl)
num_classes = 2
best_acc = 0
patience, trials = 200, 0  # early stopping: give up after 200 epochs without improvement
base = 1
step = 2  # print progress at epochs 1, 2, 4, 8, ... (base is multiplied by step)
loss_history = []
acc_history = []
model = Classifier(raw_feat, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
opt = optim.Adam(model.parameters(), lr=lr)
print('Start model training')
for epoch in range(1, n_epochs + 1):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(trn_dl):
        x_raw, y_batch = [t.to(device) for t in batch]
        opt.zero_grad()
        out = model(x_raw)
        loss = criterion(out, y_batch)
        epoch_loss += loss.item()
        loss.backward()
        opt.step()

    epoch_loss /= trn_sz
    loss_history.append(epoch_loss)

    model.eval()
    correct, total = 0, 0
    with torch.no_grad():  # no gradients needed for validation
        for batch in val_dl:
            x_raw, y_batch = [t.to(device) for t in batch]
            out = model(x_raw)
            preds = F.log_softmax(out, dim=1).argmax(dim=1)
            total += y_batch.size(0)
            correct += (preds == y_batch).sum().item()

    acc = correct / total
    acc_history.append(acc)

    if epoch % base == 0:
        print(f'Epoch: {epoch:3d}. Loss: {epoch_loss:.4f}. Acc.: {acc:2.2%}')
        base *= step

    if acc > best_acc:
        trials = 0
        best_acc = acc
        torch.save(model.state_dict(), 'best.pth')
        print(f'Epoch {epoch} best model saved with accuracy: {best_acc:2.2%}')
    else:
        trials += 1
        if trials >= patience:
            print(f'Early stopping on epoch {epoch}')
            break

print('Done!')
def smooth(y, box_pts):
    """Smooths a curve with a simple moving-average (box) filter."""
    box = np.ones(box_pts) / box_pts
    y_smooth = np.convolve(y, box, mode='same')
    return y_smooth
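As a quick illustration: the box filter simply spreads every point evenly over its neighbourhood, so a single spike becomes a plateau:
smooth(np.array([0., 0., 6., 0., 0.]), 3)  # -> array([0., 2., 2., 2., 0.])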
f, ax = plt.subplots(1, 2, figsize=(12, 4))
ax[0].plot(loss_history, label='loss')
ax[0].set_title('Training Loss History')
ax[0].set_xlabel('Epoch no.')
ax[0].set_ylabel('Loss')
ax[1].plot(smooth(acc_history, 5)[:-2], label='acc')
ax[1].set_title('Validation Accuracy History')
ax[1].set_xlabel('Epoch no.')
ax[1].set_ylabel('Accuracy');
We see that our model did a great job at classifying the validation data correctly. Remember, we trained on only about 60 examples. Let's see how our model performs on the test dataset.
model.load_state_dict(torch.load('best.pth'))  # restore the best checkpoint saved during training
model.eval()
preds_array = np.array([])
with torch.no_grad():
    for batch in tst_dl:
        x_raw, y_batch = [t.to(device) for t in batch]
        out = model(x_raw)
        preds = F.log_softmax(out, dim=1).argmax(dim=1).cpu().numpy()
        preds_array = np.concatenate((preds_array, preds), axis=None)
from sklearn.metrics import confusion_matrix
confusion_matrix(y_test, preds_array)
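The overall test accuracy follows directly from the confusion matrix; here is a quick check:
cm = confusion_matrix(y_test, preds_array)
cm.trace() / cm.sum()  # fraction of correctly classified test series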
from sklearn.metrics import classification_report
target_names = ['class 1', 'class 2']
print(classification_report(y_test, preds_array, target_names=target_names))
Glorious! We achieved 97% accuracy, which matches the best result reported for this dataset. Let's have a look at the wrongly classified time series.
wrongly_classified_idx = np.argwhere(y_test!=preds_array).reshape(-1)
df_test.iloc[wrongly_classified_idx[0], 1:].plot.line(title=f'time series with class = {df_test.iloc[wrongly_classified_idx[0], 0]} \n Prediction: {preds_array[wrongly_classified_idx[0]]+1}');
df_test.iloc[wrongly_classified_idx[1], 1:].plot.line(title=f'time series with class = {df_test.iloc[wrongly_classified_idx[1], 0]} \n Prediction: {preds_array[wrongly_classified_idx[1]]+1}');
We can see that these cases are pretty hard to classify correctly. The first one, especially, looks almost like a mislabeled example.
After this very hands-on approach, in the next blog post I'll write about how 1d convolutions work and the math behind them, so stay tuned! Lasse