Home >Backend Development >Python Tutorial >Step-by-Step Guide: Interruption Management with OpenAI Realtime API
This article introduces how to implement a conversation interruption feature using the OpenAI Realtime API.
The details of the implementation are available in the GitHub repository.
This implementation is based on the code from Azure-Samples/aoai-realtime-audio-sdk. A detailed explanation of the code can be found in this article.
In this implementation, we use the microphone and speaker of the local PC for audio input and output.
The audio captured from the microphone is sent to the OpenAI Realtime API server for processing.
To capture audio from the local PC's microphone, we use the stream functionality of the pyaudio library. The following code sets up a stream for audio input:
p = pyaudio.PyAudio() input_default_input_index = p.get_default_input_device_info()['index'] input_stream = p.open( format=STREAM_FORMAT, channels=INPUT_CHANNELS, rate=INPUT_SAMPLE_RATE, input=True, output=False, frames_per_buffer=INPUT_CHUNK_SIZE, input_device_index=input_default_input_index, start=False, ) input_stream.start_stream()
Audio capture is performed using threading.Thread for parallel processing. The audio data obtained from the microphone is encoded into base64 format and stored in a queue.
def listen_audio(input_stream: pyaudio.Stream): while True: audio_data = input_stream.read(INPUT_CHUNK_SIZE, exception_on_overflow=False) if audio_data is None: continue base64_audio = base64.b64encode(audio_data).decode("utf-8") audio_input_queue.put(base64_audio) threading.Thread(target=listen_audio, args=(input_stream,), daemon=True).start()
The base64 strings stored in the queue are sent to the OpenAI Realtime API server as "input_audio_buffer.append" messages.
async def send_audio(client: RTLowLevelClient): while not client.closed: base64_audio = await asyncio.get_event_loop().run_in_executor(None, audio_input_queue.get) await client.send(InputAudioBufferAppendMessage(audio=base64_audio)) await asyncio.sleep(0)
Audio playback is performed through the local PC's speakers using the audio data received from the OpenAI Realtime API server.
The audio data is received as "response.audio.delta" messages from the server. Since the received data is encoded in base64, it is decoded, stored in a queue, and converted into a playable format.
async def receive_messages(client: RTLowLevelClient): while True: message = await client.recv() if message is None: continue match message.type: case "response.audio.delta": audio_data = base64.b64decode(message.delta) for i in range(0, len(audio_data), OUTPUT_CHUNK_SIZE): audio_output_queue.put(audio_data[i:i+OUTPUT_CHUNK_SIZE]) await asyncio.sleep(0)
The data stored in the queue is played through the local PC's speakers using parallel processing. This playback process uses threading.Thread to ensure that the audio data is played smoothly in real-time.
def play_audio(output_stream: pyaudio.Stream): while True: audio_data = audio_output_queue.get() output_stream.write(audio_data) p = pyaudio.PyAudio() output_default_output_index = p.get_default_output_device_info()['index'] output_stream = p.open( format=STREAM_FORMAT, channels=OUTPUT_CHANNELS, rate=OUTPUT_SAMPLE_RATE, input=False, output=True, frames_per_buffer=OUTPUT_CHUNK_SIZE, output_device_index=output_default_output_index, start=False, ) output_stream.start_stream() threading.Thread(target=play_audio, args=(output_stream,), daemon=True).start()
The OpenAI Realtime API automatically detects conversation segments on the server side. This allows for the detection of new speech and the creation of real-time responses even while the AI is responding.
However, when playing audio on a local PC, it is important to stop the playback of ongoing audio to achieve a natural interruption of the conversation. This point requires attention. The detection of user speech is received from the OpenAI Realtime API server as an "input_audio_buffer.speech_started" message. When this message is received, the playback is stopped by clearing the audio data stored in the queue.
async def receive_messages(client: RTLowLevelClient): while True: message = await client.recv() # print(f"{message=}") if message is None: continue match message.type: case "input_audio_buffer.speech_started": print("Input Audio Buffer Speech Started Message") print(f" Item Id: {message.item_id}") print(f" Audio Start [ms]: {message.audio_start_ms}") while not audio_output_queue.empty(): audio_output_queue.get()
As for audio output, no modifications are needed; it operates as described in the previously explained code.
This time, I introduced a Python implementation for conversation interruption.
I hope this article proves helpful to anyone who faces challenges with stopping AI speech effectively, as I did.
Additionally, the definition and configuration of stream instances can affect the quality of audio playback. If you experience interruptions in audio playback, reviewing these settings might help improve the situation.
Thank you for reading until the end.
The above is the detailed content of Step-by-Step Guide: Interruption Management with OpenAI Realtime API. For more information, please follow other related articles on the PHP Chinese website!