| import os
|
| import time
|
| import uuid
|
| import json
|
| import requests
|
| import subprocess
|
| import asyncio
|
| import threading
|
| import hashlib
|
| import re
|
| from datetime import datetime, timedelta
|
| from typing import Optional, Dict, List, Tuple
|
| from dataclasses import dataclass, asdict
|
| from concurrent.futures import ThreadPoolExecutor
|
| import sqlite3
|
| from contextlib import contextmanager
|
| from dotenv import load_dotenv
|
| from azure.storage.blob import BlobServiceClient
|
| import tempfile
|
| import shutil
|
|
|
|
|
# Populate os.environ from a local .env file (python-dotenv) before the
# required settings below are read with _require_env_var().
load_dotenv()
|
|
|
| def _require_env_var(varname):
|
| value = os.environ.get(varname)
|
| if not value or value.strip() == "" or "your" in value.lower():
|
| raise ValueError(f"Environment variable {varname} is missing or invalid. Check your .env file.")
|
| return value
|
|
|
|
|
# --- Required Azure configuration (validated at import time) ---
AZURE_SPEECH_KEY = _require_env_var("AZURE_SPEECH_KEY")
AZURE_SPEECH_KEY_ENDPOINT = _require_env_var("AZURE_SPEECH_KEY_ENDPOINT").rstrip('/')
AZURE_REGION = _require_env_var("AZURE_REGION")
AZURE_BLOB_CONNECTION = _require_env_var("AZURE_BLOB_CONNECTION")
AZURE_CONTAINER = _require_env_var("AZURE_CONTAINER")
AZURE_BLOB_SAS_TOKEN = _require_env_var("AZURE_BLOB_SAS_TOKEN")


def _load_json_env(varname, default):
    """Parse the JSON value of an optional environment variable.

    Args:
        varname: Name of the environment variable.
        default: Value returned when the variable is unset or blank.

    Returns:
        The decoded JSON value, or ``default``.

    Raises:
        ValueError: If the value is not valid JSON. The message names the
            variable, and the original ``json.JSONDecodeError`` is chained.
    """
    raw = os.environ.get(varname, "").strip()
    if not raw:
        return default
    try:
        return json.loads(raw)
    except json.JSONDecodeError as err:
        raise ValueError(f"Environment variable {varname} must contain valid JSON: {err}") from err


# --- Optional configuration ---
ALLOWED_LANGS = _load_json_env("ALLOWED_LANGS", {})
# A blank API_VERSION would produce ".../speechtotext//transcriptions" URLs.
API_VERSION = os.environ.get("API_VERSION", "").strip() or "v3.2"

# Local working directories, relative to the current working directory.
UPLOAD_DIR = "uploads"
DB_DIR = "database"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(DB_DIR, exist_ok=True)

# Known audio format identifiers (presumably the choices offered for the
# ``audio_format`` job setting -- confirm against the UI code).
AUDIO_FORMATS = [
    "wav", "mp3", "ogg", "opus", "flac", "wma", "aac", "alaw", "mulaw", "amr", "webm", "speex"
]
|
|
|
@dataclass
class User:
    """In-memory form of one row of the ``users`` table.

    Timestamps are ISO-8601 strings (this module fills them with
    ``datetime.now().isoformat()``).
    """

    # Identity and credentials
    user_id: str
    email: str
    username: str
    password_hash: str
    created_at: str

    # Activity / account state
    last_login: Optional[str] = None
    is_active: bool = True

    # Consent flags -- all default to "not given"
    gdpr_consent: bool = False
    data_retention_agreed: bool = False
    marketing_consent: bool = False
|
|
|
@dataclass
class TranscriptionJob:
    """In-memory form of one row of the ``transcriptions`` table.

    ``settings`` is kept as a dict here; the database stores it as JSON text.
    """

    # Set when the job is submitted
    job_id: str
    user_id: str
    original_filename: str
    audio_url: str
    language: str
    status: str
    created_at: str

    # Filled in as the job progresses
    completed_at: Optional[str] = None
    transcript_text: Optional[str] = None
    transcript_url: Optional[str] = None
    error_message: Optional[str] = None
    azure_trans_id: Optional[str] = None
    settings: Optional[Dict] = None
