Listening to Speech

Making a chatbot listen to you
Published April 3, 2021

As part of the conversation with the house, I need to add support for understanding spoken utterances. This is so my son can speak and have the house hear him.

There are two interesting models that I can use for this: Speech2Text and the various forms of Wav2Vec2. I’m going to start by trying to capture some raw microphone sounds using SpeechRecognition.

To get this working you need to install the SpeechRecognition and PyAudio Python packages. When it’s working it’s quite fun to just run it from the command line:

Code
! python -m speech_recognition
ALSA lib pcm_dmix.c:1089:(snd_pcm_dmix_open) unable to open slave
ALSA lib pcm.c:2642:(snd_pcm_open_noupdate) Unknown PCM cards.pcm.rear
ALSA lib pcm.c:2642:(snd_pcm_open_noupdate) Unknown PCM cards.pcm.center_lfe
ALSA lib pcm.c:2642:(snd_pcm_open_noupdate) Unknown PCM cards.pcm.side
ALSA lib pcm_route.c:869:(find_matching_chmap) Found no matching channel map
ALSA lib pcm_oss.c:377:(_snd_pcm_oss_open) Unknown field port
ALSA lib pcm_oss.c:377:(_snd_pcm_oss_open) Unknown field port
ALSA lib pcm_usb_stream.c:486:(_snd_pcm_usb_stream_open) Invalid type for card
ALSA lib pcm_usb_stream.c:486:(_snd_pcm_usb_stream_open) Invalid type for card
ALSA lib pcm_dmix.c:1089:(snd_pcm_dmix_open) unable to open slave
A moment of silence, please...
Set minimum energy threshold to 250.50880193287082
Say something!
Got it! Now to recognize it...
You said hello
Say something!
Got it! Now to recognize it...
You said Gujarat
Say something!
^C

The second time I actually said kwyjibo, which is a nonsense word so I’m not surprised it didn’t get it. Anyway it looks like this tool has speech recognition built in already. I can use that as a baseline to compare with.
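The "minimum energy threshold" in the output above is the loudness level below which the recognizer treats audio as silence. As a rough illustration, assuming that energy means the root-mean-square amplitude of signed 16-bit samples (an assumption about how the library measures it, not something checked in this post), it could be computed like this. The function name and the sample chunks are made up for the example.

```python
import math
import struct


def rms_energy(pcm: bytes) -> float:
    """Root-mean-square amplitude of little-endian signed 16-bit PCM samples."""
    count = len(pcm) // 2
    if count == 0:
        return 0.0
    samples = struct.unpack(f"<{count}h", pcm[: count * 2])
    return math.sqrt(sum(s * s for s in samples) / count)


# hypothetical chunks: near silence, and a much louder square wave
quiet = struct.pack("<4h", 10, -10, 10, -10)
loud = struct.pack("<4h", 1000, -1000, 1000, -1000)

threshold = 250.5  # roughly the value reported in the output above
print(rms_energy(quiet) > threshold)  # False
print(rms_energy(loud) > threshold)   # True
```

Under that assumption, only chunks louder than the threshold would count as speech.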

We can start by capturing some sound and then comparing the built-in speech recognition system to the Speech2Text one.

Code
import speech_recognition as sr

recognizer = sr.Recognizer()
with sr.Microphone() as source:
    print("I'm listening")
    audio = recognizer.listen(source)
I'm listening
Code
len(audio.get_wav_data())
286764
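The byte count alone does not say how long the recording is, because that depends on the sample rate and sample width in the WAV header. Here is a small sketch of reading the duration with the standard library. It runs on a synthetic one-second clip rather than the real recording, and the 16 kHz mono 16-bit format is an assumption made for the demo.

```python
import io
import wave


def wav_duration_seconds(data: bytes) -> float:
    """Length of a WAV recording in seconds, read from its header."""
    with wave.open(io.BytesIO(data)) as wavfile:
        return wavfile.getnframes() / wavfile.getframerate()


def make_silence(seconds: float, rate: int = 16_000) -> bytes:
    """Build an in-memory mono 16-bit WAV of silence (demo data only)."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wavfile:
        wavfile.setnchannels(1)
        wavfile.setsampwidth(2)
        wavfile.setframerate(rate)
        wavfile.writeframes(b"\x00\x00" * int(seconds * rate))
    return buffer.getvalue()


clip = make_silence(1.0)
print(len(clip))                   # 32044: a 44 byte header plus 32000 bytes of samples
print(wav_duration_seconds(clip))  # 1.0
```

Calling wav_duration_seconds(audio.get_wav_data()) would do the same for the captured audio.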
Code
try:
    sphinx_text = recognizer.recognize_sphinx(audio)
    print(f"Sphinx thinks you said {sphinx_text}")
except sr.UnknownValueError:
    print("Sphinx could not understand audio")
except sr.RequestError as e:
    print(f"Sphinx error; {e}")
Sphinx thinks you said hello

This needs the pocketsphinx Python package, which in turn requires swig and the PulseAudio development headers (I installed these with sudo apt-get install swig libpulse-dev).

Since you have all this installed, you can also play the sound with the following:

Code
import pyaudio
import wave
import io

def play_sound(data: bytes) -> None:
    """Play WAV-formatted bytes through the default output device."""
    wavfile = wave.open(io.BytesIO(data))

    player = pyaudio.PyAudio()

    try:
        stream = player.open(
            format=player.get_format_from_width(wavfile.getsampwidth()),
            channels=wavfile.getnchannels(),
            rate=wavfile.getframerate(),
            output=True
        )

        try:
            data = wavfile.readframes(1024)
            while data:
                stream.write(data)
                data = wavfile.readframes(1024)
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        player.terminate()
Code
play_sound(audio.get_wav_data())

I can hear my horrible voice saying hello. So sphinx works pretty well! I would hope that Speech2Text works better in noisy environments.

For now I might just hook this up to the house chat and see if it works.

Code
# from src/main/python/blog/house/converse.py
# pylint: disable=missing-docstring, not-callable, no-member, line-too-long, too-many-instance-attributes
from __future__ import annotations

import tarfile
import warnings
from dataclasses import dataclass, replace
from itertools import chain
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import torch
import torch.nn.functional as F
from transformers import OpenAIGPTLMHeadModel, OpenAIGPTTokenizer, cached_path

ATTR_TO_SPECIAL_TOKEN = {
    "bos_token": "<bos>",
    "eos_token": "<eos>",
    "pad_token": "<pad>",
    "additional_special_tokens": ["<speaker1>", "<speaker2>"],
}
SPECIAL_TOKENS = ["<bos>", "<eos>", "<speaker1>", "<speaker2>", "<pad>"]
HF_FINETUNED_MODEL = "https://s3.amazonaws.com/models.huggingface.co/transfer-learning-chatbot/gpt_personachat_cache.tar.gz"
MODEL_CHATBOT_FOLDER = None


@dataclass
class Settings:
    min_length: int = 1  # minimum response length in tokens
    max_length: int = 20  # maximum response length in tokens
    max_history: int = 2  # number of human utterances remembered
    device: torch.device = torch.device("cpu")

    # temperature, top_k and top_p are used to perform top-k and nucleus (top_p) sampling.
    # This is a successor to beam search which tries to more accurately reflect the variance of actual speech.
    # Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751)
    temperature: float = 0.7
    top_k: int = 0
    top_p: float = 0.9

    no_sample: bool = False  # just use greedy decoding instead of sampling


@dataclass
class ModelAndTokenizer:
    model: OpenAIGPTLMHeadModel
    tokenizer: OpenAIGPTTokenizer

    @staticmethod
    def load() -> ModelAndTokenizer:
        model_path = download_model()
        tokenizer = OpenAIGPTTokenizer.from_pretrained(model_path)
        model = OpenAIGPTLMHeadModel.from_pretrained(model_path)

        add_special_tokens_(model, tokenizer)

        return ModelAndTokenizer(model=model, tokenizer=tokenizer)


@dataclass
class Conversation:
    settings: Settings
    model_and_tokenizer: ModelAndTokenizer
    personality: List[List[int]]
    history: List[List[int]]

    @staticmethod
    def make(
        model_and_tokenizer: ModelAndTokenizer,
        personality: str,
        settings: Settings = Settings(),
    ) -> Conversation:
        tokenizer = model_and_tokenizer.tokenizer
        encoded_personality = [
            tokenizer.encode(line.strip().casefold())
            for line in personality.splitlines()
            if line.strip()
        ]
        return Conversation(
            settings=settings,
            model_and_tokenizer=model_and_tokenizer,
            personality=encoded_personality,
            history=[],
        )

    @torch.no_grad()
    def respond(
        self,
        utterance: str,
    ) -> Tuple[str, Conversation]:
        tokenizer = self.model_and_tokenizer.tokenizer

        history = self.history + [tokenizer.encode(utterance)]
        out_ids = sample_sequence(
            personality=self.personality,
            history=history,
            tokenizer=tokenizer,
            model=self.model_and_tokenizer.model,
            settings=self.settings,
        )
        history.append(out_ids)
        history = history[-(2 * self.settings.max_history + 1) :]
        out_text = tokenizer.decode(out_ids, skip_special_tokens=True)
        return out_text, replace(self, history=history)


def download_model(cache_dir: Optional[Path] = None) -> Path:
    archive = cached_path(HF_FINETUNED_MODEL, cache_dir=cache_dir)
    expanded = Path(archive).parent / "expanded"
    expanded.mkdir(exist_ok=True, parents=True)
    # use a separate name so the archive path is not shadowed by the open tar file
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(expanded)
    return expanded


def add_special_tokens_(
    model: OpenAIGPTLMHeadModel, tokenizer: OpenAIGPTTokenizer
) -> None:
    """ Add special tokens to the tokenizer and the model if they have not already been added. """
    orig_num_tokens = len(tokenizer.encoder)
    num_added_tokens = tokenizer.add_special_tokens(
        ATTR_TO_SPECIAL_TOKEN
    )  # doesn't add if they are already there
    if num_added_tokens > 0:
        model.resize_token_embeddings(new_num_tokens=orig_num_tokens + num_added_tokens)


def converse(
    personality: str,
    tokenizer: OpenAIGPTTokenizer,
    model: OpenAIGPTLMHeadModel,
    settings: Settings = Settings(),
) -> None:
    encoded_personality = [
        tokenizer.encode(line.strip().casefold())
        for line in personality.splitlines()
        if line.strip()
    ]

    history = []
    while True:
        raw_text = input(">>> ")
        while not raw_text:
            print("Prompt should not be empty!")
            raw_text = input(">>> ")
        if raw_text.strip() == "quit":
            break
        history.append(tokenizer.encode(raw_text))
        with torch.no_grad():
            out_ids = sample_sequence(
                personality=encoded_personality,
                history=history,
                tokenizer=tokenizer,
                model=model,
                settings=settings,
            )
        history.append(out_ids)
        history = history[-(2 * settings.max_history + 1) :]
        out_text = tokenizer.decode(out_ids, skip_special_tokens=True)
        print(out_text)


def sample_sequence(
    *,
    personality: List[List[int]],
    history: List[List[int]],
    tokenizer: OpenAIGPTTokenizer,
    model: OpenAIGPTLMHeadModel,
    settings: Settings = Settings(),
    current_output: Optional[List[int]] = None,
):
    special_tokens_ids = tokenizer.convert_tokens_to_ids(SPECIAL_TOKENS)
    if current_output is None:
        current_output = []

    for i in range(settings.max_length):
        instance = build_input_from_segments(
            personality=personality,
            history=history,
            reply=current_output,
            tokenizer=tokenizer,
            with_eos=False,
        )

        input_ids = torch.tensor(
            instance["input_ids"], device=settings.device
        ).unsqueeze(0)
        token_type_ids = torch.tensor(
            instance["token_type_ids"], device=settings.device
        ).unsqueeze(0)

        logits = model(input_ids, token_type_ids=token_type_ids).logits
        if isinstance(logits, tuple):  # for gpt2 and maybe others
            logits = logits[0]
        logits = logits[0, -1, :] / settings.temperature
        logits = top_filtering(logits, top_k=settings.top_k, top_p=settings.top_p)
        probs = F.softmax(logits, dim=-1)

        prev = (
            torch.topk(probs, 1)[1]
            if settings.no_sample
            else torch.multinomial(probs, 1)
        )
        if i < settings.min_length and prev.item() in special_tokens_ids:
            while prev.item() in special_tokens_ids:
                if probs.max().item() == 1:
                    warnings.warn(
                        "Warning: model generating special token with probability 1."
                    )
                    break  # avoid infinitely looping over special token
                prev = torch.multinomial(probs, num_samples=1)

        if prev.item() in special_tokens_ids:
            break
        current_output.append(prev.item())

    return current_output


def build_input_from_segments(
    *,
    personality: List[List[int]],
    history,
    reply,
    tokenizer: OpenAIGPTTokenizer,
    lm_labels: bool = False,
    with_eos: bool = True,
) -> Dict[str, Any]:
    """ Build a sequence of input from 3 segments: personality, history and last reply. """
    bos, eos, speaker1, speaker2 = tokenizer.convert_tokens_to_ids(SPECIAL_TOKENS[:-1])
    sequence = (
        [[bos] + list(chain(*personality))]
        + history
        + [reply + ([eos] if with_eos else [])]
    )
    sequence = [sequence[0]] + [
        [speaker2 if (len(sequence) - i) % 2 else speaker1] + s
        for i, s in enumerate(sequence[1:])
    ]
    instance = {}
    instance["input_ids"] = list(chain(*sequence))
    instance["token_type_ids"] = [
        speaker2 if i % 2 else speaker1 for i, s in enumerate(sequence) for _ in s
    ]
    instance["mc_token_ids"] = len(instance["input_ids"]) - 1
    instance["lm_labels"] = [-100] * len(instance["input_ids"])
    if lm_labels:
        instance["lm_labels"] = (
            ([-100] * sum(len(s) for s in sequence[:-1])) + [-100] + sequence[-1][1:]
        )
    return instance


def top_filtering(
    logits: torch.Tensor,
    top_k: int = 0,
    top_p: float = 0.9,
    threshold: float = -float("Inf"),
    filter_value: float = -float("Inf"),
):
    """Filter a distribution of logits using top-k, top-p (nucleus) and/or threshold filtering
    Args:
        logits: logits distribution shape (vocabulary size)
        top_k: <=0: no filtering, >0: keep only top k tokens with highest probability.
        top_p: <=0.0: no filtering, >0.0: keep only a subset S of candidates, where S is the smallest subset
            whose total probability mass is greater than or equal to the threshold top_p.
            In practice, we select the highest probability tokens whose cumulative probability mass exceeds
            the threshold top_p.
        threshold: a minimal threshold to keep logits
    """
    assert (
        logits.dim() == 1
    )  # Only work for batch size 1 for now - could update but it would obfuscate a bit the code
    top_k = min(top_k, logits.size(-1))
    if top_k > 0:
        # Remove all tokens with a probability less than the last token in the top-k tokens
        indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
        logits[indices_to_remove] = filter_value

    if top_p > 0.0:
        # Compute cumulative probabilities of sorted tokens
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probabilities = torch.cumsum(
            F.softmax(sorted_logits, dim=-1), dim=-1
        )

        # Remove tokens with cumulative probability above the threshold
        sorted_indices_to_remove = cumulative_probabilities > top_p
        # Shift the indices to the right to keep also the first token above the threshold
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0

        # Back to unsorted indices and set them to -infinity
        indices_to_remove = sorted_indices[sorted_indices_to_remove]
        logits[indices_to_remove] = filter_value

    indices_to_remove = logits < threshold
    logits[indices_to_remove] = filter_value

    return logits
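The top_p branch of top_filtering above can be hard to follow because of the index shifting. As an illustration only, here is the same keep rule written in plain Python over a list of probabilities: walk the tokens from most to least likely and stop at the first one that pushes the running total over top_p. The function name and the four-token distribution are made up for the example.

```python
from typing import List


def nucleus_keep(probs: List[float], top_p: float) -> List[int]:
    """Indices of the most likely tokens, up to and including the first one
    that takes the cumulative probability above top_p."""
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    kept: List[int] = []
    mass = 0.0
    for index in order:
        kept.append(index)
        mass += probs[index]
        if mass > top_p:
            break
    return kept


# hypothetical four-token vocabulary
probs = [0.5, 0.3, 0.15, 0.05]
print(nucleus_keep(probs, top_p=0.9))  # [0, 1, 2]
```

The least likely token is dropped, which corresponds to top_filtering setting its logit to the filter value before sampling.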
Code
model_and_tokenizer = ModelAndTokenizer.load()
settings = Settings()
conversation = Conversation.make(
    model_and_tokenizer=model_and_tokenizer,
    settings=settings,
    personality="""
    I am a house.
    I love my son.
    I like my organs.
    I can hear.
    """
)
Some weights of the model checkpoint at /home/matthew/.cache/huggingface/transformers/expanded were not used when initializing OpenAIGPTLMHeadModel: ['multiple_choice_head.summary.weight', 'multiple_choice_head.summary.bias']
- This IS expected if you are initializing OpenAIGPTLMHeadModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing OpenAIGPTLMHeadModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Code
conversation.respond("hello")[0]
'hello! how are you doing today?'

So I’ve created a clean interface for the code from the previous post. The Conversation class holds the current state, and you provide an utterance to its respond method. That returns the text response along with a new Conversation carrying the updated history.
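To show the shape of that interface without loading a model, here is a toy stand-in that echoes the utterance instead of generating a reply. EchoConversation is a hypothetical name for this sketch. The history trimming rule is copied from the real class, but the history holds strings here rather than token ids.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class EchoConversation:
    """Toy stand-in for Conversation: it echoes instead of calling a model."""

    max_history: int = 2
    history: Tuple[str, ...] = ()

    def respond(self, utterance: str) -> Tuple[str, "EchoConversation"]:
        reply = f"you said {utterance}"
        history = self.history + (utterance, reply)
        # same trimming rule as the real class
        history = history[-(2 * self.max_history + 1):]
        # the original object is left untouched; a copy carries the new history
        return reply, replace(self, history=history)


start = EchoConversation()
reply, after = start.respond("hello")
print(reply)          # you said hello
print(start.history)  # ()
print(after.history)  # ('hello', 'you said hello')
```

Because respond returns a new object, the caller has to keep the returned conversation to continue the chat.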

So let’s try it out…

Code
import speech_recognition as sr
from typing import Optional

def listen_sr() -> Optional[str]:
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("I'm listening")
        audio = recognizer.listen(source)

    try:
        sphinx_text = recognizer.recognize_sphinx(audio)
        print(f"Sphinx thinks you said: {sphinx_text}")
        return sphinx_text
    except sr.UnknownValueError:
        print("Sphinx could not understand audio")
    except sr.RequestError as e:
        print(f"Sphinx error; {e}")
    return None

def converse_sr(conversation: Conversation) -> None:
    while True:
        text = listen_sr()
        if text is None:
            print("say quit to end")
            continue
        text = text.casefold().strip()
        if text == "quit":
            return
        if not text:
            continue

        response, conversation = conversation.respond(text)
        print(response)
Code
converse_sr(conversation)
I'm listening
Sphinx thinks you said: if one
i've my son i'll go for a walk with him
I'm listening
Sphinx thinks you said: how are they
i'm a child myself and have a big family
I'm listening
Sphinx thinks you said: fincher
that's cool. my son is a pro wrestler
I'm listening
Sphinx thinks you said: but isn't that
do you have any hobbies
I'm listening
Sphinx thinks you said: quit

So now that I’ve tried it out I think that the sphinx speech recognition needs work. Only my quit utterance was actually correctly interpreted.

Code
import torch
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration

model = Speech2TextForConditionalGeneration.from_pretrained("facebook/s2t-small-librispeech-asr")
processor = Speech2TextProcessor.from_pretrained("facebook/s2t-small-librispeech-asr")
Code
import speech_recognition as sr

recognizer = sr.Recognizer()
with sr.Microphone() as source:
    print("I'm listening")
    audio = recognizer.listen(source)
I'm listening
Code
import librosa
import io

speech = librosa.load(io.BytesIO(audio.get_wav_data()), sr=16_000)[0]
inputs = processor(speech, sampling_rate=16_000, return_tensors="pt")
generated_ids = model.generate(input_ids=inputs["input_features"], attention_mask=inputs["attention_mask"])
transcription = processor.batch_decode(generated_ids)

transcription
['</s> hi</s>']

I actually said hello. It’s close enough though!

Code
import io

import speech_recognition as sr
import librosa

def listen_s2t(processor: Speech2TextProcessor, model: Speech2TextForConditionalGeneration) -> list[str]:
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("I'm listening")
        audio = recognizer.listen(source)

    speech = librosa.load(io.BytesIO(audio.get_wav_data()), sr=16_000)[0]
    inputs = processor(speech, sampling_rate=16_000, return_tensors="pt")
    generated_ids = model.generate(input_ids=inputs["input_features"], attention_mask=inputs["attention_mask"])
    utterances = processor.batch_decode(generated_ids)
    
    # </s> is the end of speech token
    # splitting like this separates out individual utterances
    return [
        utterance.strip()
        for entry in utterances
        for utterance in entry.split("</s>")
        if utterance.strip()
    ]
Code
listen_s2t(processor=processor, model=model)
I'm listening
['hallo how are you']
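The splitting step inside listen_s2t can be tried on its own, without a microphone or a model. This sketch pulls it into a hypothetical helper and feeds it the decoded string from the earlier transcription.

```python
from typing import List

END_OF_SPEECH = "</s>"


def split_utterances(decoded: List[str]) -> List[str]:
    """Split decoded strings on the end of speech token and drop empty pieces."""
    return [
        piece.strip()
        for entry in decoded
        for piece in entry.split(END_OF_SPEECH)
        if piece.strip()
    ]


print(split_utterances(["</s> hi</s>"]))  # ['hi']
print(split_utterances(["</s>"]))         # []
```

An empty list means nothing was recognised. Passing skip_special_tokens=True to batch_decode may be a simpler way to drop the token, though that was not tried here.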
Code
def converse_s2t(conversation: Conversation, processor: Speech2TextProcessor, model: Speech2TextForConditionalGeneration) -> None:
    while True:
        text = listen_s2t(processor=processor, model=model)
        if not text:
            print("say quit to end")
            continue
        text = text[0].casefold().strip()
        if text == "quit":
            print("ending")
            return
        if not text:
            continue

        print(f">>> {text}")
        response, conversation = conversation.respond(text)
        print(response)
Code
converse_s2t(conversation, processor=processor, model=model)
I'm listening
>>> have i heard ye
hello! how are you doing today?
I'm listening
>>> and i will vex you
vex me? my son loves me.
I'm listening
>>> your son does love you
i love him. he is a very good kid.
I'm listening
>>> what do you think he'll be when he grows up
i'm sure he will be a great dad
I'm listening
>>> do you like your dad
?
I'm listening
ending

So this also has problems transcribing my speech. I wonder if this is related to the size of the model? There are medium and large versions available.

Separately, though, there is the Wav2Vec2 model. I think it would be worth trying that out.

Code
from transformers import Wav2Vec2Tokenizer, Wav2Vec2ForCTC

tokenizer = Wav2Vec2Tokenizer.from_pretrained("facebook/wav2vec2-base-960h")
model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")
/home/matthew/.cache/pypoetry/virtualenvs/blog-HrtMnrOS-py3.9/lib/python3.9/site-packages/transformers/models/wav2vec2/tokenization_wav2vec2.py:356: FutureWarning: The class `Wav2Vec2Tokenizer` is deprecated and will be removed in version 5 of Transformers. Please use `Wav2Vec2Processor` or `Wav2Vec2CTCTokenizer` instead.
  warnings.warn(
Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Code
import speech_recognition as sr

recognizer = sr.Recognizer()
with sr.Microphone() as source:
    print("I'm listening")
    audio = recognizer.listen(source)
I'm listening
Code
import librosa
import io
import torch

speech = librosa.load(io.BytesIO(audio.get_wav_data()), sr=16_000)[0]

input_values = tokenizer(speech, sampling_rate=16_000, return_tensors="pt", padding="longest").input_values
logits = model.generate(input_values)
predicted_ids = torch.argmax(logits, dim=-1)
transcription = tokenizer.batch_decode(predicted_ids)

transcription
Input length of input_ids is 28979, but ``max_length`` is set to 20.This can lead to unexpected behavior. You should consider increasing ``config.max_length`` or ``max_length``.
['<unk>']

I don’t really get what the problem is. The warning compares an input length of 28979 against a max_length of 20. If those are audio samples, 20 of them at 16,000 per second is 1.25 milliseconds, which is far too short to form a coherent word or word part.
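A possible explanation, not verified against this run: Wav2Vec2ForCTC is a CTC model, so the usual route is a plain forward pass (model(input_values).logits) followed by an argmax over each audio frame. The generate method is meant for models that produce tokens one at a time, which would be where the max_length warning comes from. The decoding step that follows the argmax can be sketched without the model. The vocabulary, blank id and frame scores below are all made up for the example.

```python
from itertools import groupby
from typing import List, Sequence

BLANK = 0  # hypothetical id of the CTC blank token
VOCAB = {1: "h", 2: "e", 3: "l", 4: "o"}  # hypothetical toy vocabulary


def argmax(scores: Sequence[float]) -> int:
    return max(range(len(scores)), key=lambda i: scores[i])


def ctc_greedy_decode(frame_scores: List[Sequence[float]]) -> str:
    """Pick the best token per frame, merge repeats, then drop blanks."""
    best = [argmax(scores) for scores in frame_scores]
    merged = [token for token, _ in groupby(best)]
    return "".join(VOCAB[token] for token in merged if token != BLANK)


def one_hot(token: int) -> List[float]:
    return [1.0 if i == token else 0.0 for i in range(5)]


# frames spelling h h e <blank> l l <blank> l o
frames = [one_hot(t) for t in [1, 1, 2, 0, 3, 3, 0, 3, 4]]
print(ctc_greedy_decode(frames))  # hello
```

The blank between the two runs of "l" is what lets a double letter survive the merging of repeats.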

Anyway the Speech2Text system works. I may investigate Wav2Vec2 further in future as it’s a more recent model and likely better quality.

The last part is to speak the response so that my son can actually have a conversation.

Code
import pyttsx3

pyttsx3.speak("Hello")

My son said that the house is a lady so I am going to try to alter the voice.

Code
import pyttsx3
engine = pyttsx3.init()
[voice.id for voice in engine.getProperty("voices")]
['afrikaans',
 'aragonese',
 'bulgarian',
 'bosnian',
 'catalan',
 'czech',
 'welsh',
 'danish',
 'german',
 'greek',
 'default',
 'english',
 'en-scottish',
 'english-north',
 'english_rp',
 'english_wmids',
 'english-us',
 'en-westindies',
 'esperanto',
 'spanish',
 'spanish-latin-am',
 'estonian',
 'persian',
 'persian-pinglish',
 'finnish',
 'french-Belgium',
 'french',
 'irish-gaeilge',
 'greek-ancient',
 'hindi',
 'croatian',
 'hungarian',
 'armenian',
 'armenian-west',
 'indonesian',
 'icelandic',
 'italian',
 'lojban',
 'georgian',
 'kannada',
 'kurdish',
 'latin',
 'lingua_franca_nova',
 'lithuanian',
 'latvian',
 'macedonian',
 'malayalam',
 'malay',
 'nepali',
 'dutch',
 'norwegian',
 'punjabi',
 'polish',
 'brazil',
 'portugal',
 'romanian',
 'russian',
 'slovak',
 'albanian',
 'serbian',
 'swedish',
 'swahili-test',
 'tamil',
 'turkish',
 'vietnam',
 'vietnam_hue',
 'vietnam_sgn',
 'Mandarin',
 'cantonese']

So there is no female voice included in the core pyttsx3 package. Apparently a Stack Overflow answer has a solution, though, which is to manipulate the voice slightly by adding a variant suffix (here +f3) to the voice name.

Code
engine.setProperty('voice', 'english_rp+f3')
engine.say("Hello")
engine.runAndWait()
Code
type(engine)
pyttsx3.engine.Engine
Code
def full_conversation(
    conversation: Conversation,
    processor: Speech2TextProcessor,
    model: Speech2TextForConditionalGeneration,
    engine: pyttsx3.engine.Engine,
) -> None:
    while True:
        text = listen_s2t(processor=processor, model=model)
        if not text:
            print("say quit to end")
            continue
        text = text[0].casefold().strip()
        if text == "quit":
            print("ending")
            return
        if not text:
            continue

        print(f">>> {text}")
        response, conversation = conversation.respond(text)

        print(response)
        engine.say(response)
        engine.runAndWait()
Code
import torch
from transformers import Speech2TextProcessor, Speech2TextForConditionalGeneration
import pyttsx3

model = Speech2TextForConditionalGeneration.from_pretrained("facebook/s2t-small-librispeech-asr")
processor = Speech2TextProcessor.from_pretrained("facebook/s2t-small-librispeech-asr")

engine = pyttsx3.init()
engine.setProperty('voice', 'english_rp+f3')

full_conversation(
    conversation=conversation,
    processor=processor,
    model=model,
    engine=engine,
)
I'm listening
>>> how i
how are you doing today?
I'm listening
>>> that why did he talk them goose i only wanted ones to be told into them i know what it wants
i can hear you. i can smell them. i am a house
I'm listening
>>> and what is drank yet that ain't that sounded in stanny it's quite close talking isn't it i wouldn't slowly down the talking plays because men all be just flying with that mother
i love my son, he makes me feel safe. i'm a house
I'm listening
>>> whew curled him down quite
he's so smart, it makes me feel safe
I'm listening
ending

That worked! And my son quite liked it. He found the voice a little fast though.
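The speed should be adjustable, since pyttsx3 exposes a rate property in words per minute through getProperty and setProperty. Here is a minimal sketch of slowing the voice down. The slow_down helper and the 0.8 factor are made up for the example, and a fake engine stands in for the real one so it can be tried without audio.

```python
from typing import Any


def slow_down(engine: Any, factor: float = 0.8) -> int:
    """Scale the engine's speaking rate by factor and return the new rate."""
    current = engine.getProperty("rate")
    new_rate = max(1, round(current * factor))
    engine.setProperty("rate", new_rate)
    return new_rate


class FakeEngine:
    """Stand-in with the same getProperty/setProperty shape as the real engine."""

    def __init__(self, rate: int = 200) -> None:
        self.properties = {"rate": rate}

    def getProperty(self, name: str) -> Any:
        return self.properties[name]

    def setProperty(self, name: str, value: Any) -> None:
        self.properties[name] = value


fake = FakeEngine()
print(slow_down(fake))  # 160
```

With the real engine this would be a call to slow_down(engine) before starting full_conversation.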

I should point out that the transcription of what he said is terrible. I’ll have to try to fix that somehow.