|
const isBuffer = require("is-buffer"); |
|
const { Buffer } = require("buffer"); |
|
const {deepgram_key} = require("../config"); |
|
const Session = require("../utils/session.js"); |
|
const { cleanupConnection } = require("./cleangRPCconnections.js"); |
|
const { getgRPCConnection } = require("./makegRPCconnection.js"); |
|
const { updateChathistory } = require("../providers/updateChathistory.js"); |
|
const { createClient, LiveTranscriptionEvents } = require("@deepgram/sdk"); |
|
const deepgram = createClient(deepgram_key); |
|
|
|
/**
 * Serialises `{ type, msg }` and sends it to the client as a JSON text frame.
 * A failing send is logged instead of thrown so that one dead socket cannot
 * take down a stream listener that happens to be running at the time.
 *
 * @param {object} wss - Client WebSocket; only `send` is used.
 * @param {string} type - Message type understood by the client.
 * @param {*} msg - JSON-serialisable payload.
 * @returns {boolean} `true` when `send` returned without throwing.
 */
const sendJson = (wss, type, msg) => {
  try {
    wss.send(JSON.stringify({ type, msg }));
    return true;
  } catch (err) {
    console.error(`Failed to send "${type}" message to client : ${err}`);
    return false;
  }
};

/**
 * Runs `cleanupConnection(session)` and logs (rather than propagates) a
 * failure, so it is safe to call from event listeners whose returned promise
 * nobody awaits.
 *
 * @param {object} session - Per-client session passed to `cleanupConnection`.
 * @returns {Promise<void>}
 */
const safeCleanup = async (session) => {
  try {
    await cleanupConnection(session);
  } catch (err) {
    console.error("Error cleaning up gRPC connection : ", err);
  }
};

/**
 * Asks Deepgram to close a live connection, tolerating a missing connection.
 * Uses `requestClose()` when the SDK provides it and falls back to `finish()`.
 *
 * @param {object} [connection] - Object returned by `deepgram.listen.live`.
 */
const closeDeepgram = (connection) => {
  try {
    if (typeof connection?.requestClose === "function") {
      connection.requestClose();
    } else if (typeof connection?.finish === "function") {
      connection.finish();
    }
  } catch (err) {
    console.error("Error closing deepgram connection : ", err);
  }
};

/**
 * Builds the base64 "media" payload for one gRPC response:
 * JSON header (`session_id`, `sequence_id`, `transcript`), a single 0 byte as
 * separator, then the raw bytes of `buffer`.
 *
 * @param {object} response - gRPC response; `buffer` must be present.
 * @returns {string} Base64 encoding of header + separator + audio bytes.
 */
const encodeMediaFrame = ({ session_id, sequence_id, transcript, buffer }) => {
  const header = JSON.stringify({ session_id, sequence_id, transcript });
  return Buffer.concat([Buffer.from(header), Buffer.from([0]), buffer]).toString(
    "base64"
  );
};

/**
 * Translates one gRPC response into client messages, keyed on `sequence_id`:
 *   "-2" -> "clear", restore chat history from the backup, optional "media",
 *           then append the transcript as a USER turn to history and backup;
 *   "0"  -> "pause" and stop forwarding media;
 *   "-1" -> "continue";
 *   "1"  -> log latency since the last "-2" and resume forwarding media;
 *   any other value (and "1") -> forward "media" plus updated chat history,
 *           provided a buffer is present and forwarding is enabled.
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-client session state (mutated).
 * @param {object} response - gRPC response message.
 */