|
|
|
class AuthManager:
    """Handle password hashing and account-input validation.

    New password hashes use PBKDF2-HMAC-SHA256 with a random per-password
    salt and are stored as ``pbkdf2_sha256$<iterations>$<salt_hex>$<hash_hex>``.
    Hashes created by the earlier scheme (one SHA-256 over the password plus a
    single application-wide salt) are still accepted by ``verify_password``,
    so existing accounts keep working.
    """

    # Work factor for newly created hashes (OWASP 2023 guidance for
    # PBKDF2-HMAC-SHA256). Stored inside each hash, so it can be raised later.
    PBKDF2_ITERATIONS = 600_000
    _PBKDF2_SCHEME = "pbkdf2_sha256"
    # Application-wide salt of the legacy scheme; only used to verify old hashes.
    _LEGACY_SALT = "azure_speech_transcription_salt_2024"

    # Used with fullmatch(): re.match(...$) would also accept a trailing "\n".
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _USERNAME_RE = re.compile(r'[a-zA-Z0-9_]{3,30}')

    @staticmethod
    def hash_password(password: str, iterations: Optional[int] = None) -> str:
        """Return a salted PBKDF2-HMAC-SHA256 hash string for ``password``.

        Args:
            password: Plain-text password.
            iterations: PBKDF2 work factor; defaults to ``PBKDF2_ITERATIONS``.
                It is recorded in the result, so verification does not need it.

        Returns:
            ``pbkdf2_sha256$<iterations>$<salt_hex>$<hash_hex>``. Two calls with
            the same password return different strings (random salt).
        """
        rounds = AuthManager.PBKDF2_ITERATIONS if iterations is None else int(iterations)
        salt = os.urandom(16)
        digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, rounds)
        return f"{AuthManager._PBKDF2_SCHEME}${rounds}${salt.hex()}${digest.hex()}"

    @staticmethod
    def verify_password(password: str, password_hash: str) -> bool:
        """Check ``password`` against a stored hash (PBKDF2 or legacy SHA-256).

        Comparisons are constant-time. An empty or malformed stored hash never
        matches.
        """
        import hmac  # constant-time comparison

        if not password_hash:
            return False
        if password_hash.startswith(AuthManager._PBKDF2_SCHEME + "$"):
            try:
                _, rounds, salt_hex, digest_hex = password_hash.split("$")
                expected = bytes.fromhex(digest_hex)
                actual = hashlib.pbkdf2_hmac(
                    "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(rounds)
                )
            except (ValueError, OverflowError):
                return False
            return hmac.compare_digest(actual, expected)
        # Legacy scheme: hex SHA-256 of password + fixed salt.
        legacy = hashlib.sha256((password + AuthManager._LEGACY_SALT).encode()).hexdigest()
        return hmac.compare_digest(legacy.encode(), password_hash.encode())

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True if ``email`` looks like ``local@domain.tld``."""
        return isinstance(email, str) and AuthManager._EMAIL_RE.fullmatch(email) is not None

    @staticmethod
    def validate_username(username: str) -> bool:
        """Return True for 3-30 characters of ASCII letters, digits or underscore."""
        return isinstance(username, str) and AuthManager._USERNAME_RE.fullmatch(username) is not None

    @staticmethod
    def validate_password(password: str) -> Tuple[bool, str]:
        """Check password strength.

        Returns:
            ``(True, "Password is valid")`` or ``(False, <reason>)`` for the
            first rule that fails: length >= 8, an uppercase letter, a
            lowercase letter, a digit.
        """
        if len(password) < 8:
            return False, "Password must be at least 8 characters long"
        if not re.search(r'[A-Z]', password):
            return False, "Password must contain at least one uppercase letter"
        if not re.search(r'[a-z]', password):
            return False, "Password must contain at least one lowercase letter"
        if not re.search(r'\d', password):
            return False, "Password must contain at least one number"
        return True, "Password is valid"
|
|
|
class DatabaseManager:
    """SQLite store for users and transcription jobs, mirrored to Azure Blob Storage.

    On construction the shared database is downloaded from the blob
    ``shared/database/transcriptions.db`` (if it exists) and the schema is
    created. After any connection that changed rows, a consistent snapshot of
    the file is uploaded back to that blob, at most once per
    ``backup_interval`` seconds. Uploads are deferred, not skipped, so the
    latest change is eventually uploaded while the process keeps running.

    Every connection opened through ``get_connection`` is serialized by one
    non-reentrant in-process lock, so connections must not be nested.
    """

    def __init__(self, db_path: Optional[str] = None, backup_interval: float = 30):
        """
        Args:
            db_path: Local SQLite file; defaults to ``<DB_DIR>/transcriptions.db``.
            backup_interval: Minimum number of seconds between blob uploads.
        """
        self.db_path = db_path or os.path.join(DB_DIR, "transcriptions.db")
        self.blob_service = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
        self.db_blob_name = "shared/database/transcriptions.db"
        # Serializes every SQLite connection opened through get_connection().
        self._lock = threading.Lock()
        self._last_backup_time = 0
        self._backup_interval = backup_interval
        # Upload bookkeeping has its own lock so scheduling an upload never
        # waits for a database connection.
        self._upload_state_lock = threading.Lock()
        self._upload_pending = False  # rows changed since the last upload started
        self._upload_timer = None  # the scheduled threading.Timer, if any

        self.init_database()

    def _download_db_from_blob(self):
        """Replace the local database with the shared copy in blob storage, if any.

        Returns:
            True if a copy was downloaded. False if none exists or the
            download failed; the local file is then left untouched.
        """
        temp_path = None
        try:
            blob_client = self.blob_service.get_blob_client(container=AZURE_CONTAINER, blob=self.db_blob_name)
            if not blob_client.exists():
                print("📝 No existing shared database found in blob storage, will create new one")
                return False

            print("📥 Downloading existing shared database from Azure Blob Storage...")
            # dirname() is "" for a bare file name, and os.makedirs("") raises.
            db_dir = os.path.dirname(self.db_path) or "."
            os.makedirs(db_dir, exist_ok=True)
            # Stream into a temp file next to the target so the final
            # os.replace() is a same-filesystem rename and a half-finished
            # download never replaces the database.
            fd, temp_path = tempfile.mkstemp(dir=db_dir, suffix=".download")
            with os.fdopen(fd, "wb") as download_file:
                blob_client.download_blob().readinto(download_file)
            os.replace(temp_path, self.db_path)
            temp_path = None

            print("✅ Shared database downloaded successfully")
            return True

        except Exception as e:
            print(f"⚠️ Warning: Could not download shared database from blob storage: {e}")
            print("📝 Will create new local database")
            return False
        finally:
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError:
                    pass

    def _schedule_upload(self):
        """Mark the database dirty and make sure an upload is scheduled.

        At most one daemon ``threading.Timer`` is pending at a time. Changes
        made while it waits are included in that same upload, which fires
        ``backup_interval`` seconds after the previous upload started (or
        immediately if that is already in the past).
        """
        with self._upload_state_lock:
            self._upload_pending = True
            if self._upload_timer is not None:
                return
            elapsed = time.time() - self._last_backup_time
            delay = max(0.0, self._backup_interval - elapsed)
            timer = threading.Timer(delay, self._upload_db_to_blob)
            timer.daemon = True
            self._upload_timer = timer
            timer.start()

    def _snapshot_db(self) -> str:
        """Copy the live database to a temporary file and return its path.

        Uses SQLite's online backup API while holding the connection lock, so
        the copy is not torn by a concurrent write from this process. The
        caller must delete the returned file.
        """
        fd, snapshot_path = tempfile.mkstemp(suffix=".db")
        os.close(fd)
        try:
            with self._lock:
                source = sqlite3.connect(self.db_path, timeout=30.0)
                try:
                    target = sqlite3.connect(snapshot_path)
                    try:
                        source.backup(target)
                    finally:
                        target.close()
                finally:
                    source.close()
        except BaseException:
            os.remove(snapshot_path)
            raise
        return snapshot_path

    def _upload_db_to_blob(self):
        """Upload a snapshot of the database to blob storage (runs on the upload timer).

        Does nothing if no changes are pending. On failure the upload is
        rescheduled, so pending changes are retried after another interval.
        """
        with self._upload_state_lock:
            self._upload_timer = None
            if not self._upload_pending:
                return
            self._upload_pending = False
            self._last_backup_time = time.time()

        if not os.path.exists(self.db_path):
            return

        snapshot_path = None
        try:
            snapshot_path = self._snapshot_db()
            blob_client = self.blob_service.get_blob_client(container=AZURE_CONTAINER, blob=self.db_blob_name)
            with open(snapshot_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)
        except Exception as e:
            print(f"⚠️ Warning: Could not upload shared database to blob storage: {e}")
            self._schedule_upload()
        finally:
            if snapshot_path is not None:
                try:
                    os.remove(snapshot_path)
                except OSError:
                    pass

    @contextmanager
    def get_connection(self):
        """Yield a SQLite connection (rows as ``sqlite3.Row``) under the manager lock.

        The connection is closed on exit; uncommitted changes are discarded.
        If the connection modified any rows, a blob upload is scheduled.
        Do not open a second connection while holding one in the same thread:
        the lock is not reentrant.
        """
        changed = False
        try:
            with self._lock:
                conn = sqlite3.connect(self.db_path, timeout=30.0)
                conn.row_factory = sqlite3.Row
                try:
                    yield conn
                finally:
                    # total_changes counts rows touched by INSERT/UPDATE/DELETE;
                    # read-only connections therefore never trigger an upload.
                    changed = conn.total_changes > 0
                    conn.close()
        finally:
            if changed:
                self._schedule_upload()

    def init_database(self):
        """Fetch the shared database (if any) and create missing tables and indexes."""
        self._download_db_from_blob()

        with self.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id TEXT PRIMARY KEY,
                    email TEXT UNIQUE NOT NULL,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    last_login TEXT,
                    is_active BOOLEAN DEFAULT 1,
                    gdpr_consent BOOLEAN DEFAULT 0,
                    data_retention_agreed BOOLEAN DEFAULT 0,
                    marketing_consent BOOLEAN DEFAULT 0
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS transcriptions (
                    job_id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    original_filename TEXT NOT NULL,
                    audio_url TEXT,
                    language TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    completed_at TEXT,
                    transcript_text TEXT,
                    transcript_url TEXT,
                    error_message TEXT,
                    azure_trans_id TEXT,
                    settings TEXT,
                    FOREIGN KEY (user_id) REFERENCES users (user_id)
                )
            """)

            conn.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_user_id ON transcriptions(user_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_status ON transcriptions(status)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_created_at ON transcriptions(created_at DESC)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_transcriptions_user_created ON transcriptions(user_id, created_at DESC)")

            conn.commit()

    @staticmethod
    def _row_to_user(row, last_login: Optional[str] = None) -> "User":
        """Build a ``User`` from a ``users`` row, optionally overriding ``last_login``."""
        return User(
            user_id=row['user_id'],
            email=row['email'],
            username=row['username'],
            password_hash=row['password_hash'],
            created_at=row['created_at'],
            last_login=row['last_login'] if last_login is None else last_login,
            is_active=bool(row['is_active']),
            gdpr_consent=bool(row['gdpr_consent']),
            data_retention_agreed=bool(row['data_retention_agreed']),
            marketing_consent=bool(row['marketing_consent'])
        )

    def create_user(self, email: str, username: str, password: str, gdpr_consent: bool = True,
                    data_retention_agreed: bool = True, marketing_consent: bool = False) -> Tuple[bool, str, Optional[str]]:
        """Validate the input and create a new account.

        Returns:
            ``(success, message, user_id)``; ``user_id`` is None on failure.
        """
        try:
            if not AuthManager.validate_email(email):
                return False, "Invalid email format", None

            if not AuthManager.validate_username(username):
                return False, "Username must be 3-30 characters, alphanumeric and underscore only", None

            is_valid, message = AuthManager.validate_password(password)
            if not is_valid:
                return False, message, None

            if not gdpr_consent:
                return False, "GDPR consent is required to create an account", None

            if not data_retention_agreed:
                return False, "Data retention agreement is required", None

            user = User(
                user_id=str(uuid.uuid4()),
                email=email,
                username=username,
                password_hash=AuthManager.hash_password(password),
                created_at=datetime.now().isoformat(),
                gdpr_consent=gdpr_consent,
                data_retention_agreed=data_retention_agreed,
                marketing_consent=marketing_consent
            )

            with self.get_connection() as conn:
                existing = conn.execute(
                    "SELECT email, username FROM users WHERE email = ? OR username = ?",
                    (email, username)
                ).fetchone()

                if existing:
                    if existing['email'] == email:
                        return False, "Email already registered", None
                    return False, "Username already taken", None

                conn.execute("""
                    INSERT INTO users
                    (user_id, email, username, password_hash, created_at, is_active,
                     gdpr_consent, data_retention_agreed, marketing_consent)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    user.user_id, user.email, user.username, user.password_hash,
                    user.created_at, user.is_active, user.gdpr_consent,
                    user.data_retention_agreed, user.marketing_consent
                ))
                conn.commit()

            print(f"👤 New user registered: {username} ({email})")
            return True, "Account created successfully", user.user_id

        except Exception as e:
            print(f"❌ Error creating user: {str(e)}")
            return False, f"Registration failed: {str(e)}", None

    def authenticate_user(self, login: str, password: str) -> Tuple[bool, str, Optional["User"]]:
        """Authenticate an active user by email or username and record the login time.

        Returns:
            ``(success, message, user)``; ``user`` is None on failure.
        """
        try:
            with self.get_connection() as conn:
                user_row = conn.execute("""
                    SELECT * FROM users
                    WHERE (email = ? OR username = ?) AND is_active = 1
                """, (login, login)).fetchone()

                # Same message for unknown user and wrong password, so the
                # response does not reveal which accounts exist.
                if not user_row or not AuthManager.verify_password(password, user_row['password_hash']):
                    return False, "Invalid credentials", None

                login_time = datetime.now().isoformat()
                conn.execute(
                    "UPDATE users SET last_login = ? WHERE user_id = ?",
                    (login_time, user_row['user_id'])
                )
                conn.commit()

            user = self._row_to_user(user_row, last_login=login_time)
            print(f"🔐 User logged in: {user.username} ({user.email})")
            return True, "Login successful", user

        except Exception as e:
            print(f"❌ Authentication error: {str(e)}")
            return False, f"Login failed: {str(e)}", None

    def get_user_by_id(self, user_id: str) -> Optional["User"]:
        """Return the active user with ``user_id``, or None (also on error)."""
        try:
            with self.get_connection() as conn:
                user_row = conn.execute(
                    "SELECT * FROM users WHERE user_id = ? AND is_active = 1",
                    (user_id,)
                ).fetchone()
            return self._row_to_user(user_row) if user_row else None
        except Exception as e:
            print(f"❌ Error getting user: {str(e)}")
            return None

    def update_user_consent(self, user_id: str, marketing_consent: bool) -> bool:
        """Set the user's marketing consent.

        Returns:
            True if a user row was updated; False if no such user or on error.
        """
        try:
            with self.get_connection() as conn:
                updated = conn.execute(
                    "UPDATE users SET marketing_consent = ? WHERE user_id = ?",
                    (bool(marketing_consent), user_id)
                ).rowcount
                conn.commit()
            return updated > 0
        except Exception as e:
            print(f"❌ Error updating consent: {str(e)}")
            return False

    def delete_user_account(self, user_id: str) -> bool:
        """Delete the user's transcriptions and anonymize the account row (GDPR).

        The row is kept, deactivated, with email/username replaced by
        placeholders and the password hash, last login and marketing consent
        cleared. Files in blob storage are not touched here.

        Returns:
            True on success, False on error.
        """
        try:
            with self.get_connection() as conn:
                conn.execute("DELETE FROM transcriptions WHERE user_id = ?", (user_id,))
                # password_hash is NOT NULL, so it is blanked rather than nulled;
                # an empty hash never verifies.
                conn.execute(
                    "UPDATE users SET is_active = 0, email = ?, username = ?, password_hash = '', "
                    "last_login = NULL, marketing_consent = 0 WHERE user_id = ?",
                    (f"deleted_{user_id}@deleted.com", f"deleted_{user_id}", user_id)
                )
                conn.commit()
            print(f"🗑️ User account deleted: {user_id}")
            return True
        except Exception as e:
            print(f"❌ Error deleting user account: {str(e)}")
            return False

    def save_job(self, job: "TranscriptionJob"):
        """Insert or fully replace the row for ``job`` (settings stored as JSON)."""
        with self.get_connection() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO transcriptions
                (job_id, user_id, original_filename, audio_url, language, status,
                 created_at, completed_at, transcript_text, transcript_url, error_message,
                 azure_trans_id, settings)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job.job_id, job.user_id, job.original_filename, job.audio_url,
                job.language, job.status, job.created_at, job.completed_at,
                job.transcript_text, job.transcript_url, job.error_message,
                job.azure_trans_id, json.dumps(job.settings) if job.settings else None
            ))
            conn.commit()

    def get_job(self, job_id: str) -> Optional["TranscriptionJob"]:
        """Return the job with ``job_id``, or None."""
        with self.get_connection() as conn:
            row = conn.execute(
                "SELECT * FROM transcriptions WHERE job_id = ?", (job_id,)
            ).fetchone()
        return self._row_to_job(row) if row else None

    def get_user_jobs(self, user_id: str, limit: int = 50) -> List["TranscriptionJob"]:
        """Return up to ``limit`` of one user's jobs, newest first (PDPA: own data only)."""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM transcriptions
                WHERE user_id = ?
                ORDER BY created_at DESC
                LIMIT ?
            """, (user_id, limit)).fetchall()
        return [self._row_to_job(row) for row in rows]

    def get_all_jobs(self, limit: int = 100) -> List["TranscriptionJob"]:
        """Return up to ``limit`` jobs across all users, newest first (admin view)."""
        with self.get_connection() as conn:
            rows = conn.execute("""
                SELECT * FROM transcriptions
                ORDER BY created_at DESC
                LIMIT ?
            """, (limit,)).fetchall()
        return [self._row_to_job(row) for row in rows]

    def get_pending_jobs(self) -> List["TranscriptionJob"]:
        """Return all jobs in status 'pending' or 'processing' (for the background worker)."""
        with self.get_connection() as conn:
            rows = conn.execute(
                "SELECT * FROM transcriptions WHERE status IN ('pending', 'processing')"
            ).fetchall()
        return [self._row_to_job(row) for row in rows]

    def get_user_stats(self, user_id: str) -> Dict:
        """Return ``total_jobs``, ``by_status`` counts and ``recent_jobs`` (last 7 days)."""
        with self.get_connection() as conn:
            stats = {}

            result = conn.execute("""
                SELECT COUNT(*) FROM transcriptions WHERE user_id = ?
            """, (user_id,)).fetchone()
            stats['total_jobs'] = result[0] if result else 0

            result = conn.execute("""
                SELECT status, COUNT(*) FROM transcriptions
                WHERE user_id = ?
                GROUP BY status
            """, (user_id,)).fetchall()
            stats['by_status'] = {row[0]: row[1] for row in result}

            # created_at is an ISO-8601 string, so string comparison orders by time.
            week_ago = (datetime.now() - timedelta(days=7)).isoformat()
            result = conn.execute("""
                SELECT COUNT(*) FROM transcriptions
                WHERE user_id = ? AND created_at >= ?
            """, (user_id, week_ago)).fetchone()
            stats['recent_jobs'] = result[0] if result else 0

        return stats

    def export_user_data(self, user_id: str) -> Dict:
        """Export the user's account data, transcriptions and stats (GDPR access request).

        The password hash is omitted: it is a credential, not data to hand out.

        Returns:
            The export dict, or ``{}`` on error.
        """
        try:
            with self.get_connection() as conn:
                user_row = conn.execute(
                    "SELECT * FROM users WHERE user_id = ?", (user_id,)
                ).fetchone()
                transcription_rows = conn.execute(
                    "SELECT * FROM transcriptions WHERE user_id = ?", (user_id,)
                ).fetchall()

            # get_user_stats() opens its own connection. Calling it while the
            # connection above is still open would block forever on the
            # non-reentrant lock.
            statistics = self.get_user_stats(user_id)

            user_info = dict(user_row) if user_row else {}
            user_info.pop("password_hash", None)

            return {
                "export_date": datetime.now().isoformat(),
                "user_info": user_info,
                "transcriptions": [dict(row) for row in transcription_rows],
                "statistics": statistics
            }

        except Exception as e:
            print(f"❌ Error exporting user data: {str(e)}")
            return {}

    def _row_to_job(self, row) -> "TranscriptionJob":
        """Build a ``TranscriptionJob`` from a ``transcriptions`` row (settings JSON-decoded)."""
        settings = json.loads(row['settings']) if row['settings'] else None
        return TranscriptionJob(
            job_id=row['job_id'],
            user_id=row['user_id'],
            original_filename=row['original_filename'],
            audio_url=row['audio_url'],
            language=row['language'],
            status=row['status'],
            created_at=row['created_at'],
            completed_at=row['completed_at'],
            transcript_text=row['transcript_text'],
            transcript_url=row['transcript_url'],
            error_message=row['error_message'],
            azure_trans_id=row['azure_trans_id'],
            settings=settings
        )
|
|
|
| class TranscriptionManager:
|
| def __init__(self):
|
| self.db = DatabaseManager()
|
| self.executor = ThreadPoolExecutor(max_workers=5)
|
| self.blob_service = BlobServiceClient.from_connection_string(AZURE_BLOB_CONNECTION)
|
| self._job_status_cache = {}
|
|
|
|
|
| self.running = True
|
| self.worker_thread = threading.Thread(target=self._background_worker, daemon=True)
|
| self.worker_thread.start()
|
|
|
| def _log_status_change(self, job_id: str, old_status: str, new_status: str, filename: str = "", user_id: str = ""):
|
| """Only log when status actually changes"""
|
| cache_key = f"{job_id}_{old_status}_{new_status}"
|
| if cache_key not in self._job_status_cache:
|
| self._job_status_cache[cache_key] = True
|
| user_display = f"[{user_id[:8]}...]" if user_id else ""
|
| if filename:
|
| print(f"🔄 {user_display} Job {job_id[:8]}... ({filename}): {old_status} → {new_status}")
|
| else:
|
| print(f"🔄 {user_display} Job {job_id[:8]}...: {old_status} → {new_status}")
|
|
|
| def _background_worker(self):
|
| """Background worker to process pending transcriptions - minimal logging (from v0.2.1)"""
|
| iteration_count = 0
|
| while self.running:
|
| try:
|
| pending_jobs = self.db.get_pending_jobs()
|
|
|
|
|
| if pending_jobs and iteration_count % 6 == 0:
|
| active_jobs = len([j for j in pending_jobs if j.status == 'processing'])
|
| queued_jobs = len([j for j in pending_jobs if j.status == 'pending'])
|
| if active_jobs > 0 or queued_jobs > 0:
|
| print(f"📊 Background worker: {active_jobs} processing, {queued_jobs} queued")
|
|
|
| for job in pending_jobs:
|
| if job.status == 'pending':
|
| self.executor.submit(self._process_transcription_job, job.job_id)
|
| elif job.status == 'processing' and job.azure_trans_id:
|
| self.executor.submit(self._check_transcription_status, job.job_id)
|
|
|
| time.sleep(10)
|
| iteration_count += 1
|
|
|
| except Exception as e:
|
| print(f"❌ Background worker error: {e}")
|
| time.sleep(30)
|
|
|
| def submit_transcription(
|
| self,
|
| file_bytes: bytes,
|
| original_filename: str,
|
| user_id: str,
|
| language: str,
|
| settings: Dict
|
| ) -> str:
|
| """Submit a new transcription job for authenticated user"""
|
| job_id = str(uuid.uuid4())
|
|
|
| print(f"🚀 [{user_id[:8]}...] New transcription: {original_filename} ({len(file_bytes):,} bytes)")
|
|
|
|
|
| job = TranscriptionJob(
|
| job_id=job_id,
|
| user_id=user_id,
|
| original_filename=original_filename,
|
| audio_url="",
|
| language=language,
|
| status="pending",
|
| created_at=datetime.now().isoformat(),
|
| settings=settings
|
| )
|
|
|
|
|
| self.db.save_job(job)
|
|
|
|
|
| self.executor.submit(self._prepare_audio_file, job_id, file_bytes, original_filename, settings)
|
|
|
| return job_id
|
|
|
| def _prepare_audio_file(self, job_id: str, file_bytes: bytes, original_filename: str, settings: Dict):
|
| """Prepare audio file and upload to blob storage - using v0.2.1 logic but with user awareness"""
|
| try:
|
| job = self.db.get_job(job_id)
|
| if not job:
|
| return
|
|
|
| user_id = job.user_id
|
|
|
|
|
| src_ext = original_filename.split('.')[-1].lower() if '.' in original_filename else "bin"
|
| upload_path = os.path.join(UPLOAD_DIR, f"{job_id}_original.{src_ext}")
|
|
|
| with open(upload_path, "wb") as f:
|
| f.write(file_bytes)
|
|
|
|
|
| audio_format = settings.get('audio_format', 'wav')
|
|
|
|
|
| if src_ext == audio_format and audio_format == 'wav':
|
|
|
| try:
|
| probe_cmd = [
|
| 'ffprobe', '-v', 'quiet', '-print_format', 'json',
|
| '-show_streams', upload_path
|
| ]
|
| result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
|
|
|
| if result.returncode == 0:
|
| import json
|
| probe_data = json.loads(result.stdout)
|
| audio_stream = probe_data.get('streams', [{}])[0]
|
|
|
| sample_rate = int(audio_stream.get('sample_rate', 0))
|
| channels = int(audio_stream.get('channels', 0))
|
|
|
|
|
| if sample_rate == 16000 and channels == 1:
|
| out_path = upload_path
|
| else:
|
| print(f"🔄 [{user_id[:8]}...] Converting {original_filename} to 16kHz mono")
|
| out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
|
| self._convert_to_audio(upload_path, out_path, audio_format)
|
| else:
|
| out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
|
| self._convert_to_audio(upload_path, out_path, audio_format)
|
|
|
| except Exception as e:
|
| print(f"⚠️ [{user_id[:8]}...] Audio probing failed for {original_filename}: {e}")
|
| out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
|
| self._convert_to_audio(upload_path, out_path, audio_format)
|
| else:
|
|
|
| print(f"🔄 [{user_id[:8]}...] Converting {original_filename}: {src_ext} → {audio_format}")
|
| out_path = os.path.join(UPLOAD_DIR, f"{job_id}_converted.{audio_format}")
|
|
|
| try:
|
| self._convert_to_audio(upload_path, out_path, audio_format)
|
| except Exception as e:
|
| print(f"❌ [{user_id[:8]}...] Audio conversion failed for {original_filename}: {str(e)}")
|
| job.status = "failed"
|
| job.error_message = f"Audio conversion failed: {str(e)}"
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
|
|
|
|
| try:
|
| os.remove(upload_path)
|
| except:
|
| pass
|
| return
|
|
|
|
|
| try:
|
|
|
| final_audio_name = f"audio/{job_id}.{audio_format}"
|
| audio_url = self._upload_blob(out_path, final_audio_name)
|
|
|
|
|
| if out_path != upload_path:
|
| orig_blob_name = f"originals/{job_id}_{original_filename}"
|
| self._upload_blob(upload_path, orig_blob_name)
|
| else:
|
|
|
| orig_blob_name = f"originals/{job_id}_{original_filename}"
|
| self._upload_blob(upload_path, orig_blob_name)
|
|
|
| print(f"☁️ [{user_id[:8]}...] {original_filename} uploaded to blob storage")
|
|
|
|
|
| job.audio_url = audio_url
|
| job.status = "pending"
|
| self.db.save_job(job)
|
|
|
| except Exception as e:
|
| print(f"❌ [{user_id[:8]}...] Blob upload failed for {original_filename}: {str(e)}")
|
| job.status = "failed"
|
| job.error_message = f"Blob storage upload failed: {str(e)}"
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
|
|
|
|
| try:
|
| if os.path.exists(upload_path):
|
| os.remove(upload_path)
|
| if out_path != upload_path and os.path.exists(out_path):
|
| os.remove(out_path)
|
| except Exception as e:
|
| print(f"⚠️ [{user_id[:8]}...] Warning: Could not clean up local files for {original_filename}: {e}")
|
|
|
| except Exception as e:
|
| print(f"❌ File preparation error for {original_filename}: {e}")
|
| job = self.db.get_job(job_id)
|
| if job:
|
| job.status = "failed"
|
| job.error_message = f"File preparation failed: {str(e)}"
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
|
|
| def _process_transcription_job(self, job_id: str):
|
| """Process a transcription job - using v0.2.1 logic"""
|
| try:
|
| job = self.db.get_job(job_id)
|
| if not job or job.status != 'pending' or not job.audio_url:
|
| return
|
|
|
| old_status = job.status
|
|
|
| job.status = "processing"
|
| self.db.save_job(job)
|
|
|
| self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
|
|
|
|
|
| settings = job.settings or {}
|
| azure_trans_id = self._create_transcription(
|
| job.audio_url,
|
| job.language,
|
| settings.get('diarization_enabled', False),
|
| settings.get('speakers', 2),
|
| settings.get('profanity', 'masked'),
|
| settings.get('punctuation', 'automatic'),
|
| settings.get('timestamps', True),
|
| settings.get('lexical', False),
|
| settings.get('language_id_enabled', False),
|
| settings.get('candidate_locales', None)
|
| )
|
|
|
|
|
| job.azure_trans_id = azure_trans_id
|
| self.db.save_job(job)
|
|
|
| except Exception as e:
|
| print(f"❌ Transcription submission failed for job {job_id[:8]}...: {str(e)}")
|
| job = self.db.get_job(job_id)
|
| if job:
|
| old_status = job.status
|
| job.status = "failed"
|
| job.error_message = f"Transcription submission failed: {str(e)}"
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
| self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
|
|
|
| def _check_transcription_status(self, job_id: str):
|
| """Check status of Azure transcription - using v0.2.1 logic"""
|
| try:
|
| job = self.db.get_job(job_id)
|
| if not job or job.status != 'processing' or not job.azure_trans_id:
|
| return
|
|
|
|
|
| url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions/{job.azure_trans_id}"
|
| headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}
|
|
|
| r = requests.get(url, headers=headers)
|
| data = r.json()
|
|
|
| if data.get("status") == "Succeeded":
|
|
|
| content_url = self._get_transcription_result_url(job.azure_trans_id)
|
| if content_url:
|
| transcript = self._fetch_transcript(content_url)
|
|
|
|
|
| transcript_blob_name = f"transcripts/{job_id}.txt"
|
| transcript_path = os.path.join(UPLOAD_DIR, f"{job_id}_transcript.txt")
|
|
|
| with open(transcript_path, "w", encoding="utf-8") as f:
|
| f.write(transcript)
|
|
|
| transcript_url = self._upload_blob(transcript_path, transcript_blob_name)
|
|
|
|
|
| old_status = job.status
|
| job.status = "completed"
|
| job.transcript_text = transcript
|
| job.transcript_url = transcript_url
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
|
|
| self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
|
| print(f"✅ [{job.user_id[:8]}...] Transcription completed: {job.original_filename}")
|
|
|
|
|
| try:
|
| os.remove(transcript_path)
|
| except:
|
| pass
|
|
|
| elif data.get("status") in ("Failed", "FailedWithPartialResults"):
|
| error_message = ""
|
| if "properties" in data and "error" in data["properties"]:
|
| error_message = data["properties"]["error"].get("message", "")
|
| elif "error" in data:
|
| error_message = data["error"].get("message", "")
|
|
|
| old_status = job.status
|
| job.status = "failed"
|
| job.error_message = f"Azure transcription failed: {error_message}"
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
|
|
| self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
|
| print(f"❌ [{job.user_id[:8]}...] Transcription failed: {job.original_filename} - {error_message}")
|
|
|
| except Exception as e:
|
| print(f"❌ Status check failed for job {job_id[:8]}...: {str(e)}")
|
| job = self.db.get_job(job_id)
|
| if job:
|
| old_status = job.status
|
| job.status = "failed"
|
| job.error_message = f"Status check failed: {str(e)}"
|
| job.completed_at = datetime.now().isoformat()
|
| self.db.save_job(job)
|
| self._log_status_change(job_id, old_status, job.status, job.original_filename, job.user_id)
|
|
|
| def get_job_status(self, job_id: str) -> Optional[TranscriptionJob]:
|
| """Get current job status"""
|
| return self.db.get_job(job_id)
|
|
|
| def get_user_history(self, user_id: str, limit: int = 50) -> List[TranscriptionJob]:
|
| """Get user's transcription history - PDPA compliant"""
|
| return self.db.get_user_jobs(user_id, limit)
|
|
|
| def get_all_history(self, limit: int = 100) -> List[TranscriptionJob]:
|
| """Get all transcription history across all users (admin view)"""
|
| return self.db.get_all_jobs(limit)
|
|
|
| def get_user_stats(self, user_id: str) -> Dict:
|
| """Get user statistics"""
|
| return self.db.get_user_stats(user_id)
|
|
|
| def download_transcript(self, job_id: str, user_id: str) -> Optional[str]:
|
| """Download transcript content - with user verification for PDPA compliance"""
|
| job = self.db.get_job(job_id)
|
| if job and job.user_id == user_id and job.transcript_text:
|
| return job.transcript_text
|
| return None
|
|
|
|
|
| def register_user(self, email: str, username: str, password: str, gdpr_consent: bool = True,
|
| data_retention_agreed: bool = True, marketing_consent: bool = False) -> Tuple[bool, str, Optional[str]]:
|
| """Register new user"""
|
| return self.db.create_user(email, username, password, gdpr_consent, data_retention_agreed, marketing_consent)
|
|
|
| def login_user(self, login: str, password: str) -> Tuple[bool, str, Optional[User]]:
|
| """Login user"""
|
| return self.db.authenticate_user(login, password)
|
|
|
| def get_user(self, user_id: str) -> Optional[User]:
|
| """Get user by ID"""
|
| return self.db.get_user_by_id(user_id)
|
|
|
| def update_user_consent(self, user_id: str, marketing_consent: bool) -> bool:
|
| """Update user marketing consent"""
|
| return self.db.update_user_consent(user_id, marketing_consent)
|
|
|
| def export_user_data(self, user_id: str) -> Dict:
|
| """Export all user data for GDPR compliance"""
|
| return self.db.export_user_data(user_id)
|
|
|
| def delete_user_account(self, user_id: str) -> bool:
|
| """Delete user account and all data"""
|
| return self.db.delete_user_account(user_id)
|
|
|
|
|
| def _convert_to_audio(self, input_path, output_path, audio_format="wav"):
|
| """Convert audio/video file to specified audio format - from v0.2.1"""
|
|
|
| os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
|
|
|
|
| if audio_format in {"wav", "alaw", "mulaw"}:
|
| cmd = [
|
| "ffmpeg", "-y", "-i", input_path,
|
| "-ar", "16000", "-ac", "1",
|
| output_path
|
| ]
|
| else:
|
| cmd = [
|
| "ffmpeg", "-y", "-i", input_path,
|
| output_path
|
| ]
|
|
|
| try:
|
| result = subprocess.run(
|
| cmd,
|
| stdout=subprocess.PIPE,
|
| stderr=subprocess.PIPE,
|
| timeout=300,
|
| text=True
|
| )
|
|
|
| if result.returncode != 0:
|
| error_output = result.stderr
|
| raise Exception(f"FFmpeg conversion failed: {error_output}")
|
|
|
|
|
| if not os.path.exists(output_path):
|
| raise Exception(f"Output file was not created: {output_path}")
|
|
|
| file_size = os.path.getsize(output_path)
|
| if file_size == 0:
|
| raise Exception(f"Output file is empty: {output_path}")
|
|
|
| except subprocess.TimeoutExpired:
|
| raise Exception(f"FFmpeg conversion timed out after 5 minutes")
|
| except Exception as e:
|
| if "FFmpeg conversion failed" in str(e):
|
| raise
|
| else:
|
| raise Exception(f"FFmpeg error: {str(e)}")
|
|
|
def _upload_blob(self, local_file, blob_name):
    """Upload a local file to blob storage and return a SAS-signed URL for it.

    The file is stored as ``blob_name`` in the ``AZURE_CONTAINER`` container,
    replacing any existing blob of that name. The returned URL is the blob's
    URL with ``AZURE_BLOB_SAS_TOKEN`` appended as the query string (leading
    "?" characters on the token are stripped first).
    """
    target = self.blob_service.get_blob_client(container=AZURE_CONTAINER, blob=blob_name)
    with open(local_file, "rb") as stream:
        target.upload_blob(stream, overwrite=True)
    query = AZURE_BLOB_SAS_TOKEN.lstrip("?")
    return f"{target.url}?{query}"
|
|
|
def _create_transcription(self, audio_url, language, diarization_enabled, speakers,
                          profanity, punctuation, timestamps, lexical,
                          language_id_enabled=False, candidate_locales=None,
                          request_timeout=60):
    """Submit a batch speech-to-text job and return its transcription ID.

    POSTs to ``{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions``
    with one content URL and the requested recognition options. Options whose
    value is ``None`` are left out of ``properties``.

    Args:
        audio_url: URL the service reads the audio from.
        language: Locale string sent as ``locale``.
        diarization_enabled: When truthy, request speaker diarization.
        speakers: Maximum speaker count for diarization (converted with
            ``int()``); only used when diarization is enabled.
        profanity: Value for ``profanityFilterMode``.
        punctuation: Value for ``punctuationMode``.
        timestamps: Value for both word-level timestamp flags.
        lexical: Value sent as the ``lexical`` property.
        language_id_enabled: When truthy and ``candidate_locales`` is
            non-empty, request continuous language identification.
        candidate_locales: Locales offered to language identification.
        request_timeout: Seconds to wait (connect and read) on the HTTP call.

    Returns:
        The last path segment, without query string, of the ``Location``
        response header, or of the body's ``self`` URL if that header is
        missing.

    Raises:
        requests.HTTPError: The service answered with an error status.
        requests.Timeout: No response within ``request_timeout`` seconds.
        KeyError: Neither a ``Location`` header nor a ``self`` field exists.
        ValueError: No ``Location`` header and the body is not JSON.
    """
    url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions"
    headers = {
        "Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY,
        "Content-Type": "application/json",
    }

    properties = {
        "profanityFilterMode": profanity,
        "punctuationMode": punctuation,
        "wordLevelTimestampsEnabled": timestamps,
        "displayFormWordLevelTimestampsEnabled": timestamps,
        "lexical": lexical,
    }
    if diarization_enabled:
        properties["diarizationEnabled"] = True
        properties["diarization"] = {
            "speakers": {
                "minCount": 1,
                "maxCount": int(speakers),
            }
        }
    if language_id_enabled and candidate_locales:
        properties["languageIdentification"] = {
            "mode": "continuous",
            "candidateLocales": candidate_locales,
        }
    # Omit unset options rather than sending them as JSON null.
    properties = {k: v for k, v in properties.items() if v is not None}

    body = {
        "displayName": f"Transcription_{uuid.uuid4()}",
        "description": "Batch speech-to-text with advanced options",
        "locale": language,
        "contentUrls": [audio_url],
        "properties": properties,
        "customProperties": {},
    }

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    r = requests.post(url, headers=headers, json=body, timeout=request_timeout)
    r.raise_for_status()

    # The created job's URL is in the Location header; the response body's
    # "self" field carries the same URL and serves as a fallback.
    job_url = r.headers.get("Location") or r.json()["self"]
    return job_url.split("/")[-1].split("?")[0]
|
|
|
def _get_transcription_result_url(self, trans_id, request_timeout=30):
    """Return the content URL of a transcription's result file, if ready.

    Args:
        trans_id: ID of the transcription job (as returned when it was created).
        request_timeout: Seconds to wait on each HTTP request.

    Returns:
        The ``contentUrl`` of the first listed file whose ``kind`` is
        "transcription" (case-insensitive), once the job's ``status`` is
        "Succeeded". Otherwise ``None``: the job is not finished or failed,
        the response has no ``status``, or no transcription file is listed.
    """
    job_url = f"{AZURE_SPEECH_KEY_ENDPOINT}/speechtotext/{API_VERSION}/transcriptions/{trans_id}"
    headers = {"Ocp-Apim-Subscription-Key": AZURE_SPEECH_KEY}

    data = requests.get(job_url, headers=headers, timeout=request_timeout).json()
    if data.get("status") != "Succeeded":
        return None

    files_url = (data.get("links") or {}).get("files")
    visited = set()
    # Per the Speech-to-text REST API, list responses are paged: each page has
    # "values" plus an "@nextLink" URL while more pages remain. The visited
    # set guards against a server returning a self-referencing link.
    while files_url and files_url not in visited:
        visited.add(files_url)
        page = requests.get(files_url, headers=headers, timeout=request_timeout).json()
        for entry in page.get("values", []):
            if (entry.get("kind") or "").lower() == "transcription":
                return entry["links"]["contentUrl"]
        files_url = page.get("@nextLink")
    return None
|
|
|
def _fetch_transcript(self, content_url, request_timeout=60):
    """Download a transcription result file and render it as text - from v0.2.1.

    Rendering, in order of preference:

    1. ``recognizedPhrases``: one paragraph per phrase with non-blank text,
       formatted ``[HH:MM:SS] Speaker N: text``. The ``[HH:MM:SS]`` part is
       omitted when no offset can be read, and ``Speaker N:`` is omitted
       when the phrase has no ``speaker`` key.
    2. ``combinedRecognizedPhrases``: their non-blank ``display`` texts.
    3. Otherwise the whole JSON document, pretty-printed.

    Paragraphs are separated by a blank line.

    Args:
        content_url: URL of the transcription result JSON.
        request_timeout: Seconds to wait on the download before giving up.

    Returns:
        The rendered text, or an "Unable to parse transcription result"
        message including the first 1000 characters of the raw response
        when the body cannot be processed.

    Raises:
        requests.RequestException: The download itself failed (including a
            timeout). Parse problems are reported in the returned string.
    """
    r = requests.get(content_url, timeout=request_timeout)
    try:
        j = r.json()
        out = []

        def get_text(phrase):
            # Prefer the top n-best hypothesis, then the phrase's own display
            # text; missing or null values become "" so .strip() is safe.
            best = phrase.get('nBest')
            if best:
                return best[0].get('display') or phrase.get('display') or ''
            return phrase.get('display') or ''

        def safe_offset(val):
            # Integer tick count, or None for anything int() rejects
            # (None, or non-numeric strings such as ISO-8601 durations).
            try:
                return int(val)
            except (ValueError, TypeError):
                return None

        def phrase_seconds(phrase):
            # Start time in seconds, trying the phrase "offset", then the first
            # word's "offset", then "offsetInTicks"; each is read as 100 ns ticks.
            candidates = [phrase.get('offset')]
            words = phrase.get('words')
            if words:
                candidates.append(words[0].get('offset'))
            candidates.append(phrase.get('offsetInTicks'))
            for raw in candidates:
                ticks = safe_offset(raw)
                if ticks is not None:
                    return ticks / 10_000_000
            return None

        def format_time(seconds):
            """Format seconds into HH:MM:SS format"""
            try:
                total = int(seconds)
            except (TypeError, ValueError, OverflowError):
                return "00:00:00"
            hours, remainder = divmod(total, 3600)
            minutes, secs = divmod(remainder, 60)
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"

        if 'recognizedPhrases' in j:
            for phrase in j['recognizedPhrases']:
                text = get_text(phrase)
                if not text.strip():
                    continue
                label = f"Speaker {phrase.get('speaker', 0)}: " if 'speaker' in phrase else ""
                seconds = phrase_seconds(phrase)
                if seconds is None:
                    out.append(f"{label}{text}")
                else:
                    out.append(f"[{format_time(seconds)}] {label}{text}")
            if out:
                return '\n\n'.join(out)

        # Fallback: the per-channel combined text, without timestamps.
        if 'combinedRecognizedPhrases' in j:
            combined_results = [
                text
                for text in ((c.get('display') or '') for c in j['combinedRecognizedPhrases'])
                if text.strip()
            ]
            if combined_results:
                return '\n\n'.join(combined_results)

        return json.dumps(j, ensure_ascii=False, indent=2)

    except Exception as e:
        # Best effort: surface the problem and a slice of the raw body to the
        # caller instead of raising.
        return f"Unable to parse transcription result: {str(e)}\n\nRaw response: {r.text[:1000]}..."
|
|
|
|
|
# Single TranscriptionManager instance bound at module level, so it is
# constructed when this module is imported.
# NOTE(review): presumably the shared instance used by the rest of the app;
# whatever TranscriptionManager.__init__ does (not visible here) runs on import.
transcription_manager = TranscriptionManager()
|
|
|
|
|
def allowed_file(filename):
    """Tell whether an uploaded file name has a supported extension.

    Names that cannot be checked are accepted: an empty/missing name, the
    placeholder "upload.unknown", and names containing no dot at all.
    Otherwise the text after the last dot, lower-cased, must be one of
    ``AUDIO_FORMATS`` or one of the extra audio/video container extensions
    listed below.
    """
    if not filename or filename == "upload.unknown":
        return True

    _, dot, suffix = filename.rpartition('.')
    if not dot:
        return True

    extra_exts = {
        'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v',
        'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob',
    }
    ext = suffix.lower()
    return ext in extra_exts or ext in AUDIO_FORMATS
|
|
|
def generate_user_session():
    """Return a fresh random session identifier (kept for compatibility).

    The value is the canonical string form of a newly generated
    version-4 UUID.
    """
    session = uuid.uuid4()
    return f"{session}"