// Third-party helpers.
const isBuffer = require("is-buffer");
const { Buffer } = require("buffer");

// Project-local configuration, session state and gRPC/chat-history helpers.
const {deepgram_key} = require("../config");
const Session = require("../utils/session.js");
const { cleanupConnection } = require("./cleangRPCconnections.js");
const { getgRPCConnection } = require("./makegRPCconnection.js");
const { updateChathistory } = require("../providers/updateChathistory.js");

// Deepgram SDK: client factory and the event names emitted by live connections.
const { createClient, LiveTranscriptionEvents } = require("@deepgram/sdk");

// Single Deepgram client created at module load from the configured key;
// every call to audio_stream opens its own live connection from it.
const deepgram = createClient(deepgram_key);
/**
 * Sends `{ type, msg }` to the browser socket as a JSON text frame.
 *
 * Send failures are logged and reported through the return value instead of
 * being thrown, because most callers run inside event-emitter callbacks where
 * an exception would go unhandled.
 *
 * @param {object} wss - Socket exposing `send(string)`.
 * @param {string} type - Message type tag.
 * @param {*} msg - JSON-serialisable payload.
 * @returns {boolean} `true` when `send` returned normally, `false` otherwise.
 */
const sendJson = (wss, type, msg) => {
  try {
    wss.send(JSON.stringify({ type, msg }));
    return true;
  } catch (err) {
    console.error(`Failed to send "${type}" to client: `, err);
    return false;
  }
};

/**
 * Builds the payload of a "media" message: the metadata JSON, one zero byte
 * as separator, then the raw audio bytes, all base64-encoded.
 *
 * @param {string} metadataJson - Serialised metadata placed before the separator.
 * @param {Buffer|Uint8Array} audio - Audio bytes placed after the separator.
 * @returns {string} Base64 text of the combined frame.
 */
const buildMediaFrame = (metadataJson, audio) =>
  Buffer.concat([Buffer.from(metadataJson), Buffer.from([0]), audio]).toString(
    "base64"
  );

/**
 * Asks Deepgram to close a live connection, logging instead of throwing.
 * Uses `requestClose()` when the SDK object has it and `finish()` otherwise.
 *
 * @param {object} connection - Object returned by `deepgram.listen.live`.
 */
const closeDeepgram = (connection) => {
  try {
    if (typeof connection?.requestClose === "function") {
      connection.requestClose();
    } else if (typeof connection?.finish === "function") {
      connection.finish();
    }
  } catch (err) {
    console.error("Error closing Deepgram connection: ", err);
  }
};

/**
 * Runs `cleanupConnection(session)` and logs a failure instead of letting it
 * surface as an unhandled rejection from an event handler.
 *
 * @param {object} session - Per-socket session state.
 * @returns {Promise<void>}
 */
const safeCleanup = async (session) => {
  try {
    await cleanupConnection(session);
  } catch (err) {
    console.error("Error cleaning up gRPC connection: ", err);
  }
};

/**
 * Handles one message read from the gRPC stream and relays it to the browser.
 *
 * Dispatch is on `sequence_id` (compared as a string):
 *  - "-2": sends "clear", restores `chathistory` from `chathistorybackup`,
 *          forwards the audio (when present) and records `transcript` as a
 *          USER entry in both histories.
 *  - "0":  sends "pause" and stops media forwarding (`cansend = false`).
 *  - "-1": sends "continue".
 *  - "1":  logs the time elapsed since the last "-2" and re-enables media
 *          forwarding, then is forwarded like any other chunk.
 *  - anything else: forwarded as media when it has a buffer and `cansend` is set.
 *
 * @param {object} wss - Browser socket.
 * @param {object} session - Per-socket session state (mutated).
 * @param {{session_id: *, sequence_id: string, transcript: string, buffer: Buffer}} response
 */
