#!/usr/bin/env python3
"""Vosk speech-to-text worker driven over stdin/stdout.

Protocol (all integers are 4-byte big-endian unsigned):

* Request frame on stdin: payload length, request ID, then ``length`` bytes
  of audio in a container format that ``soundfile`` can decode.
* Response frame on stdout: payload length, then a UTF-8 JSON object with
  ``success`` and ``requestId`` plus either ``text``/``is_final`` or ``error``.
"""
import json
import os
import struct
import sys
import tempfile

import numpy as np
import soundfile as sf
import vosk

# Sample rate (Hz) the recognizer is created with; audio is converted to it.
TARGET_SAMPLE_RATE = 16000

# Default model location; override with the VOSK_MODEL_PATH environment variable.
DEFAULT_MODEL_PATH = "/app/vosk-model"

# Global model - load once
model = None
recognizer = None


def initialize_vosk():
    """Load the Vosk model and create the shared recognizer.

    The model directory is taken from the ``VOSK_MODEL_PATH`` environment
    variable when it is set and non-empty, otherwise ``DEFAULT_MODEL_PATH``.

    Returns:
        ``{"success": True}`` on success, otherwise
        ``{"success": False, "error": <message>}``.
    """
    global model, recognizer

    model_path = os.environ.get("VOSK_MODEL_PATH") or DEFAULT_MODEL_PATH
    if not os.path.exists(model_path):
        return {"success": False, "error": f"Vosk model not found at {model_path}"}

    try:
        vosk.SetLogLevel(-1)  # Reduce log verbosity
        model = vosk.Model(model_path)
        recognizer = vosk.KaldiRecognizer(model, TARGET_SAMPLE_RATE)
    except Exception as e:
        return {"success": False, "error": f"Failed to initialize Vosk: {str(e)}"}
    return {"success": True}


def _to_pcm16(samples, sample_rate):
    """Convert decoded float samples to mono int16 PCM at TARGET_SAMPLE_RATE."""
    samples = np.asarray(samples, dtype=np.float32)

    # soundfile yields a (frames, channels) array for multi-channel files;
    # the recognizer is fed a single channel, so average the channels.
    if samples.ndim > 1:
        samples = samples.mean(axis=1)

    # Simple linear-interpolation resampling (for production, use a proper
    # resampler). Skipped for empty input because np.interp rejects empty
    # sample points.
    if sample_rate != TARGET_SAMPLE_RATE and len(samples) > 0:
        num_samples = int(len(samples) * TARGET_SAMPLE_RATE / sample_rate)
        samples = np.interp(
            np.linspace(0, len(samples) - 1, num_samples),
            np.arange(len(samples)),
            samples,
        )

    # Clip first: values outside [-1, 1] would wrap around when cast to int16.
    samples = np.clip(samples, -1.0, 1.0)
    return (samples * 32767).astype("int16")


def process_audio_chunk(audio_data, request_id):
    """Transcribe one chunk of encoded audio with the shared recognizer.

    Args:
        audio_data: Raw bytes of an audio file readable by ``soundfile``.
        request_id: Identifier echoed back as ``requestId`` in the result.

    Returns:
        A dict with ``success`` and ``requestId``. On success it also carries
        ``text`` and ``is_final`` (True when the recognizer returned a final
        result, False for a partial one); on failure it carries ``error``.
    """
    if not recognizer:
        init_result = initialize_vosk()
        if not init_result["success"]:
            return {**init_result, "requestId": request_id}

    temp_filename = None
    try:
        # Write audio data to a temporary file so soundfile can open it by name.
        # The name is recorded before writing so a failed write is still cleaned up.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
            temp_filename = temp_file.name
            temp_file.write(audio_data)

        try:
            samples, sample_rate = sf.read(temp_filename, dtype="float32")
            pcm = _to_pcm16(samples, sample_rate)

            if recognizer.AcceptWaveform(pcm.tobytes()):
                result = json.loads(recognizer.Result())
                text = result.get("text", "")
                is_final = True
            else:
                result = json.loads(recognizer.PartialResult())
                text = result.get("partial", "")
                is_final = False
        except Exception as e:
            return {
                "success": False,
                "error": f"Audio processing error: {str(e)}",
                "requestId": request_id,
            }

        return {
            "success": True,
            "text": text,
            "is_final": is_final,
            "requestId": request_id,
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"General error: {str(e)}",
            "requestId": request_id,
        }
    finally:
        # Best-effort cleanup on every path; a failed unlink must not replace
        # the transcription result with an error.
        if temp_filename is not None:
            try:
                os.unlink(temp_filename)
            except OSError:
                pass


def _send_message(payload):
    """Write *payload* to stdout as one length-prefixed JSON frame and flush."""
    body = json.dumps(payload).encode("utf-8")
    out = sys.stdout.buffer
    out.write(struct.pack(">I", len(body)))
    out.write(body)
    out.flush()


def _read_exact(stream, size):
    """Read exactly *size* bytes from *stream*; return None if it ends early."""
    data = stream.read(size)
    if len(data) != size:
        return None
    return data


def main():
    """Main loop to process audio chunks from stdin.

    Exits with status 1 (after emitting an error frame with ``requestId`` 0)
    if the model cannot be initialized. Otherwise handles request frames until
    stdin ends or a frame is truncated.
    """
    # Initialize Vosk on startup
    init_result = initialize_vosk()
    if not init_result["success"]:
        _send_message({**init_result, "requestId": 0})
        sys.exit(1)

    stdin = sys.stdin.buffer
    header = struct.Struct(">II")  # payload length, request ID

    while True:
        # Reset per iteration so an error raised before the header is parsed
        # is never attributed to the previous request.
        request_id = 0
        try:
            header_bytes = _read_exact(stdin, header.size)
            if header_bytes is None:
                # Clean EOF or truncated header: nothing more to process.
                break
            length, request_id = header.unpack(header_bytes)

            audio_data = _read_exact(stdin, length)
            if audio_data is None:
                break

            _send_message(process_audio_chunk(audio_data, request_id))
        except Exception as e:
            _send_message({
                "success": False,
                "error": str(e),
                "requestId": request_id,
            })


if __name__ == "__main__":
    main()