// VocRT / backend / handle-realtime-tts / sttModelSocket.js
// Uploaded to Hugging Face by anuragsingh922 ("Upload folder using huggingface_hub", commit d7dfeff).
const isBuffer = require("is-buffer");
const { Buffer } = require("buffer");
const {deepgram_key} = require("../config");
const Session = require("../utils/session.js");
const { cleanupConnection } = require("./cleangRPCconnections.js");
const { getgRPCConnection } = require("./makegRPCconnection.js");
const { updateChathistory } = require("../providers/updateChathistory.js");
const { createClient, LiveTranscriptionEvents } = require("@deepgram/sdk");
// Single Deepgram client, created once at module load from the key in ../config.
// Every audio_stream() session opens its own live-transcription connection from it.
const deepgram = createClient(deepgram_key);
/**
 * Send one `{ type, msg }` JSON frame to the browser client.
 *
 * @param {object} wss - Client WebSocket (anything exposing `send(string)`).
 * @param {string} type - Frame type, e.g. "media", "clear", "chathistory".
 * @param {*} msg - Frame payload; serialized with JSON.stringify.
 */
const sendJson = (wss, type, msg) => {
  wss.send(JSON.stringify({ type, msg }));
};

/**
 * Best-effort variant of sendJson for teardown paths, where the socket may
 * already be closing. Failures are logged rather than propagated.
 */
const trySendJson = (wss, type, msg) => {
  try {
    sendJson(wss, type, msg);
  } catch (err) {
    console.error(`Failed to send "${type}" to client : ${err}`);
  }
};

/**
 * Build the base64 "media" payload for the client: a JSON header
 * `{ session_id, sequence_id, transcript }`, one 0x00 separator byte, then the
 * raw audio bytes received from the gRPC server.
 *
 * @param {string} session_id
 * @param {string} sequence_id
 * @param {string} transcript
 * @param {Buffer|Uint8Array} buffer - Audio bytes.
 * @returns {string} Base64 of header + separator + audio.
 */
const encodeMediaPayload = (session_id, sequence_id, transcript, buffer) => {
  const header = JSON.stringify({ session_id, sequence_id, transcript });
  return Buffer.concat([Buffer.from(header), Buffer.from([0]), buffer]).toString(
    "base64"
  );
};

/**
 * Ask a Deepgram live connection to close. Depending on the SDK release the
 * method is `requestClose()` (newer) or `finish()` (older v3); errors are logged.
 */
const closeDeepgram = (connection) => {
  try {
    if (typeof connection?.requestClose === "function") {
      connection.requestClose();
    } else if (typeof connection?.finish === "function") {
      connection.finish();
    }
  } catch (err) {
    console.error("Error closing Deepgram connection : ", err);
  }
};

/**
 * Run cleanupConnection(session) without letting a rejection escape an event
 * listener (which would otherwise become an unhandled rejection).
 */
const safeCleanup = async (session) => {
  try {
    await cleanupConnection(session);
  } catch (err) {
    console.error("Error cleaning up gRPC connection : ", err);
  }
};

/**
 * Translate one message streamed back by the gRPC server into client frames.
 *
 * Control sequence ids (strings):
 *   "-2" - reset the latency timer, send "clear", restore `chathistory` from
 *          `chathistorybackup`, send any attached audio, then record
 *          `transcript` as a USER turn in both histories.
 *   "0"  - send "pause" and stop forwarding audio (`session.cansend = false`).
 *   "-1" - send "continue".
 *   "1"  - log first-response latency and start forwarding audio.
 * Any other id (and "1") with a buffer is forwarded as "media" while
 * `session.cansend` is true, followed by a chat-history update.
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-connection Session state (mutated).
 * @param {{session_id:string, sequence_id:string, transcript:string, buffer?:Buffer}} response
 */