const handleGrpcResponse = (wss, session, response) => {
  console.log("Data : ", response);
  const { session_id, sequence_id, transcript, buffer } = response;
  const metadata = JSON.stringify({ session_id, sequence_id, transcript });
  const sendMedia = () =>
    sendJson(wss, "media", buildMediaFrame(metadata, buffer));

  switch (sequence_id) {
    case "-2":
      session.latency = Date.now();
      sendJson(wss, "clear", "clear");
      session.chathistory = [...session.chathistorybackup];
      sendJson(wss, "chathistory", session.chathistorybackup);
      // Buffer.concat throws on a missing chunk; skip the media frame rather
      // than abort the history bookkeeping below.
      if (buffer) {
        sendMedia();
      }
      session.chathistory.push({ speaker: "USER", content: transcript });
      sendJson(wss, "chathistory", session.chathistory);
      session.chathistorybackup.push({ speaker: "USER", content: transcript });
      return;
    case "0":
      sendJson(wss, "pause", "pause");
      session.cansend = false;
      return;
    case "-1":
      sendJson(wss, "continue", "continue");
      return;
    case "1":
      console.log(
        "First Response Latency: ",
        Date.now() - session.latency,
        "ms"
      );
      session.latency = 0;
      session.cansend = true;
      break;
    default:
      break;
  }

  if (!buffer || !session.cansend) {
    return;
  }
  sendMedia();
  updateChathistory(transcript, false, session);
  sendJson(wss, "chathistory", session.chathistory);
};

/**
 * Subscribes to "data", "end" and "error" on the current `session.call`.
 * Both "end" and "error" notify the browser with an "end" message and clean
 * up the gRPC connection.
 *
 * @param {object} wss - Browser socket.
 * @param {object} session - Per-socket session state; `session.call` must be set.
 */
const attachGrpcHandlers = (wss, session) => {
  const { call } = session;

  call.on("data", (response) => {
    // A throw here would escape the emitter as an uncaught exception.
    try {
      handleGrpcResponse(wss, session, response);
    } catch (err) {
      console.error("Error handling gRPC response: ", err);
    }
  });

  call.on("end", async () => {
    console.log("Ended");
    await safeCleanup(session);
    sendJson(wss, "end", "end");
    console.log("Stream ended");
  });

  call.on("error", async (error) => {
    console.error(`Stream error: ${error}`);
    sendJson(wss, "end", "end");
    await safeCleanup(session);
  });
};

/**
 * Handles a "start" control message: resets the session's timing and chat
 * history, opens the gRPC stream, tells the browser it is "ready" and writes
 * the session metadata to the stream.
 *
 * When any step fails the connection is cleaned up and the browser receives
 * "end"; no further use is made of `session.call`.
 *
 * @param {object} wss - Browser socket.
 * @param {object} session - Per-socket session state (mutated).
 * @param {string} msg - JSON text containing `sessionId`.
 * @returns {Promise<void>}
 */
const handleStart = async (wss, session, msg) => {
  session.starttime = Date.now();
  session.chathistory = [];
  session.chathistorybackup = [];
  console.log("Making Connection with gRPC...");
  try {
    console.time("grpcconnection");
    try {
      session.call = await getgRPCConnection(session);
    } finally {
      // Always stop the timer so a failed attempt does not leave the label
      // registered for the next "start".
      console.timeEnd("grpcconnection");
    }
    if (!session.call) {
      throw new Error("getgRPCConnection returned no call stream");
    }
    // Subscribe before anything else can fail so that an "error" emitted by
    // the stream always has a listener.
    attachGrpcHandlers(wss, session);

    const state = session.channel.getConnectivityState(false);
    console.log(`Client : ${state}`);
    session.saved = false;
    wss.send(JSON.stringify({ type: "ready", msg: "connected" }));
    console.log("Connected to gRPC.");

    const { sessionId } = JSON.parse(msg);
    console.log("Sending metadata.");
    session.call.write({ metadata: { session_id: sessionId } });
  } catch (err) {
    console.error("Error in making gRPC Connection. : ", err);
    await safeCleanup(session);
    sendJson(wss, "end", "end");
  }
};

/**
 * Handles a "status" control message: forwards the reported sequence to the
 * gRPC stream (as both `played_seq` and `interrupt_seq`) when a stream is
 * open, and records the transcript through `updateChathistory`.
 *
 * @param {object} session - Per-socket session state.
 * @param {{sequence_id: *, transcript: string}} msg - Status payload.
 */
const handleStatus = (session, msg) => {
  const { sequence_id, transcript } = msg;
  if (session.call) {
    session.call.write({
      status: {
        transcript,
        played_seq: sequence_id,
        interrupt_seq: sequence_id,
      },
    });
  }
  updateChathistory(transcript, true, session);
};

