master
ov 2025-06-26 15:45:08 +00:00
parent 263e9aa209
commit 66b95d3974
5 changed files with 129 additions and 42 deletions

Binary file not shown.

BIN
output.wav Normal file

Binary file not shown.

View File

@ -13,3 +13,5 @@ echo 'Welcome to the world of speech synthesis!' | \
./piper/piper --model en_US-lessac-medium.onnx --output_file welcome.wav
or run any script
gunicorn -w 4 -b 0.0.0.0:4005 stt_piper:app

View File

@ -3,88 +3,83 @@ import subprocess
import os
import time
import random
import re
from datetime import datetime
from werkzeug.middleware.proxy_fix import ProxyFix
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
# Ensure storage directories exist
os.makedirs('texts', exist_ok=True)
os.makedirs('audio', exist_ok=True)
# Configuration
MAX_TEXT_LENGTH = 1000
TEXT_DIR = 'data/texts'
AUDIO_DIR = 'data/audio'
# Ensure directories exist
os.makedirs(TEXT_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)
def sanitize_text(text):
"""Remove potentially dangerous characters"""
return re.sub(r'[;$`|]', '', text)
def generate_filename():
"""Generate timestamp + random number filename"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
random_num = random.randint(1000, 9999)
return f"{timestamp}_{random_num}"
@app.route('/tts', methods=['POST'])
def tts():
# Validate input
if not request.is_json:
return {"error": "Request must be JSON"}, 400
text = request.json.get('text', '').strip()
text = sanitize_text(request.json.get('text', '').strip())
if not text:
return {"error": "No text provided"}, 400
if len(text) > 1000:
return {"error": "Text too long (max 1000 characters)"}, 400
if len(text) > MAX_TEXT_LENGTH:
return {"error": f"Text too long (max {MAX_TEXT_LENGTH} characters)"}, 400
# Generate unique filename
base_filename = generate_filename()
text_filename = f"data/texts/{base_filename}.txt"
wav_filename = f"data/audio/{base_filename}.wav"
text_filename = os.path.join(TEXT_DIR, f"{base_filename}.txt")
wav_filename = os.path.join(AUDIO_DIR, f"{base_filename}.wav")
try:
# Save the input text
with open(text_filename, 'w') as f:
# Save input text
with open(text_filename, 'w', encoding='utf-8') as f:
f.write(text)
# Generate WAV audio with Piper - directly to file first
# SAFE Piper execution (no shell=True)
piper_cmd = [
'echo', f'"{text}"', '|',
'./piper/piper',
'--model', './model/en_US-amy-medium.onnx',
'--output_file', wav_filename
]
# Run the command
process = subprocess.run(
' '.join(piper_cmd),
shell=True,
piper_cmd,
input=text.encode('utf-8'),
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Verify the output file was created
if not os.path.exists(wav_filename):
raise Exception("Piper failed to create audio file")
raise Exception("Audio file not created")
# Get file size for logging
file_size = os.path.getsize(wav_filename)
# Read the generated audio
with open(wav_filename, 'rb') as f:
audio_data = f.read()
# Log the successful generation
print(f"Generated TTS: {len(text)} chars -> {file_size} bytes audio")
# Return WAV audio directly
return Response(
audio_data,
f.read(),
mimetype='audio/wav',
headers={'Content-Disposition': f'attachment; filename={base_filename}.wav'}
)
except subprocess.CalledProcessError as e:
error_msg = f"Piper TTS failed: {e.stderr.decode().strip()}"
print(error_msg)
return {"error": "TTS generation failed", "details": error_msg}, 500
app.logger.error(f"Piper failed: {e.stderr.decode()}")
return {"error": "TTS generation failed"}, 500
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
print(error_msg)
return {"error": "TTS processing failed", "details": error_msg}, 500
app.logger.error(f"Unexpected error: {str(e)}")
return {"error": "Processing failed"}, 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=4005, debug=True)
app.run(host='0.0.0.0', port=4005) # Remove debug=True for production

90
stt_piper.py1 Normal file
View File

@ -0,0 +1,90 @@
from flask import Flask, request, Response
import subprocess
import os
import time
import random
from datetime import datetime
app = Flask(__name__)
# Ensure storage directories exist
os.makedirs('texts', exist_ok=True)
os.makedirs('audio', exist_ok=True)
def generate_filename():
"""Generate timestamp + random number filename"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
random_num = random.randint(1000, 9999)
return f"{timestamp}_{random_num}"
@app.route('/tts', methods=['POST'])
def tts():
# Validate input
if not request.is_json:
return {"error": "Request must be JSON"}, 400
text = request.json.get('text', '').strip()
if not text:
return {"error": "No text provided"}, 400
if len(text) > 1000:
return {"error": "Text too long (max 1000 characters)"}, 400
# Generate unique filename
base_filename = generate_filename()
text_filename = f"data/texts/{base_filename}.txt"
wav_filename = f"data/audio/{base_filename}.wav"
try:
# Save the input text
with open(text_filename, 'w') as f:
f.write(text)
# Generate WAV audio with Piper - directly to file first
piper_cmd = [
'echo', f'"{text}"', '|',
'./piper/piper',
'--model', './model/en_US-amy-medium.onnx',
'--output_file', wav_filename
]
# Run the command
process = subprocess.run(
' '.join(piper_cmd),
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Verify the output file was created
if not os.path.exists(wav_filename):
raise Exception("Piper failed to create audio file")
# Get file size for logging
file_size = os.path.getsize(wav_filename)
# Read the generated audio
with open(wav_filename, 'rb') as f:
audio_data = f.read()
# Log the successful generation
print(f"Generated TTS: {len(text)} chars -> {file_size} bytes audio")
# Return WAV audio directly
return Response(
audio_data,
mimetype='audio/wav',
headers={'Content-Disposition': f'attachment; filename={base_filename}.wav'}
)
except subprocess.CalledProcessError as e:
error_msg = f"Piper TTS failed: {e.stderr.decode().strip()}"
print(error_msg)
return {"error": "TTS generation failed", "details": error_msg}, 500
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
print(error_msg)
return {"error": "TTS processing failed", "details": error_msg}, 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=4005, debug=True)