const handleGrpcResponse = (wss, session, response) => {
  console.log("Data : ", response);
  const { session_id, sequence_id, transcript, buffer } = response;

  switch (sequence_id) {
    case "-2":
      session.latency = Date.now();
      sendJson(wss, "clear", "clear");
      session.chathistory = [...session.chathistorybackup];
      sendJson(wss, "chathistory", session.chathistorybackup);
      // Buffer.concat throws on a missing buffer, so only emit media when audio is attached.
      if (buffer) {
        sendJson(
          wss,
          "media",
          encodeMediaPayload(session_id, sequence_id, transcript, buffer)
        );
      }
      session.chathistory.push({ speaker: "USER", content: transcript });
      sendJson(wss, "chathistory", session.chathistory);
      session.chathistorybackup.push({ speaker: "USER", content: transcript });
      return;
    case "0":
      sendJson(wss, "pause", "pause");
      session.cansend = false;
      return;
    case "-1":
      sendJson(wss, "continue", "continue");
      return;
    case "1":
      console.log("First Response Latency: ", Date.now() - session.latency, "ms");
      session.latency = 0;
      session.cansend = true;
      break;
    default:
      break;
  }

  if (!buffer || !session.cansend) {
    return;
  }

  sendJson(
    wss,
    "media",
    encodeMediaPayload(session_id, sequence_id, transcript, buffer)
  );
  updateChathistory(transcript, false, session);
  sendJson(wss, "chathistory", session.chathistory);
};

/**
 * Wire the gRPC call's data/end/error events to the client socket.
 * Exceptions thrown while handling a response are logged instead of escaping
 * into the stream's event emitter.
 */
const attachGrpcHandlers = (wss, session) => {
  session.call.on("data", (response) => {
    try {
      handleGrpcResponse(wss, session, response);
    } catch (err) {
      console.error("Error handling gRPC response : ", err);
    }
  });
  session.call.on("end", async () => {
    console.log("Ended");
    await safeCleanup(session);
    trySendJson(wss, "end", "end");
    console.log("Stream ended");
  });
  session.call.on("error", async (error) => {
    console.error(`Stream error: ${error}`);
    trySendJson(wss, "end", "end");
    await safeCleanup(session);
  });
};

/**
 * Handle a client "start" frame: reset chat state, open the gRPC call, tell
 * the client it is ready, send the session metadata and attach stream handlers.
 * On any failure the gRPC connection is cleaned up, the client receives "end",
 * and no handlers are attached to the dead call.
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-connection Session state (mutated).
 * @param {string} msg - JSON string carrying `{ sessionId }`.
 */
const handleStart = async (wss, session, msg) => {
  session.starttime = Date.now();
  session.chathistory = [];
  session.chathistorybackup = [];
  console.log("Making Connection with gRPC...");
  try {
    // Timed manually: console.time labels are process-global, so concurrent
    // sessions (or a failed connect that never reaches timeEnd) would collide.
    const connectStartedAt = Date.now();
    session.call = await getgRPCConnection(session);
    console.log(`grpcconnection: ${Date.now() - connectStartedAt}ms`);
    if (!session.call) {
      throw new Error("gRPC connection returned no call.");
    }
    const state = session.channel.getConnectivityState(false);
    console.log(`Client : ${state}`);
    session.saved = false;
    sendJson(wss, "ready", "connected");
    console.log("Connected to gRPC.");
    const { sessionId } = JSON.parse(msg);
    console.log("Sending metadata.");
    session.call.write({ metadata: { session_id: sessionId } });
  } catch (err) {
    await safeCleanup(session);
    console.error("Error in making gRPC Connection. : ", err);
    trySendJson(wss, "end", "end");
    return;
  }
  attachGrpcHandlers(wss, session);
};

/**
 * Dispatch a parsed `{ type, msg }` control frame from the client.
 */
const handleControlMessage = async (wss, session, type, msg) => {
  switch (type) {
    case "start":
      await handleStart(wss, session, msg);
      break;
    case "status": {
      // Unlike "start", the "status" payload arrives as an object, not a JSON string.
      const { sequence_id, transcript } = msg;
      if (session.call) {
        session.call.write({
          status: {
            transcript,
            played_seq: sequence_id,
            interrupt_seq: sequence_id,
          },
        });
      }
      updateChathistory(transcript, true, session);
      break;
    }
    case "stop":
      console.log("Client Stoped the stream.");
      await safeCleanup(session);
      break;
    default:
      console.log("Type not handled.");
  }
};

