Python SDK Migration Guide
A guide for developers migrating to the Fern-generated Python SDK (version 0.7.0 and above).
Hume’s newest Python SDK refactors the core client architecture, separating functionality into distinct modules for specific APIs (e.g., the Expression Measurement API and the Empathic Voice Interface API).
Version 0.7.0 introduces the following features:
- Explicit types
- Better support for asynchronous operations
- More granular client configuration (see the sketch after this list)
- Continued support for legacy SDK implementations
- Support for Python version 3.12 with Expression Measurement API namespace methods
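As a brief illustration of the more granular client configuration, the new client constructors accept optional settings alongside your API key. A minimal sketch, assuming a timeout parameter as is typical of Fern-generated clients (verify against the installed SDK's client signature):

```python
from hume.client import AsyncHumeClient

# timeout is an assumed optional parameter; check the SDK for the exact signature
client = AsyncHumeClient(
    api_key="<HUME_API_KEY>",
    timeout=60.0,  # per-request timeout in seconds
)
```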
This guide will help you adapt your code to the new SDK structure with practical examples and explanations of the key differences.
Below is a matrix showing the compatibility of the Hume Python SDK across various Python versions and operating systems.
| API | Python Version | Operating System |
|---|---|---|
| Empathic Voice Interface | 3.9, 3.10, 3.11 | macOS, Linux |
| Expression Measurement | 3.9, 3.10, 3.11, 3.12 | macOS, Linux, Windows |
For the Empathic Voice Interface, Python versions 3.9 through 3.11 are supported on macOS and Linux. For Expression Measurement, Python versions 3.9 through 3.12 are supported on macOS, Linux, and Windows.
The legacy SDK is entirely contained within the new SDK's src/hume/legacy folder to ensure a smooth transition to the new features. To preserve your code's current functionality, follow these steps:
- Run pip install "hume[legacy]" to install the legacy package extra.
- If you are using EVI's microphone utilities, run pip install "hume[microphone]" to install the microphone extra.
- Change your import statements to from hume.legacy instead of from hume.

For example:
```python
from hume.legacy import HumeVoiceClient, VoiceConfig

client = HumeVoiceClient("<YOUR_API_KEY>")
config = client.empathic_voice.configs.get_config_version(
    id="id",
    version=1,
)
```
Instead of using HumeBatchClient, HumeStreamClient, or HumeVoiceClient, now use AsyncHumeClient, the new asynchronous base client. This client is authenticated with your Hume API key and provides access to the Expression Measurement API and the Empathic Voice Interface API as namespaces. If you're not using async, the synchronous HumeClient is available, but we recommend defaulting to AsyncHumeClient for most use cases.
Each API is namespaced accordingly:
```python
from hume.client import AsyncHumeClient

# Base asynchronous client
client = AsyncHumeClient(api_key=<HUME_API_KEY>)

# Expression Measurement (Batch)
client.expression_measurement.batch

# Expression Measurement (Streaming)
client.expression_measurement.streaming

# Empathic Voice Interface
client.empathic_voice
```
Importantly, invoking asynchronous functionality (e.g., instantiating an EVI WebSocket connection) from a synchronous client (i.e., HumeClient) is disallowed and causes an error. On the other hand, invoking synchronous behavior from an asynchronous client is supported; however, each method must be awaited.
```python
from hume.client import HumeClient, AsyncHumeClient

# INVALID: using a synchronous client for asynchronous behavior
client = HumeClient(api_key=<HUME_API_KEY>)

# Using the asynchronous connect method with a sync client will cause an error
async with client.empathic_voice.chat.connect() as socket:
    ...

# VALID: using an asynchronous client for asynchronous behavior
async_client = AsyncHumeClient(api_key=<HUME_API_KEY>)

# Using the async connect method with an async client will work properly
async with async_client.empathic_voice.chat.connect() as socket:
    ...

# VALID: using an asynchronous client for synchronous behavior
async_client = AsyncHumeClient(api_key=<HUME_API_KEY>)

# Using the configs.list_configs() method with an async client
print(await async_client.empathic_voice.configs.list_configs())
```
First, identify what operations you would like to perform.
- For tasks such as creating a config, listing your available tools, and more, we recommend using the Hume Portal because of its comprehensive user interface.
- For chatting with EVI (i.e., accessing the chat endpoint), you must use the asynchronous Hume client.
- If you need to interact with configurations, tools, or other items programmatically, we recommend the asynchronous Hume client, though the synchronous client can be used if needed.

Then, authenticate the client and proceed with your desired functionality.
The EVI WebSocket connection is now configurable using an explicit type: ChatConnectOptions. This object must be passed into the method used to initialize the connection.
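For example, a minimal asynchronous connection might look like the following sketch (the config id is a placeholder, and the socket body is elided):

```python
import asyncio

from hume.client import AsyncHumeClient
from hume.empathic_voice.chat.socket_client import ChatConnectOptions

async def main() -> None:
    # Authenticate the asynchronous client
    client = AsyncHumeClient(api_key="<HUME_API_KEY>")
    # Connection-level settings, such as your EVI config id, are passed via ChatConnectOptions
    options = ChatConnectOptions(config_id="<HUME_CONFIG_ID>")
    async with client.empathic_voice.chat.connect(options=options) as socket:
        ...  # send and receive messages over the socket

if __name__ == "__main__":
    asyncio.run(main())
```

For non-WebSocket operations, such as listing configs, the synchronous client remains an option: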
```python
from hume.client import HumeClient

# Authenticate the synchronous client
client = HumeClient(api_key=<HUME_API_KEY>)

# List your configs
client.empathic_voice.configs.list_configs()
```
It is now possible to fully manage the WebSocket events in your EVI integration, meaning you can define custom behavior for when the WebSocket is opened, closed, receives a message, or receives an error. Use the new asynchronous client's connect_with_callbacks function to do so, and reference the SubscribeEvent message type within your on_message callback function.
```python
import asyncio

from hume.client import AsyncHumeClient
from hume.empathic_voice.chat.socket_client import ChatConnectOptions

async def main() -> None:
    # Initialize the asynchronous client, authenticating with your API key
    client = AsyncHumeClient(api_key=<HUME_API_KEY>)

    # Define options for the WebSocket connection, such as an EVI config id and a secret key for token authentication
    options = ChatConnectOptions(config_id=<HUME_CONFIG_ID>, secret_key=<HUME_SECRET_KEY>)

    # Open the WebSocket connection with the configuration options and the interface's handlers
    async with client.empathic_voice.chat.connect_with_callbacks(
        options=options,
        on_open=<custom on_open function>,
        on_message=<custom on_message function>,
        on_close=<custom on_close function>,
        on_error=<custom on_error function>
    ) as socket:
        ...

if __name__ == "__main__":
    asyncio.run(main())
```
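The on_message callback below references a few names that must be imported. A minimal set might look like the following (import paths are per the current SDK layout; verify them against your installed version):

```python
import base64

from hume.core.api_error import ApiError
from hume.empathic_voice.chat.types import SubscribeEvent
```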
```python
async def on_message(message: SubscribeEvent):
    """Callback function to handle a WebSocket message event.

    Args:
        message (SubscribeEvent): This represents any type of message that is received through the EVI WebSocket, formatted in JSON. See the full list of messages in the API Reference [here](https://dev.hume.ai/reference/empathic-voice-interface-evi/chat/chat#receive).
    """
    # Create an empty dictionary to store expression inference scores
    scores = {}

    if message.type == "chat_metadata":
        message_type = message.type.upper()
        chat_id = message.chat_id
        chat_group_id = message.chat_group_id
        text = f"<{message_type}> Chat ID: {chat_id}, Chat Group ID: {chat_group_id}"
    elif message.type in ["user_message", "assistant_message"]:
        role = message.message.role.upper()
        message_text = message.message.content
        text = f"{role}: {message_text}"
        if message.from_text is False:
            scores = dict(message.models.prosody.scores)
    elif message.type == "audio_output":
        message_str: str = message.data
        message_bytes = base64.b64decode(message_str.encode("utf-8"))
        # Assumes this callback is a method on a class that maintains a byte_strs queue
        await self.byte_strs.put(message_bytes)
        return
    elif message.type == "error":
        error_message: str = message.message
        error_code: str = message.code
        raise ApiError(f"Error ({error_code}): {error_message}")  # ApiError is also an imported type
    else:
        message_type = message.type.upper()
        text = f"<{message_type}>"

    print(text)
```
For reference, here is the equivalent legacy SDK code for streaming EVI over your device's microphone:

```python
from hume import HumeVoiceClient, MicrophoneInterface
import asyncio

async def main() -> None:
    # Connect and authenticate with Hume
    client = HumeVoiceClient(<HUME_API_KEY>)

    # Start streaming EVI over your device's microphone and speakers
    async with client.connect() as socket:
        await MicrophoneInterface.start(socket)

if __name__ == "__main__":
    asyncio.run(main())
```
Instantiate the asynchronous client, configure the job with a Models object, and submit your media URLs for processing. Once the job has been submitted and has run to completion, predictions may be retrieved by job ID.
- The await_complete() method on a job has been removed; developers will need to implement a mechanism, such as polling the job's status, to await the completion of the job.
- The download_predictions() method on a job has also been removed; developers will need to implement an HTTP call to the API, parse the results, and export them to a file (a sketch of one such replacement follows this list).
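As a replacement for download_predictions(), one option is to fetch predictions through the client and serialize them yourself. A minimal sketch, assuming the returned objects are Pydantic models exposing a dict() method:

```python
import json

from hume import AsyncHumeClient

async def save_predictions(client: AsyncHumeClient, job_id: str, path: str = "predictions.json") -> None:
    # Fetch the typed prediction objects for a completed job
    predictions = await client.expression_measurement.batch.get_job_predictions(id=job_id)
    # dict() conversion is an assumption about the returned Pydantic models
    with open(path, "w") as f:
        json.dump([p.dict() for p in predictions], f, indent=2)
```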
Prior to the update, the start_inference_job method accepted the model configs as an array. Now, this configuration is contained within a typed Models object.
Starting an inference job now involves defining configuration options using explicit types for each model. For example, a Face object holds the configuration options for the face model. Configurations are passed into a Models object, which in turn is passed into the start_inference_job method. Similar strict typing exists for other batch methods.
```python
import asyncio

from hume import AsyncHumeClient
from hume.expression_measurement.batch import Face, Models

async def main():
    # Initialize an authenticated client
    client = AsyncHumeClient(api_key=<YOUR_API_KEY>)

    # Define the URL(s) of the files you would like to analyze
    job_urls = ["https://hume-tutorials.s3.amazonaws.com/faces.zip"]

    # Create configurations for each model you would like to use (blank = default)
    face_config = Face()

    # Create a Models object
    models_chosen = Models(face=face_config)

    # Start an inference job and store the job_id
    job_id = await client.expression_measurement.batch.start_inference_job(
        urls=job_urls, models=models_chosen
    )

    # Await the completion of the inference job
    await poll_for_completion(client, job_id, timeout=120)

    # After the job is over, access its predictions
    job_predictions = await client.expression_measurement.batch.get_job_predictions(
        id=job_id
    )

if __name__ == "__main__":
    asyncio.run(main())
```
```python
import asyncio

from hume import AsyncHumeClient
from hume.expression_measurement.batch import Face, Models
from hume.expression_measurement.batch.types import InferenceBaseRequest

async def main():
    # Initialize an authenticated client
    client = AsyncHumeClient(api_key=HUME_API_KEY)

    # Define the filepath(s) of the file(s) you would like to analyze
    local_filepaths = [open("faces.zip", mode="rb")]

    # Create configurations for each model you would like to use (blank = default)
    face_config = Face()

    # Create a Models object
    models_chosen = Models(face=face_config)

    # Create an InferenceBaseRequest object containing the configuration
    stringified_configs = InferenceBaseRequest(models=models_chosen)

    # Start an inference job and store the job_id
    job_id = await client.expression_measurement.batch.start_inference_job_from_local_file(
        json=stringified_configs, file=local_filepaths
    )

    # Await the completion of the inference job
    await poll_for_completion(client, job_id, timeout=120)

    # After the job is over, access its predictions
    job_predictions = await client.expression_measurement.batch.get_job_predictions(
        id=job_id
    )

if __name__ == "__main__":
    asyncio.run(main())
```
Below is an example implementation of helper methods that poll the job's status for completion with exponential backoff.
```python
import asyncio

async def poll_for_completion(client: AsyncHumeClient, job_id, timeout=120):
    """
    Polls for the completion of a job with a specified timeout (in seconds).

    Uses asyncio.wait_for to enforce a maximum waiting time.
    """
    try:
        # Wait for the job to complete or until the timeout is reached
        await asyncio.wait_for(poll_until_complete(client, job_id), timeout=timeout)
    except asyncio.TimeoutError:
        # Notify if the polling operation has timed out
        print(f"Polling timed out after {timeout} seconds.")

async def poll_until_complete(client: AsyncHumeClient, job_id):
    """
    Continuously polls the job status until it is completed, failed, or an unexpected status is encountered.

    Implements exponential backoff to reduce the frequency of requests over time.
    """
    delay = 1  # Start with a 1-second delay
    while True:
        # Wait for the specified delay before making the next status check
        await asyncio.sleep(delay)

        # Retrieve the current job details
        job_details = await client.expression_measurement.batch.get_job_details(job_id)
        status = job_details.state.status

        if status == "COMPLETED":
            # Job has completed successfully
            print("\nJob completed successfully:")
            break
        elif status == "FAILED":
            # Job has failed
            print("\nJob failed:")
            break

        # Increase the delay exponentially, maxing out at 16 seconds
        delay = min(delay * 2, 16)
```
The SDK may be used to download the job’s artifacts.
```python
# Run inside an async function, with an authenticated AsyncHumeClient and a job_id
with open("artifacts.zip", "wb") as f:
    async for new_bytes in client.expression_measurement.batch.get_job_artifacts(job_id):
        f.write(new_bytes)
```
The API must be called directly to download the job's predictions. If using the code below, ensure you replace <YOUR_JOB_ID> and <YOUR_API_KEY> with the correct values.
```python
import requests
import json

# Define the URL and headers
url = "https://api.hume.ai/v0/batch/jobs/<YOUR_JOB_ID>/predictions"
headers = {
    "X-Hume-Api-Key": "<YOUR_API_KEY>"
}

# Make the GET request
response = requests.get(url, headers=headers)

# Check if the request was successful
if response.status_code == 200:
    # Parse the JSON response
    data = response.json()

    # Write the JSON data to a file
    with open("predictions.json", "w") as file:
        json.dump(data, file, indent=2)
    print("Response has been written to 'predictions.json'.")
else:
    print(f"Failed to fetch data. Status code: {response.status_code}")
    print(response.text)
```
For reference, the legacy SDK's batch workflow looked like this:

```python
from hume import HumeBatchClient
from hume.models.config import FaceConfig
from hume.models.config import ProsodyConfig

client = HumeBatchClient(<HUME_API_KEY>)
urls = ["https://hume-tutorials.s3.amazonaws.com/faces.zip"]
face_config = FaceConfig()
prosody_config = ProsodyConfig()
job = client.submit_job(urls, [face_config, prosody_config])

print(job)
print("Running...")

result = job.await_complete()
job_predictions = client.get_job_predictions(job_id=job.id)
```
First, retrieve the samples you will use. Then, instantiate the asynchronous client and configure the WebSocket with a Config object containing the model(s) you would like to use. After you connect to the WebSocket, predictions may be retrieved.

Connecting to the WebSocket now uses the explicit type StreamConnectOptions. These options accept the Config object, which contains the configurations for the expression measurement models you wish to use. These configurations are unique to each model and must be imported as well, such as StreamLanguage.
```python
import asyncio

from hume import AsyncHumeClient
from hume.expression_measurement.stream import Config
from hume.expression_measurement.stream.socket_client import StreamConnectOptions
from hume.expression_measurement.stream.types import StreamLanguage

samples = [
    "Mary had a little lamb,",
    "Its fleece was white as snow.",
    "Everywhere the child went,",
    "The little lamb was sure to go.",
]

async def main():
    client = AsyncHumeClient(api_key="<YOUR_API_KEY>")

    model_config = Config(language=StreamLanguage())

    stream_options = StreamConnectOptions(config=model_config)

    async with client.expression_measurement.stream.connect(options=stream_options) as socket:
        for sample in samples:
            result = await socket.send_text(sample)
            print(result.language.predictions[0].emotions)

if __name__ == "__main__":
    asyncio.run(main())
```
For reference, the legacy SDK's streaming equivalent:

```python
import asyncio

from hume import HumeStreamClient
from hume.models.config import LanguageConfig

samples = [
    "Mary had a little lamb,",
    "Its fleece was white as snow.",
    "Everywhere the child went,",
    "The little lamb was sure to go.",
]

async def main():
    client = HumeStreamClient("<YOUR API KEY>")
    config = LanguageConfig()
    async with client.connect([config]) as socket:
        for sample in samples:
            result = await socket.send_text(sample)
            emotions = result["language"]["predictions"][0]["emotions"]
            print(emotions)

if __name__ == "__main__":
    asyncio.run(main())
```