/**
 * Parses a text frame as `{ type, msg }` and dispatches on `type`
 * ("start", "status", "stop"). Invalid JSON is logged and ignored.
 *
 * @param {object} wss - Browser socket.
 * @param {object} session - Per-socket session state.
 * @param {string} raw - Text of the frame.
 * @returns {Promise<void>}
 */
const handleControlMessage = async (wss, session, raw) => {
  let data;
  try {
    data = JSON.parse(raw);
  } catch (err) {
    console.log(`Not a valid json : ${err}`);
    return;
  }

  const { type, msg } = data ?? {};
  switch (type) {
    case "start":
      await handleStart(wss, session, msg);
      break;
    case "status":
      handleStatus(session, msg);
      break;
    case "stop":
      console.log("Client Stoped the stream.");
      await safeCleanup(session);
      break;
    default:
      console.log("Type not handled.");
  }
};

/**
 * WebSocket handler bridging a browser, Deepgram live transcription and a
 * gRPC stream.
 *
 * Binary frames are forwarded to Deepgram once a gRPC stream is open; final
 * transcripts coming back from Deepgram are written to that stream as
 * `{ text }`. Text frames are JSON control messages (`start`, `status`,
 * `stop`). Messages read from the gRPC stream are relayed to the browser as
 * `media`, `chathistory`, `clear`, `pause`, `continue` and `end`.
 *
 * @param {object} wss - The connected socket (`send`, `on`).
 * @param {object} req - The upgrade request; not used by this handler.
 * @returns {Promise<void>}
 */
const audio_stream = async (wss, req) => {
  try {
    const session = new Session();
    wss.send(JSON.stringify({ type: "initial", msg: "connected" }));

    const connection = deepgram.listen.live({
      punctuate: true,
      interim_results: true,
      // NOTE(review): `speech_final` is also a field read from transcript
      // results below; confirm it is meaningful as a request option.
      speech_final: true,
      encoding: "linear16",
      sample_rate: 16000,
      model: "nova-2",
      version: "latest",
    });

    // Forwards a finished utterance to the gRPC stream.
    const callMLServer = (text) => {
      try {
        session.call.write({ text });
      } catch (error) {
        console.error("Error in calling ml server : ", error);
      }
    };

    connection.on(LiveTranscriptionEvents.Open, () => {
      console.log(LiveTranscriptionEvents.Open);

      connection.on(LiveTranscriptionEvents.Close, () => {
        console.log("Connection closed.");
      });

      connection.on(LiveTranscriptionEvents.Transcript, (data) => {
        const text = data?.channel?.alternatives?.[0]?.transcript;
        // Only non-empty results flagged both final and end-of-speech are
        // forwarded; everything else is ignored.
        if (data?.is_final && data?.speech_final && text) {
          console.log("Response : ", text);
          callMLServer(text);
        }
      });

      connection.on(LiveTranscriptionEvents.Metadata, (data) => {
        console.log(data);
      });

      connection.on(LiveTranscriptionEvents.Error, (err) => {
        console.error(err);
      });
    });

    wss.on("message", async (message, isBinary) => {
      try {
        // Socket libraries that deliver every frame as a Buffer pass
        // `isBinary === false` for text frames; older ones pass a string.
        const isText =
          typeof message === "string" ||
          (isBinary === false && isBuffer(message));

        if (isText) {
          await handleControlMessage(wss, session, message.toString());
          return;
        }

        // Audio is dropped until a gRPC stream exists to receive transcripts.
        if (isBuffer(message) && session.call) {
          try {
            if (connection.getReadyState() === 1) {
              connection.send(message);
            }
          } catch (error) {
            console.log("Error sending buffer to deepgram : ", error);
          }
        }
      } catch (err) {
        console.error(`Error in wss.onmessage : ${err}`);
      }
    });

    wss.on("close", async () => {
      closeDeepgram(connection);
      await safeCleanup(session);
      console.log("WebSocket connection closed.");
    });

    wss.on("error", async (err) => {
      console.error(`WebSocket error: ${err}`);
      closeDeepgram(connection);
      await safeCleanup(session);
    });
  } catch (err) {
    console.log(err);
    sendJson(wss, "end", "end");
  }
};
// Public API of this module: the WebSocket audio-stream handler.
module.exports = { audio_stream };