const handleGrpcResponse = (wss, session, response) => {
  console.log("Data : ", response);

  const { sequence_id, transcript, buffer } = response;

  if (sequence_id === "-2") {
    session.latency = Date.now();
    sendJson(wss, "clear", "clear");
    session.chathistory = [...session.chathistorybackup];
    sendJson(wss, "chathistory", session.chathistorybackup);
    // Buffer.concat throws on a missing entry, so only frame audio that exists.
    if (buffer) {
      sendJson(wss, "media", encodeMediaFrame(response));
    }
    session.chathistory.push({ speaker: "USER", content: transcript });
    sendJson(wss, "chathistory", session.chathistory);
    session.chathistorybackup.push({ speaker: "USER", content: transcript });
    return;
  }

  if (sequence_id === "0") {
    sendJson(wss, "pause", "pause");
    session.cansend = false;
    return;
  }

  if (sequence_id === "-1") {
    sendJson(wss, "continue", "continue");
    return;
  }

  if (sequence_id === "1") {
    const latency = Date.now() - session.latency;
    console.log("First Response Latency: ", latency, "ms");
    session.latency = 0;
    session.cansend = true;
  }

  if (!buffer || !session.cansend) {
    return;
  }

  sendJson(wss, "media", encodeMediaFrame(response));
  updateChathistory(transcript, false, session);
  sendJson(wss, "chathistory", session.chathistory);
};

/**
 * Wires the "data", "end" and "error" listeners of `session.call`.
 * Listener bodies never throw or reject: an exception escaping a stream
 * listener would otherwise surface as an uncaught error.
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Session whose `call` is an open gRPC stream.
 */
const attachGrpcHandlers = (wss, session) => {
  const { call } = session;

  call.on("data", (response) => {
    try {
      handleGrpcResponse(wss, session, response);
    } catch (err) {
      console.error("Error handling gRPC response : ", err);
    }
  });

  call.on("end", async () => {
    console.log("Ended");
    await safeCleanup(session);
    sendJson(wss, "end", "end");
    console.log("Stream ended");
  });

  call.on("error", async (error) => {
    console.error(`Stream error: ${error}`);
    sendJson(wss, "end", "end");
    await safeCleanup(session);
  });
};

/**
 * Handles a client "start" message: resets the session, opens the gRPC
 * stream, attaches its listeners, writes the session metadata and finally
 * tells the client it is "ready". On any failure the session is cleaned up
 * and the client receives "end"; no listeners are attached to a dead call.
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-client session state (mutated).
 * @param {string} msg - JSON string carrying `sessionId`.
 * @returns {Promise<void>}
 */
const startSession = async (wss, session, msg) => {
  session.starttime = Date.now();
  session.chathistory = [];
  session.chathistorybackup = [];
  console.log("Making Connection with gRPC...");

  try {
    console.time("grpcconnection");
    try {
      session.call = await getgRPCConnection(session);
    } finally {
      // Always stop the timer so a failed attempt does not leave the label running.
      console.timeEnd("grpcconnection");
    }
    if (!session.call) {
      throw new Error("getgRPCConnection returned no call");
    }

    const state = session.channel.getConnectivityState(false);
    console.log(`Client : ${state}`);
    session.saved = false;

    // Parse before announcing "ready" so a malformed payload fails the start cleanly.
    const { sessionId } = JSON.parse(msg);

    // Listeners go on before the first write so no early event is unobserved.
    attachGrpcHandlers(wss, session);

    console.log("Sending metadata.");
    session.call.write({ metadata: { session_id: sessionId } });

    sendJson(wss, "ready", "connected");
    console.log("Connected to gRPC.");
  } catch (err) {
    console.error("Error in making gRPC Connection. : ", err);
    await safeCleanup(session);
    sendJson(wss, "end", "end");
  }
};

/**
 * Handles a client "status" message: forwards the playback status to the
 * gRPC stream (when one is open) and records the transcript in chat history.
 *
 * @param {object} session - Per-client session state.
 * @param {object} msg - Object with `sequence_id` and `transcript`.
 */
const forwardStatus = (session, msg) => {
  const { sequence_id, transcript } = msg;

  if (session.call) {
    session.call.write({
      status: {
        transcript,
        played_seq: sequence_id,
        interrupt_seq: sequence_id,
      },
    });
  }

  updateChathistory(transcript, true, session);
};

