const express = require('express');
const WebSocket = require('ws');
const { spawn } = require('child_process');
const fs = require('fs');

const app = express();
const PORT = 3000;

// Frame header shared by both pipe directions:
// 4-byte big-endian payload length followed by a 4-byte big-endian request id.
const FRAME_HEADER_BYTES = 8;
// How long a request may wait for the Python process to answer.
const REQUEST_TIMEOUT_MS = 5000;
// Delay before respawning the Python process after it goes away.
const RESTART_DELAY_MS = 1000;
// Minimum spacing between partial transcription updates sent to a client.
const PARTIAL_UPDATE_INTERVAL_MS = 300;

app.use(express.static('public'));

const server = app.listen(PORT, () => {
  console.log(`Server running on http://localhost:${PORT}`);
});

const wss = new WebSocket.Server({ server });

/**
 * Bridges audio chunks to a long-lived `python3 speech_processor.py` child
 * process over stdin/stdout.
 *
 * Each request is written to the child's stdin as
 * `[uint32 BE payload length][uint32 BE request id][payload]`; replies are read
 * from stdout in the same framing, with a JSON payload. Replies are matched to
 * callers through `requestMap`, keyed by request id.
 */
class SpeechProcessor {
  constructor() {
    this.pythonProcess = null;
    // requestId -> { resolve, reject, timer }
    this.requestMap = new Map();
    this.requestCounter = 0;
    this.initializePythonProcess();
  }

  /**
   * Spawns the Python child, wires up its streams, and schedules a respawn
   * when the child closes or fails to start.
   */
  initializePythonProcess() {
    try {
      const child = spawn('python3', ['speech_processor.py'], {
        stdio: ['pipe', 'pipe', 'pipe'],
      });
      this.pythonProcess = child;

      // 'error' and 'close' can both fire for the same child; tear down once.
      let tornDown = false;
      const tearDown = () => {
        if (tornDown) {
          return;
        }
        tornDown = true;
        if (this.pythonProcess === child) {
          this.pythonProcess = null;
        }
        // Callers waiting on this child would otherwise never be settled.
        this.rejectAllPending(new Error('Processor exited'));
        setTimeout(() => this.initializePythonProcess(), RESTART_DELAY_MS);
      };

      // Registered first: without a listener, a spawn failure (e.g. python3
      // not installed) is an unhandled 'error' event and crashes the server.
      child.on('error', (error) => {
        console.error('Python process error:', error);
        tearDown();
      });

      child.on('close', (code) => {
        console.log(`Python process exited with code ${code}`);
        tearDown();
      });

      // Writes to a dead child surface as stream errors (e.g. EPIPE).
      child.stdin.on('error', (error) => {
        console.error('Python STDIN error:', error);
      });

      child.stderr.on('data', (data) => {
        console.error('Python STDERR:', data.toString());
      });

      // Bytes received but not yet forming a complete frame.
      let pending = Buffer.alloc(0);
      child.stdout.on('data', (chunk) => {
        // Keep only the unconsumed tail so handled frames are not re-parsed
        // and the buffer does not grow without bound.
        pending = this.processBuffer(Buffer.concat([pending, chunk]));
      });

      console.log('Python processor initialized');
    } catch (error) {
      console.error('Failed to start Python:', error);
    }
  }

  /**
   * Removes a pending request from the map and cancels its timeout.
   *
   * @param {number} requestId
   * @returns {{resolve: Function, reject: Function, timer: *}|undefined}
   *   The removed entry, or undefined if no such request is pending.
   */
  takeRequest(requestId) {
    const entry = this.requestMap.get(requestId);
    if (entry === undefined) {
      return undefined;
    }
    this.requestMap.delete(requestId);
    clearTimeout(entry.timer);
    return entry;
  }

  /**
   * Rejects every pending request with the given error and empties the map.
   *
   * @param {Error} error
   */
  rejectAllPending(error) {
    for (const requestId of [...this.requestMap.keys()]) {
      const entry = this.takeRequest(requestId);
      if (entry) {
        entry.reject(error);
      }
    }
  }

