This project was initiated by Raphael Jeong-Hin Chin to create a Deep Learning (DL) model that can be deployed in automated kiosks to help people who have difficulty ordering goods and services through them. The project focuses primarily on the Penang Chinese community, so the DL model is trained on the Penang Hokkien dialect. Nonetheless, similar techniques can be applied to other languages and dialects with modifications to the model.
The primary motivation for this project was an article [1] describing the difficulties and challenges elderly people face when ordering KFC through its automated kiosks. The lack of human interaction when purchasing goods and services has caused problems for elderly customers, and various research has been conducted to address this issue. This project serves as a proof of concept for introducing DL models into automated kiosks so that customers can order goods and services in the Hokkien dialect.
Penang Hokkien was chosen because it is a variety of Hokkien that the author speaks fluently. Hokkien is the most widely spoken dialect among the ethnic Chinese community not only in Penang but throughout Malaysia and Singapore. For example, the Hokkien word kiasu, which describes a grasping or selfish attitude arising from a fear of missing out, has been included in the Oxford English Dictionary as a legitimate Singaporean English word. Moreover, Penang Hokkien is very similar to other varieties of Hokkien spoken in Malaysia, Singapore, and Taiwan. Therefore, training on Hokkien can both help address the challenges posed by automated kiosks and help preserve the dialect.
The voice samples were collected from eight individuals, each of whom said each digit (1 to 10) in Hokkien once. The samples will not be published with this notebook; anyone interested in them should contact the author at jeonghin@gmail.com. A majority of the code in this notebook is adapted from Ketan Doshi [2].
from sklearn import preprocessing
import torch

wav_names = []
wav_labels = []
tot_samples = 8  # Number of speakers (each recorded the digits 1-10 once)

for sample_size in range(1, tot_samples + 1):
    for true_class in range(1, 11):
        wav_names.append("/sample" + str(sample_size) + "_" + str(true_class) + ".wav")
        wav_labels.append(str(true_class))

le = preprocessing.LabelEncoder()
targets = le.fit_transform(wav_labels)
# targets: an integer class ID (0-9) for each file
targets = torch.as_tensor(targets)
# targets: the same class IDs as a PyTorch tensor
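Note that because the labels are strings, LabelEncoder orders the classes lexicographically rather than numerically, so "10" is encoded as 1 rather than 9. A minimal check of this behaviour (illustrative only, not part of the pipeline):

# Illustrative check of LabelEncoder's lexicographic ordering on string labels
demo_le = preprocessing.LabelEncoder()
demo_targets = demo_le.fit_transform([str(n) for n in range(1, 11)])
print(list(demo_le.classes_))  # ['1', '10', '2', '3', '4', '5', '6', '7', '8', '9']
print(list(demo_targets))      # [0, 2, 3, 4, 5, 6, 7, 8, 9, 1]

The ordering does not affect training as long as the same encoding is applied consistently to every batch.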
from pathlib import Path
import pandas as pd

# Map each audio file's relative path to its class label
df = pd.DataFrame({'relative_path': wav_names, 'classID': wav_labels})
data_path = Path.cwd()
import math, random
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
class AudioUtil():
    # ----------------------------
    # Load an audio file. Return the signal as a tensor and the sample rate
    # ----------------------------
    @staticmethod
    def open(audio_file):
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    # ----------------------------
    # Convert the given audio to the desired number of channels
    # ----------------------------
    @staticmethod
    def rechannel(aud, new_channel):
        sig, sr = aud
        if (sig.shape[0] == new_channel):
            # Nothing to do
            return aud
        if (new_channel == 1):
            # Convert from stereo to mono by selecting only the first channel
            resig = sig[:1, :]
        else:
            # Convert from mono to stereo by duplicating the first channel
            resig = torch.cat([sig, sig])
        return ((resig, sr))

    # ----------------------------
    # Resample the signal to a new sample rate, one channel at a time
    # ----------------------------
    @staticmethod
    def resample(aud, newsr):
        sig, sr = aud
        if (sr == newsr):
            # Nothing to do
            return aud
        num_channels = sig.shape[0]
        # Resample first channel
        resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1, :])
        if (num_channels > 1):
            # Resample the second channel and merge both channels
            retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:, :])
            resig = torch.cat([resig, retwo])
        return ((resig, newsr))

    # ----------------------------
    # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
    # ----------------------------
    @staticmethod
    def pad_trunc(aud, max_ms):
        sig, sr = aud
        num_rows, sig_len = sig.shape
        max_len = sr // 1000 * max_ms
        if (sig_len > max_len):
            # Truncate the signal to the given length
            sig = sig[:, :max_len]
        elif (sig_len < max_len):
            # Length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len
            # Pad with 0s
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))
            sig = torch.cat((pad_begin, sig, pad_end), 1)
        return (sig, sr)

    # ----------------------------
    # Shift the signal to the left or right by some percent. Values at the end
    # are 'wrapped around' to the start of the transformed signal.
    # ----------------------------
    @staticmethod
    def time_shift(aud, shift_limit):
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        return (sig.roll(shift_amt), sr)

    # ----------------------------
    # Generate a Mel Spectrogram
    # ----------------------------
    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        sig, sr = aud
        top_db = 80
        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
        # Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return (spec)

    # ----------------------------
    # Augment the Spectrogram by masking out some sections of it in both the frequency
    # dimension (i.e. horizontal bars) and the time dimension (vertical bars) to prevent
    # overfitting and to help the model generalise better. The masked sections are
    # replaced with the mean value.
    # ----------------------------
    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec
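As a quick sanity check, the AudioUtil steps can be chained on a single recording in the same order the dataset class below uses. This is a sketch only; it assumes one of the collected files (here sample1_1.wav) is present in the working directory:

# Sketch: preprocess one sample end-to-end (assumes sample1_1.wav exists locally)
aud = AudioUtil.open(str(data_path) + "/sample1_1.wav")
aud = AudioUtil.resample(aud, 44100)
aud = AudioUtil.rechannel(aud, 2)
aud = AudioUtil.pad_trunc(aud, 4000)
aud = AudioUtil.time_shift(aud, 0.4)
sgram = AudioUtil.spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None)
print(sgram.shape)  # expected: [2 channels, 64 mels, time steps]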
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

# ----------------------------
# Sound Dataset
# ----------------------------
class SoundDS(Dataset):
    def __init__(self, df, data_path):
        self.df = df
        self.data_path = str(data_path)
        self.duration = 4000   # Clip length in milliseconds
        self.sr = 44100        # Target sample rate
        self.channel = 2       # Target number of channels
        self.shift_pct = 0.4   # Maximum time-shift percentage

    # ----------------------------
    # Number of items in dataset
    # ----------------------------
    def __len__(self):
        return len(self.df)

    # ----------------------------
    # Get i'th item in dataset
    # ----------------------------
    def __getitem__(self, idx):
        # Absolute file path of the audio file - concatenate the audio directory with
        # the relative path
        audio_file = self.data_path + self.df.loc[idx, 'relative_path']
        # Get the Class ID
        class_id = self.df.loc[idx, 'classID']

        aud = AudioUtil.open(audio_file)
        # Some sounds have a higher sample rate or fewer channels than the majority,
        # so standardise every clip to the same sample rate and channel count. Unless
        # the sample rates match, pad_trunc would still produce arrays of different
        # lengths even though the clip durations are the same.
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)

        dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
        aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return aug_sgram, class_id
from torch.utils.data import random_split
myds = SoundDS(df, data_path)
# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])
# Create training and validation data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=num_val, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=num_val, shuffle=False)
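Before training, it is worth confirming that the loaders yield batches of the expected shape. The snippet below is an illustrative check, not part of the original pipeline; the labels are still strings at this point and are only converted to integer targets inside the training loop.

# Sketch: fetch one batch and inspect its shape and labels
batch_specs, batch_labels = next(iter(train_dl))
print(batch_specs.shape)  # expected: [16, 2, 64, time steps] (batch, channels, mels, frames)
print(batch_labels[:5])   # a list of string class IDs such as '3' or '10'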
import torch.nn.functional as F
from torch.nn import init
import torch.nn as nn

# ----------------------------
# Audio Classification Model
# ----------------------------
class AudioClassifier(nn.Module):
    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self):
        super().__init__()
        conv_layers = []

        # First Convolution Block with ReLU and Batch Norm. Use Kaiming Initialization
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        # Second Convolution Block
        self.conv2 = nn.Conv2d(8, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        # Third Convolution Block
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        # Fourth Convolution Block
        self.conv4 = nn.Conv2d(64, 128, kernel_size=(2, 2), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(128)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]

        # Fifth Convolution Block
        self.conv5 = nn.Conv2d(128, 256, kernel_size=(2, 2), stride=(1, 1), padding=(1, 1))
        self.relu5 = nn.ReLU()
        self.bn5 = nn.BatchNorm2d(256)
        init.kaiming_normal_(self.conv5.weight, a=0.1)
        self.conv5.bias.data.zero_()
        conv_layers += [self.conv5, self.relu5, self.bn5]

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=256, out_features=10)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x
# Create the model and put it on the GPU if available
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = myModel.to(device)
# Check which device the model is on (CPU in this run, since no GPU was available)
next(myModel.parameters()).device
device(type='cpu')
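A dummy forward pass is a cheap way to confirm the architecture wires together: two input channels, 64 mel bins, an arbitrary number of time frames, and ten output scores. This is an illustrative check only; the 345 time frames below are an arbitrary placeholder.

# Sketch: shape check with random data (345 time frames is an arbitrary choice)
dummy = torch.randn(4, 2, 64, 345).to(device)
with torch.no_grad():
    print(myModel(dummy).shape)  # expected: torch.Size([4, 10])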
# ----------------------------
# Training Loop
# ----------------------------
def training(model, train_dl, num_epochs):
    # Loss Function, Optimizer and Scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for i, data in enumerate(train_dl):
            # Get the input features and target labels
            # inputs, labels = data[0].to(device), torch.as_tensor(data[1]).to(device)
            # inputs, labels = data[0], data[1]
            # Re-encode this batch's string labels as integer targets
            le = preprocessing.LabelEncoder()
            targets = le.fit_transform(list(data[1]))
            inputs, labels = data[0], torch.as_tensor(targets)
            # print(targets)

            # Normalize the inputs
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

            # if i % 10 == 0:    # print every 10 mini-batches
            #     print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))

        # Print stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction / total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    print('Finished Training')

num_epochs = 10  # Just for demo; adjust this higher.
training(myModel, train_dl, num_epochs)
[W NNPACK.cpp:51] Could not initialize NNPACK! Reason: Unsupported hardware.
Epoch: 0, Loss: 2.33, Accuracy: 0.09
Epoch: 1, Loss: 2.33, Accuracy: 0.20
Epoch: 2, Loss: 2.41, Accuracy: 0.14
Epoch: 3, Loss: 2.32, Accuracy: 0.11
Epoch: 4, Loss: 2.20, Accuracy: 0.17
Epoch: 5, Loss: 2.33, Accuracy: 0.17
Epoch: 6, Loss: 2.17, Accuracy: 0.20
Epoch: 7, Loss: 2.18, Accuracy: 0.12
Epoch: 8, Loss: 2.05, Accuracy: 0.23
Epoch: 9, Loss: 2.14, Accuracy: 0.20
Finished Training
# ----------------------------
# Inference
# ----------------------------
def inference(model, val_dl):
    correct_prediction = 0
    total_prediction = 0

    # Disable gradient updates
    with torch.no_grad():
        for data in val_dl:
            # Get the input features and target labels
            # inputs, labels = data[0].to(device), data[1].to(device)
            # inputs, labels = data[0], data[1]
            # Re-encode this batch's string labels as integer targets
            le = preprocessing.LabelEncoder()
            targets = le.fit_transform(list(data[1]))
            inputs, labels = data[0], torch.as_tensor(targets)

            # Normalize the inputs
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Get predictions
            outputs = model(inputs)

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

    acc = correct_prediction / total_prediction
    print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

# Run inference on trained model with the validation set
inference(myModel, val_dl)
Accuracy: 0.12, Total items: 16
The results are not satisfactory: both the training and validation accuracies are very low. The small amount of data is one cause, as the model simply did not have enough examples to learn from. One way to address this is to collect more voice samples.
As this is a personal project with no funding, it is difficult to reach out to the public to collect voice samples. However, if this proof of concept makes sense and research groups are interested in turning it into a full research project, they are welcome to do so.
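If a future, better-trained version of this model were taken toward kiosk deployment, its weights would need to be saved so the kiosk could load them without retraining. A minimal sketch using PyTorch's standard state_dict mechanism (the file name below is an arbitrary choice):

# Sketch: persist and reload the trained weights (file name is arbitrary)
torch.save(myModel.state_dict(), "hokkien_digit_classifier.pt")

reloaded = AudioClassifier()
reloaded.load_state_dict(torch.load("hokkien_digit_classifier.pt", map_location="cpu"))
reloaded.eval()  # switch to evaluation mode before serving predictions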