/**
 * WebSocket route handler that bridges one client to Deepgram live
 * transcription and to the gRPC ML stream.
 *
 * Binary frames are forwarded to Deepgram once a gRPC call is open; final
 * transcripts are written to that call as `{ text }`. Text frames are JSON
 * `{ type, msg }` commands: "start", "status" and "stop".
 *
 * @param {object} wss - Client WebSocket (`send`, `on`).
 * @param {object} req - Incoming request; currently unused.
 * @returns {Promise<void>}
 */
const audio_stream = async (wss, req) => {
  let connection;

  try {
    const session = new Session();

    sendJson(wss, "initial", "connected");

    connection = deepgram.listen.live({
      punctuate: true,
      interim_results: true,
      // NOTE(review): this key was listed twice in the original options; Deepgram
      // documents `speech_final` as a result field - confirm it is a valid option.
      speech_final: true,
      encoding: "linear16",
      sample_rate: 16000,
      model: "nova-2",
      version: "latest",
    });

    const callMLServer = (text) => {
      try {
        session.call.write({ text });
      } catch (error) {
        console.error("Error in calling ml server : ", error);
      }
    };

    // All Deepgram listeners are registered up front: an "error" emitted before
    // "open" must already have a listener, and a repeated "open" must not
    // register the others a second time.
    connection.on(LiveTranscriptionEvents.Open, () => {
      console.log(LiveTranscriptionEvents.Open);
    });

    connection.on(LiveTranscriptionEvents.Close, () => {
      console.log("Connection closed.");
    });

    connection.on(LiveTranscriptionEvents.Transcript, (data) => {
      const text = data?.channel?.alternatives?.[0]?.transcript;

      if (data?.is_final && data.speech_final && text) {
        console.log("Response : ", text);
        callMLServer(text);
      }
    });

    connection.on(LiveTranscriptionEvents.Metadata, (data) => {
      console.log(data);
    });

    connection.on(LiveTranscriptionEvents.Error, (err) => {
      console.error(err);
    });

    const forwardAudio = (chunk) => {
      try {
        if (connection.getReadyState() === 1) {
          connection.send(chunk);
        }
      } catch (error) {
        console.log("Error sending buffer to deepgram : ", error);
      }
    };

    let deepgramClosed = false;
    const teardown = async () => {
      // "close" and "error" can both fire for one socket; close Deepgram once.
      if (!deepgramClosed) {
        deepgramClosed = true;
        closeDeepgram(connection);
      }
      await safeCleanup(session);
    };

    wss.on("message", async (message) => {
      try {
        if (isBuffer(message)) {
          // Audio is dropped until "start" has opened the gRPC call.
          if (session.call) {
            forwardAudio(message);
          }
          return;
        }

        if (typeof message !== "string") {
          return;
        }

        let data;
        try {
          data = JSON.parse(message);
        } catch (err) {
          console.log(`Not a valid json : ${err}`);
          return;
        }

        const { type, msg } = data ?? {};

        switch (type) {
          case "start":
            await startSession(wss, session, msg);
            break;

          case "status":
            forwardStatus(session, msg);
            break;

          case "stop":
            console.log("Client Stoped the stream.");
            await safeCleanup(session);
            break;

          default:
            console.log("Type not handled.");
        }
      } catch (err) {
        console.error(`Error in wss.onmessage : ${err}`);
      }
    });

    wss.on("close", async () => {
      await teardown();
      console.log("WebSocket connection closed.");
    });

    wss.on("error", async (err) => {
      console.error(`WebSocket error: ${err}`);
      await teardown();
    });
  } catch (err) {
    console.log(err);
    closeDeepgram(connection);
    sendJson(wss, "end", "end");
  }
};
|
|
|
module.exports = { audio_stream }; |
|
|