122 lines
4.1 KiB
Python
122 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
import vosk
|
|
import sys
|
|
import json
|
|
import tempfile
|
|
import os
|
|
import wave
|
|
import soundfile as sf
|
|
|
|
# Shared Vosk state: loaded once and reused for every audio chunk so the
# model is not reloaded per request.
model = None  # assigned by initialize_vosk()
recognizer = None  # assigned by initialize_vosk()
|
|
|
|
def initialize_vosk(model_path="/app/vosk-model"):
    """Load the Vosk model and create the shared 16 kHz recognizer.

    On success the module-level ``model`` and ``recognizer`` globals are
    replaced; on failure they are left untouched so a previously working
    recognizer is never clobbered by a half-finished initialisation.

    Args:
        model_path: Directory containing the Vosk model. Defaults to
            ``/app/vosk-model``, the location this script has always used.

    Returns:
        ``{"success": True}`` when the recognizer is ready, otherwise
        ``{"success": False, "error": <message>}``. Failures are reported
        through the return value rather than raised.
    """
    global model, recognizer

    if not os.path.exists(model_path):
        return {"success": False, "error": f"Vosk model not found at {model_path}"}

    try:
        vosk.SetLogLevel(-1)  # Reduce log verbosity
        loaded_model = vosk.Model(model_path)
        new_recognizer = vosk.KaldiRecognizer(loaded_model, 16000)
    except Exception as e:
        return {"success": False, "error": f"Failed to initialize Vosk: {e}"}

    # Publish both objects together, only once everything succeeded.
    model, recognizer = loaded_model, new_recognizer
    return {"success": True}
|
|
|
|
def _to_pcm16_mono_16k(samples, sample_rate):
    """Convert decoded float samples to mono 16-bit PCM bytes at 16 kHz."""
    import numpy as np

    samples = np.asarray(samples, dtype=np.float64)

    # Multi-channel audio is decoded as (frames, channels); average the
    # channels so the recognizer receives a single mono stream.
    if samples.ndim > 1:
        samples = samples.mean(axis=1)

    if sample_rate != 16000 and len(samples) > 0:
        # Simple linear resampling (for better quality, use librosa).
        # Output sample i sits at source index i * sample_rate / 16000, which
        # never runs past the last input sample.
        target_len = int(len(samples) * 16000 / sample_rate)
        positions = np.arange(target_len) * (sample_rate / 16000)
        samples = np.interp(positions, np.arange(len(samples)), samples)

    # Clip before scaling so out-of-range floats saturate instead of
    # wrapping around when cast to int16.
    return (np.clip(samples, -1.0, 1.0) * 32767).astype('int16').tobytes()


def process_audio_chunk(audio_data):
    """Transcribe one encoded audio chunk with the shared Vosk recognizer.

    The chunk is written to a temporary ``.wav`` file, decoded with
    ``soundfile``, converted to mono 16-bit PCM at 16 kHz and fed to the
    recognizer. The temporary file is always removed, whatever the outcome.

    Args:
        audio_data: Raw bytes of an audio file that ``soundfile`` can decode.

    Returns:
        ``{"success": True, "text": <str>}`` on success, where the text is
        the final result if the recognizer accepted the waveform as a
        complete utterance and the partial hypothesis otherwise; or
        ``{"success": False, "error": <message>}`` on failure (including a
        failed lazy initialisation of the recognizer).
    """
    global recognizer

    # Lazily initialise in case the caller did not do so up front.
    if not recognizer:
        init_result = initialize_vosk()
        if not init_result["success"]:
            return init_result

    temp_filename = None
    try:
        # Write audio data to temporary file
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
            temp_filename = temp_file.name
            temp_file.write(audio_data)

        try:
            # Read audio file with soundfile
            samples, sample_rate = sf.read(temp_filename)
            audio_bytes = _to_pcm16_mono_16k(samples, sample_rate)

            # Process with Vosk
            if recognizer.AcceptWaveform(audio_bytes):
                result = json.loads(recognizer.Result())
                text = result.get('text', '')
            else:
                result = json.loads(recognizer.PartialResult())
                text = result.get('partial', '')

            return {"success": True, "text": text}
        except Exception as e:
            return {"success": False, "error": f"Audio processing error: {e}"}
    except Exception as e:
        return {"success": False, "error": f"General error: {e}"}
    finally:
        # Best-effort cleanup: a failed delete must not mask the real result.
        if temp_filename is not None:
            try:
                os.unlink(temp_filename)
            except OSError:
                pass
|
|
|
|
def _send_response(payload):
    """Write one length-prefixed JSON message to stdout and flush it."""
    response = json.dumps(payload).encode('utf-8')
    out = sys.stdout.buffer
    out.write(len(response).to_bytes(4, byteorder='big'))
    out.write(response)
    out.flush()


def main():
    """Main loop to process audio chunks from stdin.

    Protocol (both directions): a 4-byte big-endian length followed by that
    many bytes. Requests carry encoded audio; responses carry a UTF-8 JSON
    object as returned by ``process_audio_chunk``.

    The loop ends on a clean EOF, or after reporting an error when the
    stream ends in the middle of a frame. If Vosk cannot be initialised an
    error response is sent and the process exits with status 1.
    """
    # Initialize Vosk on startup
    init_result = initialize_vosk()
    if not init_result["success"]:
        _send_response(init_result)
        sys.exit(1)

    stdin = sys.stdin.buffer
    while True:
        try:
            # Read length of incoming data
            length_data = stdin.read(4)
            if not length_data:
                break  # clean EOF between frames

            if len(length_data) < 4:
                # EOF hit mid-header: decoding it would yield a bogus length.
                _send_response({
                    "success": False,
                    "error": "Truncated length header",
                })
                break

            length = int.from_bytes(length_data, byteorder='big')

            # Read audio data
            audio_data = stdin.read(length)
            if len(audio_data) < length:
                # EOF hit mid-payload: do not transcribe a partial file.
                _send_response({
                    "success": False,
                    "error": (
                        f"Truncated audio payload: expected {length} bytes, "
                        f"got {len(audio_data)}"
                    ),
                })
                break

            # Process audio and send result back
            _send_response(process_audio_chunk(audio_data))

        except Exception as e:
            _send_response({"success": False, "error": str(e)})
|
|
|
|
# Run the stdin/stdout processing loop only when executed as a script.
if __name__ == "__main__":
    main()