  /**
   * Consumes every complete frame at the start of `buffer`, settling the
   * matching pending request for each one.
   *
   * @param {Buffer} buffer - Accumulated stdout bytes from the child.
   * @returns {Buffer} The bytes left over (an incomplete trailing frame, or empty).
   */
  processBuffer(buffer) {
    let remaining = buffer;

    while (remaining.length >= FRAME_HEADER_BYTES) {
      const length = remaining.readUInt32BE(0);
      const requestId = remaining.readUInt32BE(4);
      const frameEnd = FRAME_HEADER_BYTES + length;

      if (remaining.length < frameEnd) {
        break; // Payload not fully received yet.
      }

      const message = remaining.subarray(FRAME_HEADER_BYTES, frameEnd);
      remaining = remaining.subarray(frameEnd);

      const entry = this.takeRequest(requestId);

      let result;
      try {
        result = JSON.parse(message.toString());
      } catch (error) {
        console.error('Failed to parse message:', error);
        if (entry) {
          // Fail fast instead of leaving the caller to wait for the timeout.
          entry.reject(new Error('Invalid response from processor'));
        }
        continue;
      }

      if (entry) {
        entry.resolve(result);
      }
    }

    return remaining;
  }

  /**
   * Sends one audio chunk to the Python process and waits for its reply.
   *
   * @param {Buffer} audioBuffer - Raw audio bytes to forward.
   * @returns {Promise<*>} Resolves with the parsed JSON reply for this request.
   *   Rejects with an Error if the processor is not running, exits before
   *   answering, returns unparseable data, or does not answer within
   *   REQUEST_TIMEOUT_MS.
   */
  async processAudio(audioBuffer) {
    return new Promise((resolve, reject) => {
      const child = this.pythonProcess;
      if (!child || !child.stdin || !child.stdin.writable) {
        reject(new Error('Processor not ready'));
        return;
      }

      // Ids are written as UInt32, so wrap instead of overflowing the field.
      const requestId = this.requestCounter;
      this.requestCounter = (this.requestCounter + 1) >>> 0;

      const timer = setTimeout(() => {
        if (this.takeRequest(requestId)) {
          reject(new Error('Processing timeout'));
        }
      }, REQUEST_TIMEOUT_MS);
      this.requestMap.set(requestId, { resolve, reject, timer });

      try {
        const header = Buffer.alloc(FRAME_HEADER_BYTES);
        header.writeUInt32BE(audioBuffer.length, 0);
        header.writeUInt32BE(requestId, 4);
        // Single write so a frame is never left half-written on the pipe.
        child.stdin.write(Buffer.concat([header, audioBuffer]));
      } catch (error) {
        this.takeRequest(requestId);
        reject(error);
      }
    });
  }
}

const speechProcessor = new SpeechProcessor();

/**
 * Sends a JSON-encoded payload to a client, skipping sockets that are no
 * longer open (the client may disconnect while audio is being processed).
 */
function sendJson(ws, payload) {
  if (ws.readyState !== WebSocket.OPEN) {
    return;
  }
  ws.send(JSON.stringify(payload));
}

wss.on('connection', (ws) => {
  console.log('Client connected');

  // Per-connection state used to de-duplicate and throttle outgoing updates.
  let lastFinalText = '';
  let lastPartialUpdate = 0;
  let partialText = '';

  ws.on('message', async (message, isBinary) => {
    // Only binary frames carry audio. Newer ws versions deliver text frames
    // as Buffers too and flag them with isBinary === false; older versions
    // pass no flag (undefined), so this check is a no-op there.
    if (!Buffer.isBuffer(message) || isBinary === false) {
      return;
    }

    try {
      const result = await speechProcessor.processAudio(message);

      if (!result.success) {
        console.error('Processing error:', result.error);
        return;
      }

      if (result.is_final) {
        if (result.text && result.text !== lastFinalText) {
          lastFinalText = result.text;
          sendJson(ws, {
            type: 'transcription',
            text: result.text,
            is_final: true,
          });
          console.log('Final:', result.text);
          partialText = '';
        }
        return;
      }

      // Throttle partial updates, but let one through immediately when the
      // text is not a continuation of the last partial that was sent.
      const now = Date.now();
      const isDue = now - lastPartialUpdate > PARTIAL_UPDATE_INTERVAL_MS;
      if (result.text && (isDue || !result.text.startsWith(partialText))) {
        partialText = result.text;
        lastPartialUpdate = now;
        sendJson(ws, {
          type: 'partial_transcription',
          text: result.text,
          is_final: false,
        });
      }
    } catch (error) {
      console.error('WebSocket error:', error);
    }
  });

  // Without a listener, a socket-level error is an unhandled 'error' event.
  ws.on('error', (error) => {
    console.error('WebSocket connection error:', error);
  });

  ws.on('close', () => {
    console.log('Client disconnected');
  });
});