Single Speaker Speech Recognition with Hidden Markov Models
Hidden Markov Models (HMMs) are powerful, flexible methods for representing and classifying data with trends over time, and have been a key component in speech recognition systems for many years.
I found it very difficult to find a good example (with code!) of a simple speech recognition system, so I decided to create this post. Though this implementation won't win any awards for "Best Speech Recognizer", I hope it will provide some insight into how HMMs can be used for speech recognition and other tasks.
In this post, I will define what Hidden Markov Models are, show how to implement one form (Gaussian Mixture Model HMM, GMM-HMM) using numpy + scipy, and how to use this algorithm for single speaker speech recognition. For a more "production grade" HMM implementation, see hmmlearn which holds the HMM implementations that were formerly a part of sklearn.
To demonstrate this algorithm, we need a dataset to operate on. I have chosen to use the sample dataset from this Google Code project by Hakon Sandsmark. I also used this code as a reference when creating my own implementation of a Gaussian Mixture Model HMM (GMM-HMM). This aided in testing my implementation, as well as giving a frame of reference for performance.
Other available datasets are largely multispeaker, but the simple frequency peak features used in this example do not work in the multispeaker regime (different speakers have different frequency content for the same word! Let alone male/female speech differences...). Future work will cover more advanced feature extraction techniques for audio, and extend these examples to multispeaker recognition.
%matplotlib inline
from utils import progress_bar_downloader
import os

#Hosting files on my dropbox since downloading from google code is painful
#Original project hosting is here: https://code.google.com/p/hmm-speech-recognition/downloads/list
#Audio is included in the zip file
link = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz'
dlname = 'audio.tar.gz'

if not os.path.exists('./%s' % dlname):
    progress_bar_downloader(link, dlname)
    os.system('tar xzf %s' % dlname)
else:
    print '%s already downloaded!' % dlname
audio.tar.gz already downloaded!
fpaths = []
labels = []
spoken = []
for f in os.listdir('audio'):
    for w in os.listdir('audio/' + f):
        fpaths.append('audio/' + f + '/' + w)
        labels.append(f)
        if f not in spoken:
            spoken.append(f)
print 'Words spoken:', spoken
Words spoken: ['kiwi', 'apple', 'banana', 'orange', 'pineapple', 'peach', 'lime']
This data has a total of 7 different spoken words, and each was spoken 15 different times, giving a grand total of 105 files. Next, the files will be extracted into a single data matrix (zero padding files to uniform length), and a label vector with the correct label for each data file is created.
#Files can be heard in Linux using the following commands from the command line
#cat kiwi07.wav | aplay -f S16_LE -t wav -r 8000
#Files are signed 16 bit raw, sample rate 8000
from scipy.io import wavfile
import numpy as np

data = np.zeros((len(fpaths), 32000))
maxsize = -1
for n, file in enumerate(fpaths):
    _, d = wavfile.read(file)
    data[n, :d.shape[0]] = d
    if d.shape[0] > maxsize:
        maxsize = d.shape[0]
data = data[:, :maxsize]

#Each sample file is one row in data, and has one entry in labels
print 'Number of files total:', data.shape[0]
all_labels = np.zeros(data.shape[0])
for n, l in enumerate(set(labels)):
    all_labels[np.array([i for i, _ in enumerate(labels) if _ == l])] = n

print 'Labels and label indices', all_labels
Number of files total: 105 Labels and label indices [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 5. 4. 4. 4. 4. 4. 4. 4. 4. 4. 4. 4. 4. 4. 4. 4. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 6. 6. 6. 6. 6. 6. 6. 6. 6. 6. 6. 6. 6. 6. 6.]
Science Fiction (Double Feature)
Once the data has been downloaded and turned into an input matrix, the next step is to extract features from the raw data, as is done in many other machine learning pipelines.
Most "consumer grade" speech recognition systems use advanced processing to extract a variety of features that describe the sound over both frequency and time, and until recently "custom features" were one of the keys to making a great recognition system. The current state of the art (to my knowledge, at least) has moved to using deep neural networks for feature extraction, which I hope to show in a future post. For now, we will stick to very simple features, in order to show a "simplest working example".
In this example, simple frequency peak detection was used, rather than the bevy of expert features typically used in a modern speech recognition pipeline (MFCCs, or more recently, a pretrained multilayer neural network). This has a direct effect on performance, but allows for a holistic implementation that fits in a single post :)
import scipy
import numpy as np

def stft(x, fftsize=64, overlap_pct=.5):
    #Modified from http://stackoverflow.com/questions/2459295/stft-and-istft-in-python
    hop = int(fftsize * (1 - overlap_pct))
    w = scipy.hanning(fftsize + 1)[:-1]
    raw = np.array([np.fft.rfft(w * x[i:i + fftsize]) for i in range(0, len(x) - fftsize, hop)])
    return raw[:, :(fftsize / 2)]
In order to find peaks in frequency, a technique called the Short Time Fourier Transform (STFT) is used. This idea is quite simple - the FFT is applied over chunks of the input data, resulting in a 2D FFT "image", usually called the spectrogram. Setting the FFT size allows us to control the amount of frequency resolution available, while overlapping these windows allows us to control the time resolution at the cost of increasing the data size.
Briefly, if X is a vector of length 20, we wish to create a 2D array, STFT_X. If the FFT size is 10, and the overlap is .5 (5 samples), this means (in pseudocode):
STFT_X[0, :] = FFT(X[0:9])
STFT_X[1, :] = FFT(X[5:14])
STFT_X[2, :] = FFT(X[10:19])
We then have 3 FFT frames which have been extracted from the input sample X. For our feature extraction, we would next find peaks in each row of STFT_X.
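As a quick numeric sketch of this framing (toy values only; the `fftsize` and `hop` here match the pseudocode, not the settings used later):

```python
import numpy as np

x = np.arange(20, dtype=float)  # toy signal of length 20
fftsize, hop = 10, 5            # 50% overlap, as in the pseudocode

# Extract the overlapping frames X[0:10], X[5:15], X[10:20] (Python slicing)
frames = np.array([x[i:i + fftsize] for i in range(0, len(x) - fftsize + 1, hop)])

# One rFFT per frame gives the 2D "spectrogram" array
stft_x = np.array([np.fft.rfft(f) for f in frames])
# 3 frames, each with fftsize / 2 + 1 = 6 non-redundant frequency bins
```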
The STFT is usually a crucial element of most DSP pipelines, and highly efficient routines are available to compute this (see FFTW, which numpy wraps). Though I have implemented my own STFT here, it is also possible to use matplotlib's specgram function instead.
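For reference, a minimal use of matplotlib's specgram with settings mirroring the stft above (NFFT=64, 50 percent overlap; the random signal is just a stand-in for real audio):

```python
import numpy as np
import matplotlib
matplotlib.use('Agg')  # render off-screen so this runs headless
import matplotlib.pyplot as plt

x = np.random.RandomState(0).randn(8000)  # stand-in for one 8kHz audio file

# NFFT and noverlap mirror fftsize=64, overlap_pct=.5 from the stft function
Pxx, freqs, t, im = plt.specgram(x, NFFT=64, Fs=8000, noverlap=32)
# Pxx has one row per frequency bin (NFFT / 2 + 1 for real one-sided input)
```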
import matplotlib.pyplot as plt

plt.plot(data[0, :], color='steelblue')
plt.title('Timeseries example for %s' % labels[0])
plt.xlim(0, 3500)
plt.xlabel('Time (samples)')
plt.ylabel('Amplitude (signed 16 bit)')

plt.figure()
log_freq = 20 * np.log(np.abs(stft(data[0, :])))
print log_freq.shape
plt.imshow(log_freq, cmap='gray', interpolation=None)
plt.xlabel('Freq (bin)')
plt.ylabel('Time (overlapped frames)')
plt.ylim(log_freq.shape[0])
plt.title('PSD of %s example' % labels[0])
-c:9: RuntimeWarning: divide by zero encountered in log
Next, peak detection is applied to each FFT frame of every data file. In a previous blog post, I described the use of wavelets for peak detection. Here, we will use a moving window to search for peaks instead. The primary steps to this algorithm are as follows:
1. Create a data window of length X. In this example X=9, though any window size can be used.
2. Split this window into 3 sections: left, center, and right. For the 9 sample window, this will be LLLCCCRRR.
3. Apply some function f (mean, median, max, min, etc.) over each section of the window.
4. If the maximum value of the center section is greater than both f(left) and f(right), continue to the next check. Otherwise, go to step 6.
5. If the maximum value of the center section sits at the exact center of the window, you have found a peak! Mark it and continue.
6. Shift the input data by one sample (data[0:9] -> data[1:10]) and repeat the process.
7. Once all data has been processed, sort the detected peaks in descending order by amplitude, then output the top N peaks. In this case, N=6.
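These steps can also be sketched as a plain loop before looking at the vectorized version (a simplified sketch; `simple_peakfind` is a name made up for this post, and it uses np.mean over each section):

```python
import numpy as np

def simple_peakfind(x, n_peaks=6, l_size=3, c_size=3, r_size=3, f=np.mean):
    #Slide a 9 sample window (LLLCCCRRR) across x, one sample at a time
    win_size = l_size + c_size + r_size
    found = []
    for i in range(len(x) - win_size + 1):
        win = x[i:i + win_size]
        l, c, r = win[:l_size], win[l_size:l_size + c_size], win[-r_size:]
        #Center section must beat both flanking sections...
        if np.max(c) <= max(f(l), f(r)):
            continue
        #...and the window maximum must sit dead center of the window
        if np.argmax(win) == l_size + c_size // 2:
            found.append((np.max(c), i + l_size + c_size // 2))
    #Sort by amplitude, descending, and keep the top N (height, location) pairs
    found.sort(reverse=True)
    return found[:n_peaks]
```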
An implementation of this algorithm is shown below.
from numpy.lib.stride_tricks import as_strided

#Peak detection using the technique described here: http://kkjkok.blogspot.com/2013/12/dsp-snippets_9.html
def peakfind(x, n_peaks, l_size=3, r_size=3, c_size=3, f=np.mean):
    win_size = l_size + r_size + c_size
    shape = x.shape[:-1] + (x.shape[-1] - win_size + 1, win_size)
    strides = x.strides + (x.strides[-1],)
    xs = as_strided(x, shape=shape, strides=strides)
    def is_peak(x):
        centered = (np.argmax(x) == l_size + int(c_size / 2))
        l = x[:l_size]
        c = x[l_size:l_size + c_size]
        r = x[-r_size:]
        passes = np.max(c) > np.max([f(l), f(r)])
        if centered and passes:
            return np.max(c)
        else:
            return -1
    r = np.apply_along_axis(is_peak, 1, xs)
    top = np.argsort(r, None)[::-1]
    heights = r[top[:n_peaks]]
    #Add l_size and half - 1 of center size to get to actual peak location
    top[top > -1] = top[top > -1] + l_size + int(c_size / 2.)
    return heights, top[:n_peaks]
plot_data = np.abs(stft(data[20, :]))[15, :]
values, locs = peakfind(plot_data, n_peaks=6)
fp = locs[values > -1]
fv = values[values > -1]

plt.plot(plot_data, color='steelblue')
plt.plot(fp, fv, 'x', color='darkred')
plt.title('Peak location example')
plt.xlabel('Frequency (bins)')
plt.ylabel('Amplitude')
The peak detector does an acceptable job of detecting peaks, though it is by no means perfect. One of the limiting factors is the length of the FFT - this peak detector searches over 9 sample chunks, which is very large when the FFT size is only 64 bins! Once again, this will have a negative impact on performance.
We could set the FFT size larger, but in testing, longer FFT sizes appeared to do worse, even with higher overlap to maintain the same time resolution. Ultimately, this is where more advanced features would come into play.
#This processing (top freq peaks) only works for single speaker case... need better features for multispeaker!
#MFCC (or deep NN/automatic feature extraction) could be interesting
all_obs = []
for i in range(data.shape[0]):
    d = np.abs(stft(data[i, :]))
    n_dim = 6
    obs = np.zeros((n_dim, d.shape[0]))
    for r in range(d.shape[0]):
        _, t = peakfind(d[r, :], n_peaks=n_dim)
        obs[:, r] = t.copy()
    if i % 10 == 0:
        print "Processed obs %s" % i
    all_obs.append(obs)

all_obs = np.atleast_3d(all_obs)
Processed obs 0 Processed obs 10 Processed obs 20 Processed obs 30 Processed obs 40 Processed obs 50 Processed obs 60 Processed obs 70 Processed obs 80 Processed obs 90 Processed obs 100
An implementation of a GMM-HMM is shown below. This code is also available as a gist, and any improvements I make will show up there.
This algorithm is fairly complex, and good summaries can be found in notes by Brown and by Moore. This implementation does not have any Viterbi backtracking - since my only interest (for now) is in speech classification, only the Baum-Welch and forward-backward portions of the GMM-HMM are necessary.
HMMs can perform three primary tasks:
- State Estimation \(P(S | O)\) - can be useful if you have prior info about what states mean and create the state probabilities yourself.
- Path Estimation - given observations, what is the most likely "state path"? Not useful in our case, and not even implemented here!
- Maximum Likelihood Estimation \(P(O | \lambda)\) - learn the HMM parameters \(\lambda\) which maximize the probability of observations. This is the primary method we will use!
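To make task 3 concrete, the scaled forward recursion (the same recursion the transform method uses below) can be sketched for a toy two-state model. All the numbers here are made up for illustration:

```python
import numpy as np

def forward_log_likelihood(prior, A, B_obs):
    #B_obs[:, t] holds the emission likelihood of each state at time t
    log_likelihood = 0.
    for t in range(B_obs.shape[1]):
        if t == 0:
            alpha = prior * B_obs[:, 0]
        else:
            alpha = B_obs[:, t] * np.dot(A.T, alpha)
        #Rescale each step to avoid underflow, accumulating log scale factors;
        #their sum is log P(O | lambda)
        s = alpha.sum()
        alpha = alpha / s
        log_likelihood += np.log(s)
    return log_likelihood

#Made-up two-state example: prior, transition matrix A, emission likelihoods
prior = np.array([0.6, 0.4])
A = np.array([[0.7, 0.3],
              [0.2, 0.8]])
B_obs = np.array([[0.9, 0.1, 0.8],
                  [0.2, 0.7, 0.3]])
ll = forward_log_likelihood(prior, A, B_obs)
```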
To train the HMM, we use the Baum-Welch algorithm. There are many, many resources on this algorithm, so I will not regurgitate them here. Implementing this HMM was fairly tricky, and I highly recommend using a library unless you are interested in a "learning experience".
This code requires scipy 0.14 for the multivariate_normal density.
import scipy.stats as st
import numpy as np

class gmmhmm:
    #This class converted with modifications from https://code.google.com/p/hmm-speech-recognition/source/browse/Word.m
    def __init__(self, n_states):
        self.n_states = n_states
        self.random_state = np.random.RandomState(0)

        #Normalize random initial state
        self.prior = self._normalize(self.random_state.rand(self.n_states, 1))
        self.A = self._stochasticize(self.random_state.rand(self.n_states, self.n_states))

        self.mu = None
        self.covs = None
        self.n_dims = None

    def _forward(self, B):
        log_likelihood = 0.
        T = B.shape[1]
        alpha = np.zeros(B.shape)
        for t in range(T):
            if t == 0:
                alpha[:, t] = B[:, t] * self.prior.ravel()
            else:
                alpha[:, t] = B[:, t] * np.dot(self.A.T, alpha[:, t - 1])

            alpha_sum = np.sum(alpha[:, t])
            alpha[:, t] /= alpha_sum
            log_likelihood = log_likelihood + np.log(alpha_sum)
        return log_likelihood, alpha

    def _backward(self, B):
        T = B.shape[1]
        beta = np.zeros(B.shape)

        beta[:, -1] = np.ones(B.shape[0])

        for t in range(T - 1)[::-1]:
            beta[:, t] = np.dot(self.A, (B[:, t + 1] * beta[:, t + 1]))
            beta[:, t] /= np.sum(beta[:, t])
        return beta

    def _state_likelihood(self, obs):
        obs = np.atleast_2d(obs)
        B = np.zeros((self.n_states, obs.shape[1]))
        for s in range(self.n_states):
            #Needs scipy 0.14
            B[s, :] = st.multivariate_normal.pdf(obs.T, mean=self.mu[:, s].T, cov=self.covs[:, :, s].T)
            #This function can (and will!) return values >> 1
            #See the discussion here for the equivalent matlab function
            #https://groups.google.com/forum/#!topic/comp.soft-sys.matlab/YksWK0T74Ak
            #Key line: "Probabilities have to be less than 1,
            #Densities can be anything, even infinite (at individual points)."
            #This is evaluating the density at individual points...
        return B

    def _normalize(self, x):
        return (x + (x == 0)) / np.sum(x)

    def _stochasticize(self, x):
        return (x + (x == 0)) / np.sum(x, axis=1)[:, np.newaxis]

    def _em_init(self, obs):
        #Using this _em_init function allows for less required constructor args
        if self.n_dims is None:
            self.n_dims = obs.shape[0]
        if self.mu is None:
            subset = self.random_state.choice(np.arange(self.n_dims), size=self.n_states, replace=False)
            self.mu = obs[:, subset]
        if self.covs is None:
            self.covs = np.zeros((self.n_dims, self.n_dims, self.n_states))
            self.covs += np.diag(np.diag(np.cov(obs)))[:, :, None]
        return self

    def _em_step(self, obs):
        obs = np.atleast_2d(obs)
        B = self._state_likelihood(obs)
        T = obs.shape[1]

        log_likelihood, alpha = self._forward(B)
        beta = self._backward(B)

        xi_sum = np.zeros((self.n_states, self.n_states))
        gamma = np.zeros((self.n_states, T))

        for t in range(T - 1):
            partial_sum = self.A * np.outer(alpha[:, t], beta[:, t + 1] * B[:, t + 1])
            xi_sum += self._normalize(partial_sum)
            partial_g = alpha[:, t] * beta[:, t]
            gamma[:, t] = self._normalize(partial_g)

        partial_g = alpha[:, -1] * beta[:, -1]
        gamma[:, -1] = self._normalize(partial_g)

        expected_prior = gamma[:, 0]
        expected_A = self._stochasticize(xi_sum)

        expected_mu = np.zeros((self.n_dims, self.n_states))
        expected_covs = np.zeros((self.n_dims, self.n_dims, self.n_states))

        gamma_state_sum = np.sum(gamma, axis=1)
        #Set zeros to 1 before dividing
        gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0)

        for s in range(self.n_states):
            gamma_obs = obs * gamma[s, :]
            expected_mu[:, s] = np.sum(gamma_obs, axis=1) / gamma_state_sum[s]
            partial_covs = np.dot(gamma_obs, obs.T) / gamma_state_sum[s] - np.outer(expected_mu[:, s], expected_mu[:, s])
            #Symmetrize
            partial_covs = np.triu(partial_covs) + np.triu(partial_covs).T - np.diag(np.diag(partial_covs))
            expected_covs[:, :, s] = partial_covs

        #Ensure positive semidefinite by adding diagonal loading
        expected_covs += .01 * np.eye(self.n_dims)[:, :, None]

        self.prior = expected_prior
        self.mu = expected_mu
        self.covs = expected_covs
        self.A = expected_A

        return log_likelihood

    def fit(self, obs, n_iter=15):
        #Support for 2D and 3D arrays
        #2D should be n_features, n_dims
        #3D should be n_examples, n_features, n_dims
        #For example, with 6 features per speech segment, 105 different words
        #this array should be size
        #(105, 6, X) where X is the number of frames with features extracted
        #For a single example file, the array should be size (6, X)
        if len(obs.shape) == 2:
            for i in range(n_iter):
                self._em_init(obs)
                log_likelihood = self._em_step(obs)
        elif len(obs.shape) == 3:
            count = obs.shape[0]
            for n in range(count):
                for i in range(n_iter):
                    self._em_init(obs[n, :, :])
                    log_likelihood = self._em_step(obs[n, :, :])
        return self

    def transform(self, obs):
        #Support for 2D and 3D arrays
        #2D should be n_features, n_dims
        #3D should be n_examples, n_features, n_dims
        #For example, with 6 features per speech segment, 105 different words
        #this array should be size
        #(105, 6, X) where X is the number of frames with features extracted
        #For a single example file, the array should be size (6, X)
        if len(obs.shape) == 2:
            B = self._state_likelihood(obs)
            log_likelihood, _ = self._forward(B)
            return log_likelihood
        elif len(obs.shape) == 3:
            count = obs.shape[0]
            out = np.zeros((count,))
            for n in range(count):
                B = self._state_likelihood(obs[n, :, :])
                log_likelihood, _ = self._forward(B)
                out[n] = log_likelihood
            return out

if __name__ == "__main__":
    rstate = np.random.RandomState(0)
    t1 = np.ones((4, 40)) + .001 * rstate.rand(4, 40)
    t1 /= t1.sum(axis=0)
    t2 = rstate.rand(*t1.shape)
    t2 /= t2.sum(axis=0)

    m1 = gmmhmm(2)
    m1.fit(t1)
    m2 = gmmhmm(2)
    m2.fit(t2)

    m1t1 = m1.transform(t1)
    m2t1 = m2.transform(t1)
    print "Likelihoods for test set 1"
    print "M1:", m1t1
    print "M2:", m2t1
    print "Prediction for test set 1"
    print "Model", np.argmax([m1t1, m2t1]) + 1
    print

    m1t2 = m1.transform(t2)
    m2t2 = m2.transform(t2)
    print "Likelihoods for test set 2"
    print "M1:", m1t2
    print "M2:", m2t2
    print "Prediction for test set 2"
    print "Model", np.argmax([m1t2, m2t2]) + 1
Likelihoods for test set 1 M1: 221.388285751 M2: 165.272802308 Prediction for test set 1 Model 1 Likelihoods for test set 2 M1: 33.1945942149 M2: 59.1527475305 Prediction for test set 2 Model 2
Once the peaks are detected for each frame, we have a 3 dimensional numpy array of size [numdatafiles, numpeaks, numframes] - in this case [105, 6, 216]. It is important that the observations be normalized in order to form probabilities, since the HMM expects to be trained on state probabilities. What does this mean?
For each sample, we have extracted a number of frames over time, and each frame has its own set of peaks. We divide each frame by the sum of all peaks (axis 0 of all_obs[n], or axis 1 of all_obs), and form a "state probability" for each frame in our dataset. In essence, we are creating 6 states, where the highest peak in a frame is state 1, second highest state 2, etc. for each FFT frame.
This representation shows the "spread" of the peaks - if all the peaks are close together, each one will have about the same probability, while peaks that are in a more unique distribution should take on a more unique fingerprint. An extreme case would be 3 low frequency peaks, and 3 high - this would result in very low state probabilities for the low frequency peaks, and very high state probabilities for the high frequency peaks. Since the HMM attempts to learn transition probabilities between frames, this is a decent way to represent speech for HMMs.
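A tiny numeric sketch of this normalization (toy peak locations, not real data):

```python
import numpy as np

#One toy frame: 6 detected peak locations (frequency bin indices), low to high
frame = np.array([3., 9., 12., 30., 31., 33.])

#Dividing by the frame sum gives a pseudo-probability over the 6 "states" -
#high frequency peaks get large probabilities, low frequency peaks small ones,
#and this "spread" is the fingerprint the HMM sees
state_probs = frame / frame.sum()
```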
from sklearn.cross_validation import StratifiedShuffleSplit

sss = StratifiedShuffleSplit(all_labels, test_size=0.1, random_state=0)

for n, i in enumerate(all_obs):
    all_obs[n] /= all_obs[n].sum(axis=0)

for train_index, test_index in sss:
    X_train, X_test = all_obs[train_index, ...], all_obs[test_index, ...]
    y_train, y_test = all_labels[train_index], all_labels[test_index]

print 'Size of training matrix:', X_train.shape
print 'Size of testing matrix:', X_test.shape
Size of training matrix: (91, 6, 216) Size of testing matrix: (14, 6, 216)
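As an aside, sklearn.cross_validation has since been removed from scikit-learn; on modern versions the same stratified 90/10 split looks like this (a sketch with stand-in arrays of the shapes used in this post):

```python
import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit

#Stand-ins with the shapes from this post: 105 examples, 7 balanced classes
all_obs = np.random.RandomState(0).rand(105, 6, 216)
all_labels = np.repeat(np.arange(7.), 15)

sss = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=0)
#split expects a 2D X, so flatten the features per example for the call
train_index, test_index = next(sss.split(all_obs.reshape(len(all_obs), -1), all_labels))
X_train, X_test = all_obs[train_index], all_obs[test_index]
y_train, y_test = all_labels[train_index], all_labels[test_index]
```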
To predict words, we need to train 7 separate GMM-HMM models, one for each word. We will then feed the features for a test sample into each, choosing the word associated with the GMM-HMM having the highest output likelihood. This maximum likelihood estimate is our best guess for what word was spoken.
ys = set(all_labels)
ms = [gmmhmm(6) for y in ys]
_ = [m.fit(X_train[y_train == y, :, :]) for m, y in zip(ms, ys)]
ps = [m.transform(X_test) for m in ms]
res = np.vstack(ps)
predicted_labels = np.argmax(res, axis=0)
missed = (predicted_labels != y_test)
print 'Test accuracy:%.2f percent' % (100 * (1 - np.mean(missed)))
Test accuracy:78.57 percent
It seems that this classifier does fairly well! Let's look at a confusion matrix for what words were incorrect, to see if there is any unusual pattern in the misclassifications.
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(y_test, predicted_labels)
plt.matshow(cm, cmap='gray')
ax = plt.gca()
_ = ax.set_xticklabels([" "] + [l[:2] for l in spoken])
_ = ax.set_yticklabels([" "] + spoken)
plt.title('Confusion matrix, single speaker')
plt.ylabel('True label')
plt.xlabel('Predicted label')
This speech recognition pipeline, despite its simplicity, still manages a test accuracy of ~78%! Though the confusion matrix doesn't provide much insight (confusing pineapple and orange? apple and lime?), it is good to see a "real" example of using HMMs to do single speaker speech recognition.