diff --git a/server.js b/server.js
index 7d2a420..f1d6bdb 100644
--- a/server.js
+++ b/server.js
@@ -1,7 +1,6 @@
 const express = require('express');
 const WebSocket = require('ws');
 const { spawn } = require('child_process');
-const fs = require('fs');
 
 const app = express();
 const PORT = 3000;
@@ -17,53 +16,45 @@ const wss = new WebSocket.Server({ server });
 class SpeechProcessor {
   constructor() {
     this.pythonProcess = null;
-    this.requestMap = new Map();
+    this.activeRequests = new Map();
     this.requestCounter = 0;
     this.initializePythonProcess();
+    this.buffer = Buffer.alloc(0);
   }
 
   initializePythonProcess() {
-    try {
-      this.pythonProcess = spawn('python3', ['speech_processor.py'], {
-        stdio: ['pipe', 'pipe', 'pipe']
-      });
+    this.pythonProcess = spawn('python3', ['speech_processor.py']);
 
-      this.pythonProcess.stderr.on('data', (data) => {
-        console.error('Python STDERR:', data.toString());
-      });
+    this.pythonProcess.stderr.on('data', (data) => {
+      console.error('Python STDERR:', data.toString());
+    });
 
-      this.pythonProcess.on('close', (code) => {
-        console.log(`Python process exited with code ${code}`);
-        this.requestMap.clear();
-        setTimeout(() => this.initializePythonProcess(), 1000);
-      });
+    this.pythonProcess.on('close', (code) => {
+      console.log(`Python process exited with code ${code}`);
+      this.activeRequests.clear();
+      setTimeout(() => this.initializePythonProcess(), 1000);
+    });
 
-      let buffer = Buffer.alloc(0);
-      this.pythonProcess.stdout.on('data', (data) => {
-        buffer = Buffer.concat([buffer, data]);
-        this.processBuffer(buffer);
-      });
-
-      console.log('Python processor initialized');
-    } catch (error) {
-      console.error('Failed to start Python:', error);
-    }
+    this.pythonProcess.stdout.on('data', (data) => {
+      this.buffer = Buffer.concat([this.buffer, data]);
+      this.processBuffer();
+    });
   }
 
-  processBuffer(buffer) {
-    while (buffer.length >= 8) {
-      const length = buffer.readUInt32BE(0);
-      const requestId = buffer.readUInt32BE(4);
-
-      if (buffer.length >= 8 + length) {
-        const message = buffer.slice(8, 8 + length);
-        buffer = buffer.slice(8 + length);
-
+  processBuffer() {
+    while (this.buffer.length >= 8) {
+      const length = this.buffer.readUInt32BE(0);
+      const requestId = this.buffer.readUInt32BE(4);
+
+      if (this.buffer.length >= 8 + length) {
+        const message = this.buffer.slice(8, 8 + length);
+        this.buffer = this.buffer.slice(8 + length);
+
         try {
           const result = JSON.parse(message.toString());
-          if (this.requestMap.has(requestId)) {
-            const { resolve } = this.requestMap.get(requestId);
-            this.requestMap.delete(requestId);
+          if (this.activeRequests.has(requestId)) {
+            const { resolve } = this.activeRequests.get(requestId);
+            this.activeRequests.delete(requestId);
             resolve(result);
           }
         } catch (error) {
@@ -73,7 +64,6 @@ class SpeechProcessor {
         break;
       }
     }
-    return buffer;
   }
 
   async processAudio(audioBuffer) {
@@ -84,7 +74,7 @@ class SpeechProcessor {
       }
 
       const requestId = this.requestCounter++;
-      this.requestMap.set(requestId, { resolve, reject });
+      this.activeRequests.set(requestId, { resolve, reject });
 
       const lengthBuffer = Buffer.alloc(4);
       lengthBuffer.writeUInt32BE(audioBuffer.length, 0);
@@ -97,8 +87,8 @@ class SpeechProcessor {
       this.pythonProcess.stdin.write(audioBuffer);
 
       setTimeout(() => {
-        if (this.requestMap.has(requestId)) {
-          this.requestMap.delete(requestId);
+        if (this.activeRequests.has(requestId)) {
+          this.activeRequests.delete(requestId);
           reject(new Error('Processing timeout'));
         }
       }, 5000);
@@ -112,49 +102,61 @@ wss.on('connection', (ws) => {
   console.log('Client connected');
 
   let lastFinalText = '';
-  let lastPartialUpdate = 0;
-  let partialText = '';
-
+  let lastPartialText = '';
+  let partialTextTimeout = null;
+
   ws.on('message', async (message) => {
+    if (!Buffer.isBuffer(message)) return;
+
     try {
-      if (Buffer.isBuffer(message)) {
-        const result = await speechProcessor.processAudio(message);
-
-        if (result.success) {
-          if (result.is_final) {
-            if (result.text && result.text !== lastFinalText) {
-              lastFinalText = result.text;
-              ws.send(JSON.stringify({
-                type: 'transcription',
-                text: result.text,
-                is_final: true
-              }));
-              console.log('Final:', result.text);
-              partialText = '';
-            }
-          } else {
-            // Only send partial updates every 300ms
-            const now = Date.now();
-            if (result.text && (now - lastPartialUpdate > 300 || !result.text.startsWith(partialText))) {
-              partialText = result.text;
-              lastPartialUpdate = now;
-              ws.send(JSON.stringify({
-                type: 'partial_transcription',
-                text: result.text,
-                is_final: false
-              }));
-            }
-          }
-        } else {
-          console.error('Processing error:', result.error);
+      const result = await speechProcessor.processAudio(message);
+
+      if (!result.success) {
+        console.error('Processing error:', result.error);
+        return;
+      }
+
+      // Handle final results
+      if (result.is_final && result.text && result.text !== lastFinalText) {
+        lastFinalText = result.text;
+        ws.send(JSON.stringify({
+          type: 'transcription',
+          text: result.text,
+          is_final: true
+        }));
+        console.log('Final:', result.text);
+        lastPartialText = '';
+        if (partialTextTimeout) {
+          clearTimeout(partialTextTimeout);
+          partialTextTimeout = null;
         }
+      }
+      // Handle partial results with debouncing
+      else if (!result.is_final && result.text && result.text !== lastPartialText) {
+        lastPartialText = result.text;
+
+        if (partialTextTimeout) {
+          clearTimeout(partialTextTimeout);
+        }
+
+        partialTextTimeout = setTimeout(() => {
+          ws.send(JSON.stringify({
+            type: 'partial_transcription',
+            text: result.text,
+            is_final: false
+          }));
+          partialTextTimeout = null;
+        }, 300); // 300ms debounce
       }
     } catch (error) {
-      console.error('WebSocket error:', error);
+      console.error('Processing error:', error);
     }
   });
 
   ws.on('close', () => {
     console.log('Client disconnected');
+    if (partialTextTimeout) {
+      clearTimeout(partialTextTimeout);
+    }
   });
 });
\ No newline at end of file