/**
 * WebSocket handler for one realtime voice session.
 *
 * - Sends `{ type: "initial" }` immediately and opens a Deepgram live connection.
 *   Deepgram is configured for linear16 at 16 kHz, so binary frames are
 *   presumably raw 16-bit PCM at that rate (confirm with the client).
 * - Binary frames are forwarded to Deepgram once a gRPC call exists.
 * - Text frames are JSON `{ type, msg }` with type "start", "status" or "stop".
 * - Final transcripts (`is_final && speech_final`) are written to the gRPC
 *   call as `{ text }`; gRPC responses become client frames (see handleGrpcResponse).
 * - When the client socket closes or errors, the Deepgram connection is closed
 *   and the gRPC connection is cleaned up.
 *
 * @param {object} wss - The client WebSocket for this session.
 * @param {object} req - Upgrade request (unused).
 */
const audio_stream = async (wss, req) => {
  try {
    const session = new Session();
    sendJson(wss, "initial", "connected");

    const connection = deepgram.listen.live({
      punctuate: true,
      interim_results: true,
      // NOTE(review): speech_final is documented as a response field; endpointing is
      // normally controlled by the `endpointing` option — confirm this flag is needed.
      speech_final: true,
      encoding: "linear16",
      sample_rate: 16000,
      model: "nova-2",
      version: "latest",
    });

    // Forward a final transcript to the ML server over the active gRPC call.
    const callMLServer = (text) => {
      if (!session.call) {
        console.log("No active gRPC call; dropping transcript.");
        return;
      }
      try {
        session.call.write({ text });
      } catch (error) {
        console.error("Error in calling ml server : ", error);
      }
    };

    // Registered up front (not inside Open) so pre-open errors are logged and a
    // re-open cannot stack duplicate listeners.
    connection.on(LiveTranscriptionEvents.Open, () => {
      console.log(LiveTranscriptionEvents.Open);
    });
    connection.on(LiveTranscriptionEvents.Close, () => {
      console.log("Connection closed.");
    });
    connection.on(LiveTranscriptionEvents.Transcript, (data) => {
      const text = data?.channel?.alternatives?.[0]?.transcript;
      if (data?.is_final && data?.speech_final && text) {
        console.log("Response : ", text);
        callMLServer(text);
      }
    });
    connection.on(LiveTranscriptionEvents.Metadata, (data) => {
      console.log(data);
    });
    connection.on(LiveTranscriptionEvents.Error, (err) => {
      console.error(err);
    });

    // Send an audio chunk to Deepgram only while its socket is OPEN (readyState 1).
    const forwardAudio = (chunk) => {
      try {
        if (connection.getReadyState() === 1) {
          connection.send(chunk);
        }
      } catch (error) {
        console.log("Error sending buffer to deepgram : ", error);
      }
    };

    const teardown = async () => {
      closeDeepgram(connection);
      await safeCleanup(session);
    };

    wss.on("message", async (message, isBinary) => {
      try {
        // ws < 8 delivers text frames as strings; ws >= 8 delivers Buffers and
        // passes isBinary === false for text frames.
        const isText = typeof message === "string" || isBinary === false;
        if (!isText) {
          if (isBuffer(message) && session.call) {
            forwardAudio(message);
          }
          return;
        }

        let data;
        try {
          data = JSON.parse(message.toString());
        } catch (err) {
          console.log(`Not a valid json : ${err}`);
          return;
        }
        const { type, msg } = data ?? {};
        await handleControlMessage(wss, session, type, msg);
      } catch (err) {
        console.error(`Error in wss.onmessage : ${err}`);
      }
    });

    wss.on("close", async () => {
      await teardown();
      console.log("WebSocket connection closed.");
    });
    wss.on("error", async (err) => {
      console.error(`WebSocket error: ${err}`);
      await teardown();
    });
  } catch (err) {
    console.log(err);
    trySendJson(wss, "end", "end");
  }
};
// Only audio_stream is exported; the module-level Deepgram client stays private to this file.
module.exports = { audio_stream };