Skip to content
Open
Comment thread
mcdgavin marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 stt_v2.py still uses the old _api_key pattern and was not updated

The file livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt_v2.py still uses the old self._api_key / Authorization: Token {self._api_key} pattern (lines 119, 190, 288, 487). It was not modified in this PR. If Cloudflare AI Gateway support is expected to work with stt_v2 as well, it would need parallel changes. This may be intentional if stt_v2 is legacy/deprecated.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@mcdgavin mcdgavin Jun 17, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional. STTv2 is the Deepgram v2 / Flux class (different protocol, /v2/listen). This PR scopes Cloudflare support to the stable STT (nova-3) and TTS (Aura).

An STTv2.with_cloudflare for Flux is a sensible follow-up rather than part of this change.

Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def __init__(
numerals: bool = False,
mip_opt_out: bool = False,
vad_events: bool = True,
extra_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
# deprecated
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
) -> None:
Expand Down Expand Up @@ -135,9 +136,13 @@ def __init__(
mip_opt_out: Whether to take part in the model improvement program
vad_events: Whether to enable VAD (Voice Activity Detection) events.
When enabled, SpeechStarted events are sent when speech is detected. Defaults to True.
extra_headers: Additional HTTP headers sent on every connection, merged over the
default ``Authorization: Token`` header. Useful for self-hosted Deepgram or
gateways with custom auth. When no API key is set, these become the sole auth.

Raises:
ValueError: If no API key is provided or found in environment variables.
ValueError: If no API key is provided or found in environment variables
(unless ``extra_headers`` is supplied).

Note:
The api_key must be set either through the constructor argument or by setting
Expand All @@ -154,12 +159,17 @@ def __init__(
)

deepgram_api_key = api_key if is_given(api_key) else os.environ.get("DEEPGRAM_API_KEY")
if not deepgram_api_key:
extra = dict(extra_headers) if is_given(extra_headers) else {}
if not deepgram_api_key and not extra:
raise ValueError(
"Deepgram API key is required, either as argument or set"
" DEEPGRAM_API_KEY environment variable"
)
self._api_key = deepgram_api_key
# default Token auth only when a key is present; extra_headers merge on top (and are
# the sole auth when no key is set, e.g. the Cloudflare AI Gateway)
self._connect_headers = (
{"Authorization": f"Token {deepgram_api_key}"} if deepgram_api_key else {}
) | extra

model = _validate_model(model, language)
if is_given(keyterms):
Expand Down Expand Up @@ -203,6 +213,70 @@ def model(self) -> str:
def provider(self) -> str:
return "Deepgram"

@staticmethod
def with_cloudflare(
    *,
    model: DeepgramModels | str = "nova-3",
    account_id: str | None = None,
    gateway_id: str = "default",
    cf_aig_token: str | None = None,
    base_url: str | None = None,
    language: DeepgramLanguages | str = "en-US",
    interim_results: bool = True,
    sample_rate: int = 16000,
    http_session: aiohttp.ClientSession | None = None,
) -> STT:
    """Build an ``STT`` that talks to Deepgram through the Cloudflare AI Gateway.

    The instance targets the gateway's ``workers-ai`` endpoint and authenticates
    with a ``cf-aig-authorization`` header only; no Deepgram API key is used.

    Args:
        model: Deepgram model name such as ``"nova-3"``. Names that do not already
            start with ``@cf/`` get the ``@cf/deepgram/`` prefix prepended.
        account_id: Cloudflare account ID; when omitted, ``CLOUDFLARE_ACCOUNT_ID``
            is read from the environment. Only consulted when ``base_url`` is None.
        gateway_id: Gateway name used in the endpoint path. Defaults to ``"default"``.
        cf_aig_token: Token sent as ``cf-aig-authorization``; when omitted,
            ``CLOUDFLARE_AI_GATEWAY_TOKEN`` is read from the environment.
        base_url: Complete gateway endpoint. When given, ``account_id`` and
            ``gateway_id`` are ignored.
        language: Recognition language, passed through to ``STT``.
        interim_results: Whether interim transcripts are emitted, passed through
            to ``STT``.
        sample_rate: Audio sample rate in Hz, passed through to ``STT``.
        http_session: Optional aiohttp session, passed through to ``STT``.

    Raises:
        ValueError: If no gateway token is available, or if ``base_url`` is None
            and no account ID is available.
    """
    token = cf_aig_token or os.environ.get("CLOUDFLARE_AI_GATEWAY_TOKEN")
    if not token:
        raise ValueError(
            "Cloudflare AI Gateway token is required, either as argument or set"
            " CLOUDFLARE_AI_GATEWAY_TOKEN environment variable"
        )

    endpoint = base_url
    if endpoint is None:
        # No explicit endpoint: derive it from the account and gateway identifiers.
        resolved_account = account_id or os.environ.get("CLOUDFLARE_ACCOUNT_ID")
        if not resolved_account:
            raise ValueError(
                "Cloudflare account_id is required, either as argument or set"
                " CLOUDFLARE_ACCOUNT_ID environment variable (or pass base_url directly)"
            )
        endpoint = (
            f"https://gateway.ai.cloudflare.com/v1/{resolved_account}/{gateway_id}/workers-ai"
        )

    # Anything already carrying an "@cf/" routing prefix is forwarded untouched.
    routed_model = model if model.startswith("@cf/") else f"@cf/deepgram/{model}"

    return STT(
        # An explicit empty key opts out of the DEEPGRAM_API_KEY env fallback, so the
        # gateway only ever receives cf-aig-authorization (no Authorization: Token).
        api_key="",
        extra_headers={"cf-aig-authorization": token},
        base_url=endpoint,
        model=routed_model,
        language=language,
        interim_results=interim_results,
        sample_rate=sample_rate,
        http_session=http_session,
    )
Comment thread
mcdgavin marked this conversation as resolved.

def _ensure_session(self) -> aiohttp.ClientSession:
if not self._session:
self._session = utils.http_context.http_session()
Expand Down Expand Up @@ -243,7 +317,7 @@ async def _recognize_impl(
url=_to_deepgram_url(recognize_config, self._opts.endpoint_url, websocket=False),
data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
headers={
"Authorization": f"Token {self._api_key}",
**self._connect_headers,
"Accept": "application/json",
"Content-Type": "audio/wav",
},
Expand Down Expand Up @@ -280,9 +354,9 @@ def stream(
stt=self,
conn_options=conn_options,
opts=config,
api_key=self._api_key,
http_session=self._ensure_session(),
base_url=self._opts.endpoint_url,
connect_headers=self._connect_headers,
)
self._streams.add(stream)
return stream
Expand Down Expand Up @@ -403,9 +477,9 @@ def __init__(
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
base_url: str,
connect_headers: dict[str, str],
) -> None:
if opts.detect_language or opts.language is None:
raise ValueError(
Expand All @@ -415,7 +489,7 @@ def __init__(

super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
self._opts = opts
self._api_key = api_key
self._connect_headers = dict(connect_headers)
self._session = http_session
self._opts.endpoint_url = base_url
self._speaking = False
Expand Down Expand Up @@ -671,7 +745,7 @@ async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
ws = await asyncio.wait_for(
self._session.ws_connect(
_to_deepgram_url(live_config, base_url=self._opts.endpoint_url, websocket=True),
headers={"Authorization": f"Token {self._api_key}"},
headers=self._connect_headers,
),
self._conn_options.timeout,
)
Expand Down Expand Up @@ -841,6 +915,12 @@ def prerecorded_transcription_to_speech_event(
)


def _bare_model(model: DeepgramModels | str) -> str:
# Cloudflare AI Gateway routes Deepgram models as "@cf/deepgram/<model>"; strip the routing
# prefix so model-name checks see the underlying Deepgram model (e.g. "nova-3").
return model.removeprefix("@cf/deepgram/")


def _validate_model(
model: DeepgramModels | str, language: NotGivenOr[DeepgramLanguages | str]
) -> DeepgramModels | str:
Expand All @@ -855,11 +935,14 @@ def _validate_model(
"nova-2-drivethru",
"nova-2-automotive",
}
if is_given(language) and language not in ("en-US", "en") and model in en_only_models:
bare = _bare_model(model)
if is_given(language) and language not in ("en-US", "en") and bare in en_only_models:
logger.warning(
f"{model} does not support language {language}, falling back to nova-2-general"
)
return "nova-2-general"
# preserve any routing prefix (e.g. "@cf/deepgram/") on the fallback model
prefix = model[: len(model) - len(bare)]
return f"{prefix}nova-2-general"
return model


Expand All @@ -880,13 +963,13 @@ def _validate_keyterm(
Validating keyterm and keywords for model compatibility.
See: https://developers.deepgram.com/docs/keyterm and https://developers.deepgram.com/docs/keywords
"""
if model.startswith("nova-3") and is_given(keywords):
if _bare_model(model).startswith("nova-3") and is_given(keywords):
raise ValueError(
"Keywords is only available for use with Nova-2, Nova-1, Enhanced, and "
"Base speech to text models. For Nova-3, use Keyterm Prompting."
)

if is_given(keyterm) and (not model.startswith("nova-3")):
if is_given(keyterm) and (not _bare_model(model).startswith("nova-3")):
raise ValueError(
"Keyterm Prompting is only available for transcription using the Nova-3 Model. "
"To boost recognition of keywords using another model, use the Keywords feature."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class _TTSOptions:
sample_rate: int
word_tokenizer: tokenize.WordTokenizer
base_url: str
api_key: str
mip_opt_out: bool = False


Expand All @@ -55,6 +54,7 @@ def __init__(
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
mip_opt_out: bool = False,
extra_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
) -> None:
"""
Create a new instance of Deepgram TTS.
Expand All @@ -67,6 +67,9 @@ def __init__(
base_url (str): Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak"
word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to basic WordTokenizer.
http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.
extra_headers: Additional HTTP headers sent on every connection, merged over the
default ``Authorization: Token`` header. When no API key is set, these become
the sole auth (e.g. the Cloudflare AI Gateway).

""" # noqa: E501
super().__init__(
Expand All @@ -75,9 +78,16 @@ def __init__(
num_channels=NUM_CHANNELS,
)

api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
if not api_key:
# only fall back to the env var when no api_key was passed at all; an explicit "" means
# "no Deepgram key" (e.g. with_cloudflare), so it must not pick up DEEPGRAM_API_KEY
if api_key is None:
api_key = os.environ.get("DEEPGRAM_API_KEY")
extra = dict(extra_headers) if is_given(extra_headers) else {}
if not api_key and not extra:
raise ValueError("Deepgram API key required. Set DEEPGRAM_API_KEY or provide api_key.")
# default Token auth only when a key is present; extra_headers merge on top (and are
# the sole auth when no key is set, e.g. the Cloudflare AI Gateway)
self._connect_headers = ({"Authorization": f"Token {api_key}"} if api_key else {}) | extra

if not is_given(word_tokenizer):
word_tokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False)
Expand All @@ -88,7 +98,6 @@ def __init__(
sample_rate=sample_rate,
word_tokenizer=word_tokenizer,
base_url=base_url,
api_key=api_key,
mip_opt_out=mip_opt_out,
)
self._session = http_session
Expand All @@ -109,6 +118,70 @@ def model(self) -> str:
def provider(self) -> str:
return "Deepgram"

@staticmethod
def with_cloudflare(
    *,
    model: str = "aura-1",
    account_id: str | None = None,
    gateway_id: str = "default",
    cf_aig_token: str | None = None,
    base_url: str | None = None,
    encoding: str = "linear16",
    sample_rate: int = 24000,
    word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
    http_session: aiohttp.ClientSession | None = None,
) -> TTS:
    """Build a ``TTS`` that talks to Deepgram through the Cloudflare AI Gateway.

    The instance targets the gateway's ``workers-ai`` endpoint and authenticates
    with a ``cf-aig-authorization`` header only; no Deepgram API key is used.

    Args:
        model: Deepgram model name such as ``"aura-1"``. Names that do not already
            start with ``@cf/`` get the ``@cf/deepgram/`` prefix prepended.
        account_id: Cloudflare account ID; when omitted, ``CLOUDFLARE_ACCOUNT_ID``
            is read from the environment. Only consulted when ``base_url`` is None.
        gateway_id: Gateway name used in the endpoint path. Defaults to ``"default"``.
        cf_aig_token: Token sent as ``cf-aig-authorization``; when omitted,
            ``CLOUDFLARE_AI_GATEWAY_TOKEN`` is read from the environment.
        base_url: Complete gateway endpoint. When given, ``account_id`` and
            ``gateway_id`` are ignored.
        encoding: Audio encoding, passed through to ``TTS``. Defaults to ``"linear16"``.
        sample_rate: Audio sample rate in Hz, passed through to ``TTS``.
        word_tokenizer: Optional tokenizer, passed through to ``TTS``.
        http_session: Optional aiohttp session, passed through to ``TTS``.

    Raises:
        ValueError: If no gateway token is available, or if ``base_url`` is None
            and no account ID is available.
    """
    token = cf_aig_token or os.environ.get("CLOUDFLARE_AI_GATEWAY_TOKEN")
    if not token:
        raise ValueError(
            "Cloudflare AI Gateway token is required, either as argument or set"
            " CLOUDFLARE_AI_GATEWAY_TOKEN environment variable"
        )

    endpoint = base_url
    if endpoint is None:
        # No explicit endpoint: derive it from the account and gateway identifiers.
        resolved_account = account_id or os.environ.get("CLOUDFLARE_ACCOUNT_ID")
        if not resolved_account:
            raise ValueError(
                "Cloudflare account_id is required, either as argument or set"
                " CLOUDFLARE_ACCOUNT_ID environment variable (or pass base_url directly)"
            )
        endpoint = (
            f"https://gateway.ai.cloudflare.com/v1/{resolved_account}/{gateway_id}/workers-ai"
        )

    # Anything already carrying an "@cf/" routing prefix is forwarded untouched.
    routed_model = model if model.startswith("@cf/") else f"@cf/deepgram/{model}"

    return TTS(
        # An explicit empty key opts out of the DEEPGRAM_API_KEY env fallback, so the
        # gateway only ever receives cf-aig-authorization (no Authorization: Token).
        api_key="",
        extra_headers={"cf-aig-authorization": token},
        base_url=endpoint,
        model=routed_model,
        encoding=encoding,
        sample_rate=sample_rate,
        word_tokenizer=word_tokenizer,
        http_session=http_session,
    )

async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
session = self._ensure_session()
config = {
Expand All @@ -120,7 +193,7 @@ async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
ws = await asyncio.wait_for(
session.ws_connect(
_to_deepgram_url(config, self._opts.base_url, websocket=True),
headers={"Authorization": f"Token {self._opts.api_key}"},
headers=self._connect_headers,
),
timeout,
)
Expand Down Expand Up @@ -214,7 +287,7 @@ async def _run(self, output_emitter: tts.AudioEmitter) -> None:
websocket=False,
),
headers={
"Authorization": f"Token {self._opts.api_key}",
**self._tts._connect_headers,
"Content-Type": "application/json",
},
json={"text": self._input_text},
Expand Down
Loading