Pour améliorer les performances :
# Optimisation modèles plus légers
pip install --upgrade torch torchaudio --index-url https://download.pytorch.org/whl/cpu
# Alternative plus rapide pour Whisper
pip install faster-whisper
# Configuration GPU si disponible
echo 'gpu_mem_split=128' | sudo tee -a /boot/config.txtConfiguration optimisée pour Whisper (/home/pi/whisper_optimized.py) :
# Alternative plus rapide avec faster-whisper
from faster_whisper import WhisperModel
# Modèle optimisé pour Pi 5
model = WhisperModel("base", device="cpu", compute_type="int8")
segments, info = model.transcribe(audio_path, language="fr", beam_size=1)Réunions longues (>1h) :
Environnements bruyants :
Interlocuteurs multiples (>4 personnes) :
Configuration sécurisée :
# Chiffrement du SSD
sudo cryptsetup luksFormat /dev/sda1
sudo cryptsetup luksOpen /dev/sda1 secure_storage
# Firewall restrictif
sudo ufw allow from 192.168.1.0/24 to any port 5000
sudo ufw allow from 192.168.1.0/24 to any port 8080
sudo ufw --force enableExport vers des outils de productivité :
# Ajout au script serveur
@app.route('/export/<format>/<timestamp>')
def export_transcription(format, timestamp):
if format == 'notion':
return export_to_notion(timestamp)
elif format == 'trello':
return create_trello_cards(timestamp)
elif format == 'slack':
return send_to_slack(timestamp)Synchronisation cloud sécurisée :
# Chiffrement avant upload
from cryptography.fernet import Fernet
def secure_cloud_sync(transcription_data):
key = Fernet.generate_key()
f = Fernet(key)
encrypted_data = f.encrypt(json.dumps(transcription_data).encode())
upload_to_cloud(encrypted_data)Fine-tuning pour votre domaine :
# Adaptation du modèle de résumé
def train_domain_model():
# Collecte de vos transcriptions historiques
training_data = load_company_transcriptions()
# Fine-tuning du modèle Gemma pour votre vocabulaire
fine_tune_model(training_data, domain="tech_meetings")Détection d'émotions et sentiment :
# Ajout d'analyse sentiment
pip install vaderSentiment transformers
def analyze_sentiment(text, speaker):
sentiment = sentiment_analyzer.analyze(text)
emotions = emotion_detector.predict(text)
return {
'speaker': speaker,
'sentiment': sentiment,
'emotions': emotions,
'engagement_level': calculate_engagement(text)
}Vérification santé système (/home/pi/health_check.py) :
#!/usr/bin/env python3
import psutil, requests, os
from datetime import datetime
def system_health():
return {
'cpu_usage': psutil.cpu_percent(),
'memory_usage': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/media/ssd').percent,
'temperature': get_cpu_temperature(),
'services_status': check_services_status()
}
def send_health_report():
health = system_health()
if health['cpu_usage'] > 80 or health['memory_usage'] > 85:
send_alert(f"Système en surcharge: CPU {health['cpu_usage']}%, RAM {health['memory_usage']}%")Backup automatique (/home/pi/backup_system.py) :
#!/usr/bin/env python3
import shutil, os
from datetime import datetime, timedelta
def daily_backup():
backup_dir = f"/media/backup/backup_{datetime.now().strftime('%Y%m%d')}"
os.makedirs(backup_dir, exist_ok=True)
# Sauvegarde transcriptions
shutil.copytree('/media/ssd/transcriptions', f"{backup_dir}/transcriptions")
# Sauvegarde base interlocuteurs
shutil.copy('/media/ssd/speakers_db.pkl', f"{backup_dir}/speakers_db.pkl")
# Purge anciens backups (>30 jours)
cleanup_old_backups(30)Configuration logging avancée :
import logging
from logging.handlers import RotatingFileHandler
# Logger avec rotation
handler = RotatingFileHandler('/var/log/meeting_assistant.log', maxBytes=10485760, backupCount=5)
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)| Fonctionnalité | Notre solution | Plaud | Otter.ai | Fireflies |
|---|---|---|---|---|
| Prix | 250€ one-shot | 159€ + abonnement | 20€/mois | 18€/mois |
| Confidentialité | 100% local | Cloud | Cloud | Cloud |
| Reconnaissance interlocuteurs | ✅ Personnalisable | ✅ Basic | ✅ | ✅ |
| Résumés IA | ✅ Local | ✅ Cloud | ✅ | ✅ |
| Autonomie | ✅ Batterie 8h | ✅ 30h | ❌ Smartphone | ❌ Smartphone |
| Personnalisation | ✅ Code source | ❌ | ❌ | ❌ |
| Coût 3 ans | 250€ | 800€+ | 720€ | 650€ |
Cette solution représente un véritable assistant de réunion professionnel équivalent aux solutions commerciales mais avec un contrôle total de vos données, une personnalisation infinie et des coûts réduits. La reconnaissance d'interlocuteurs et les résumés automatiques transforment complètement l'expérience utilisateur ! 🚀# Projet Alternative Plaud - Enregistreur Audio Autonome
Architecture :
Écran :
Microphone :
Boutons et contrôles :
Alimentation :
Boîtier :
Stockage :
Refroidissement :
Alimentation :
# Mise à jour du système
sudo apt update && sudo apt upgrade -y
# Installation des dépendances
sudo apt install -y git python3-pip python3-numpy python3-scipy
# Configuration du ReSpeaker 2-Mics HAT
git clone https://github.com/respeaker/seeed-voicecard.git
cd seeed-voicecard
sudo ./install.sh
sudo rebootAprès redémarrage, vérifiez l'installation :
# Vérifier que la carte audio est détectée
arecord -l
# Vous devriez voir : card 1: seeed2micvoicec [seeed-2mic-voicecard]
# Test d'enregistrement
arecord -D plughw:1,0 -f cd -t wav -d 5 test.wav
aplay test.wav# Installation des bibliothèques SPI
sudo apt install -y python3-spidev python3-rpi.gpio python3-pil
# Activation SPI dans raspi-config
sudo raspi-config
# Interface Options -> SPI -> Enable
# Installation de la bibliothèque Waveshare
pip3 install waveshare-epd
# Téléchargement des exemples
git clone https://github.com/waveshare/e-Paper.gitAjoutez au fichier /boot/config.txt :
# Configuration des boutons
gpio=2=ip,pu # Bouton Record/Stop
gpio=3=ip,pu # Bouton Menu
gpio=4=ip,pu # Bouton Confirmation
gpio=17=ip,pu # Encodeur rotatif A
gpio=27=ip,pu # Encodeur rotatif B
gpio=22=ip,pu # Bouton encodeurCréez le fichier /home/pi/recorder.py :
#!/usr/bin/env python3
import pyaudio
import wave
import os
import time
import json
import requests
from datetime import datetime
import RPi.GPIO as GPIO
from threading import Thread, Event
import queue
# Configuration GPIO
BUTTON_RECORD = 2
BUTTON_MENU = 3
BUTTON_CONFIRM = 4
ENCODER_A = 17
ENCODER_B = 27
ENCODER_BTN = 22
# Configuration audio
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
class AudioRecorder:
def __init__(self):
self.recording = False
self.audio = pyaudio.PyAudio()
self.frames = []
self.record_event = Event()
# Configuration GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setup([BUTTON_RECORD, BUTTON_MENU, BUTTON_CONFIRM,
ENCODER_A, ENCODER_B, ENCODER_BTN], GPIO.IN, pull_up_down=GPIO.PUD_UP)
# Callbacks boutons
GPIO.add_event_detect(BUTTON_RECORD, GPIO.FALLING,
callback=self.toggle_recording, bouncetime=300)
def toggle_recording(self, channel):
if not self.recording:
self.start_recording()
else:
self.stop_recording()
def start_recording(self):
self.recording = True
self.frames = []
print("Début enregistrement...")
# Thread d'enregistrement
record_thread = Thread(target=self._record_audio)
record_thread.start()
def _record_audio(self):
stream = self.audio.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
input_device_index=1, # ReSpeaker
frames_per_buffer=CHUNK)
while self.recording:
data = stream.read(CHUNK)
self.frames.append(data)
stream.stop_stream()
stream.close()
def stop_recording(self):
if self.recording:
self.recording = False
print("Fin enregistrement...")
# Sauvegarde du fichier
filename = f"recording_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
filepath = f"/home/pi/recordings/{filename}"
wf = wave.open(filepath, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(self.frames))
wf.close()
# Envoi vers le serveur de traitement
self.send_to_server(filepath)
def send_to_server(self, filepath):
try:
with open(filepath, 'rb') as f:
files = {'audio': f}
response = requests.post('http://192.168.1.100:5000/transcribe',
files=files, timeout=30)
if response.status_code == 200:
print("Transcription envoyée avec succès")
# Supprimer le fichier local après envoi
os.remove(filepath)
else:
print(f"Erreur serveur: {response.status_code}")
except Exception as e:
print(f"Erreur envoi: {e}")
if __name__ == "__main__":
# Créer le dossier d'enregistrements
os.makedirs("/home/pi/recordings", exist_ok=True)
recorder = AudioRecorder()
print("Enregistreur prêt. Appuyez sur le bouton pour démarrer/arrêter.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
GPIO.cleanup()Créez /etc/systemd/system/recorder.service :
[Unit]
Description=Audio Recorder Service
After=network.target
[Service]
ExecStart=/usr/bin/python3 /home/pi/recorder.py
Restart=always
User=pi
Group=pi
[Install]
WantedBy=multi-user.targetActivez le service :
sudo systemctl enable recorder.service
sudo systemctl start recorder.service# Mise à jour du système
sudo apt update && sudo apt upgrade -y
# Si vous utilisez un HAT NVMe, configurez le PCIe
echo 'dtparam=pciex1' | sudo tee -a /boot/config.txt
# Redémarrez et vérifiez la détection du SSD
sudo reboot
lsblk# Installation des dépendances système
sudo apt install -y python3-pip python3-venv git ffmpeg build-essential
# Création d'un environnement virtuel
python3 -m venv /home/pi/whisper-env
source /home/pi/whisper-env/bin/activate
# Installation de Whisper et dépendances de base
pip install openai-whisper flask requests numpy torch torchaudio
# Installation des bibliothèques de diarisation
pip install pyannote.audio speechbrain simple-diarizer
# Installation d'autres outils utiles
pip install pydub soundfile librosa
# Installation d'Ollama pour les LLM locaux (optionnel)
curl -fsSL https://ollama.ai/install.sh | sh
# Installation d'un modèle LLM léger pour résumés
ollama pull gemma2:2b# Création d'un token Hugging Face (requis pour pyannote)
# 1. Créez un compte sur https://huggingface.co/
# 2. Allez sur https://huggingface.co/settings/tokens
# 3. Créez un token d'accès
# 4. Acceptez les conditions sur :
# - https://huggingface.co/pyannote/speaker-diarization-3.1
# - https://huggingface.co/pyannote/segmentation-3.0
# Testez l'installation
python3 -c "from pyannote.audio import Pipeline; print('pyannote installé avec succès')"Créez /home/pi/transcription_server.py :
#!/usr/bin/env python3
from flask import Flask, request, jsonify
import whisper
import os
import tempfile
import logging
from datetime import datetime
import json
import torch
import numpy as np
from pyannote.audio import Pipeline, Inference, Model
from scipy.spatial.distance import cosine
import requests
import pickle
from collections import defaultdict
# Configuration logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Chargement des modèles
print("Chargement du modèle Whisper...")
whisper_model = whisper.load_model("base")
print("Modèle Whisper chargé avec succès")
print("Chargement du modèle de diarisation...")
try:
diarization_pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token="YOUR_HF_TOKEN"
)
if torch.cuda.is_available():
diarization_pipeline.to(torch.device("cuda"))
print("Modèle de diarisation chargé avec succès")
except Exception as e:
logger.error(f"Erreur chargement diarisation: {e}")
diarization_pipeline = None
print("Chargement du modèle d'embedding pour reconnaissance vocale...")
try:
embedding_model = Model.from_pretrained(
"pyannote/embedding",
use_auth_token="YOUR_HF_TOKEN"
)
embedding_inference = Inference(embedding_model, window="whole")
if torch.cuda.is_available():
embedding_inference.to(torch.device("cuda"))
print("Modèle d'embedding chargé avec succès")
except Exception as e:
logger.error(f"Erreur chargement embedding: {e}")
embedding_model = None
embedding_inference = None
# Base de données des voix connues
SPEAKERS_DB_PATH = "/media/ssd/speakers_db.pkl"
known_speakers = {}
def load_speakers_database():
"""Charge la base de données des interlocuteurs connus"""
global known_speakers
if os.path.exists(SPEAKERS_DB_PATH):
with open(SPEAKERS_DB_PATH, 'rb') as f:
known_speakers = pickle.load(f)
logger.info(f"Base de données chargée: {len(known_speakers)} interlocuteurs connus")
else:
known_speakers = {}
logger.info("Nouvelle base de données créée")
def save_speakers_database():
"""Sauvegarde la base de données des interlocuteurs"""
with open(SPEAKERS_DB_PATH, 'wb') as f:
pickle.dump(known_speakers, f)
logger.info("Base de données sauvegardée")
def extract_speaker_embedding(audio_path, start_time=None, end_time=None):
"""Extrait l'embedding vocal d'un segment audio"""
try:
if embedding_inference is None:
return None
if start_time and end_time:
from pyannote.core import Segment
segment = Segment(start_time, end_time)
embedding = embedding_inference.crop(audio_path, segment)
else:
embedding = embedding_inference(audio_path)
return embedding.flatten()
except Exception as e:
logger.error(f"Erreur extraction embedding: {e}")
return None
def identify_speaker(embedding):
"""Identifie un interlocuteur à partir de son embedding"""
if embedding is None or not known_speakers:
return None, 1.0
best_match = None
best_similarity = 0.0
threshold = 0.7 # Seuil de similarité
for speaker_name, speaker_embeddings in known_speakers.items():
for ref_embedding in speaker_embeddings:
similarity = 1 - cosine(embedding, ref_embedding)
if similarity > best_similarity and similarity > threshold:
best_similarity = similarity
best_match = speaker_name
return best_match, best_similarity
def register_speaker(name, audio_path, start_time=None, end_time=None):
"""Enregistre un nouvel interlocuteur ou ajoute un échantillon"""
embedding = extract_speaker_embedding(audio_path, start_time, end_time)
if embedding is None:
return False
if name not in known_speakers:
known_speakers[name] = []
known_speakers[name].append(embedding)
save_speakers_database()
logger.info(f"Interlocuteur '{name}' enregistré/mis à jour")
return True
def perform_speaker_diarization(audio_file_path):
"""Effectue la diarisation des interlocuteurs"""
try:
if diarization_pipeline is None:
return None
diarization = diarization_pipeline(audio_file_path)
speakers_timeline = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
speakers_timeline.append({
"start": round(turn.start, 2),
"end": round(turn.end, 2),
"speaker": speaker,
"duration": round(turn.end - turn.start, 2)
})
return speakers_timeline
except Exception as e:
logger.error(f"Erreur diarisation: {e}")
return None
def enhance_speakers_with_recognition(audio_path, speakers_timeline):
"""Améliore la diarisation avec la reconnaissance d'interlocuteurs connus"""
if not speakers_timeline or embedding_inference is None:
return speakers_timeline
enhanced_timeline = []
for segment in speakers_timeline:
# Extraire l'embedding pour ce segment
embedding = extract_speaker_embedding(
audio_path,
segment['start'],
segment['end']
)
# Identifier l'interlocuteur
identified_name, confidence = identify_speaker(embedding)
enhanced_segment = segment.copy()
if identified_name:
enhanced_segment['identified_speaker'] = identified_name
enhanced_segment['confidence'] = round(confidence, 3)
enhanced_segment['display_name'] = identified_name
else:
enhanced_segment['identified_speaker'] = None
enhanced_segment['confidence'] = 0.0
enhanced_segment['display_name'] = f"Interlocuteur_{segment['speaker']}"
enhanced_timeline.append(enhanced_segment)
return enhanced_timeline
def assign_text_to_speakers(transcription_segments, speakers_timeline):
"""Assigne le texte transcrit aux bons interlocuteurs"""
if not speakers_timeline:
return transcription_segments
enhanced_segments = []
for segment in transcription_segments:
segment_start = segment['start']
segment_end = segment['end']
segment_text = segment['text']
# Trouver l'interlocuteur correspondant
best_speaker = "inconnu"
best_display_name = "Inconnu"
best_overlap = 0
for speaker_info in speakers_timeline:
if (segment_start < speaker_info['end'] and
segment_end > speaker_info['start']):
overlap_start = max(segment_start, speaker_info['start'])
overlap_end = min(segment_end, speaker_info['end'])
overlap_duration = overlap_end - overlap_start
if overlap_duration > best_overlap:
best_overlap = overlap_duration
best_speaker = speaker_info['speaker']
best_display_name = speaker_info.get('display_name', f"Interlocuteur_{best_speaker}")
enhanced_segments.append({
'start': segment_start,
'end': segment_end,
'text': segment_text,
'speaker': best_speaker,
'display_name': best_display_name
})
return enhanced_segments
def generate_llm_summary(transcript_data, speakers_summary):
"""Génère un résumé automatique avec un LLM local"""
try:
# Préparation du prompt pour Ollama
full_text = transcript_data.get('full_text', '')
prompt = f"""Analysez cette transcription de réunion et générez un résumé structuré :
TRANSCRIPTION:
{full_text}
INTERLOCUTEURS:
{json.dumps(speakers_summary, indent=2, ensure_ascii=False)}
Générez un résumé JSON avec cette structure exacte :
{{
"resume_general": "Résumé de 2-3 phrases de la réunion",
"points_cles": ["Point clé 1", "Point clé 2", "Point clé 3"],
"actions_decidees": [
{{"action": "Description", "responsable": "Nom", "echeance": "Date estimée"}}
],
"decisions_prises": ["Décision 1", "Décision 2"],
"sujets_abordes": ["Sujet 1", "Sujet 2"],
"resume_par_personne": {{
"Nom_Personne": {{"contribution": "Résumé de sa contribution", "temps_parole": "X%"}}
}}
}}
Répondez UNIQUEMENT avec le JSON, sans autre texte."""
# Appel à Ollama
response = requests.post(
'http://localhost:11434/api/generate',
json={
'model': 'gemma2:2b',
'prompt': prompt,
'stream': False,
'options': {
'temperature': 0.3,
'num_ctx': 8192
}
},
timeout=60
)
if response.status_code == 200:
result = response.json()
summary_text = result.get('response', '{}')
# Nettoyer et parser le JSON
summary_text = summary_text.strip()
if summary_text.startswith('```json'):
summary_text = summary_text[7:]
if summary_text.endswith('```'):
summary_text = summary_text[:-3]
try:
return json.loads(summary_text)
except json.JSONDecodeError:
logger.error("Erreur parsing JSON du résumé")
return generate_fallback_summary(speakers_summary)
else:
logger.error(f"Erreur Ollama: {response.status_code}")
return generate_fallback_summary(speakers_summary)
except Exception as e:
logger.error(f"Erreur génération résumé LLM: {e}")
return generate_fallback_summary(speakers_summary)
def generate_fallback_summary(speakers_summary):
"""Génère un résumé basique en cas d'échec du LLM"""
return {
"resume_general": "Réunion transcrite automatiquement avec identification des interlocuteurs",
"points_cles": ["Transcription automatique réalisée", "Diarisation des interlocuteurs effectuée"],
"actions_decidees": [],
"decisions_prises": [],
"sujets_abordes": ["Discussion générale"],
"resume_par_personne": {
speaker: {
"contribution": f"A parlé pendant {data['percentage']}% du temps",
"temps_parole": f"{data['percentage']}%"
}
for speaker, data in speakers_summary.items()
}
}
@app.route('/transcribe', methods=['POST'])
def transcribe_audio():
try:
# Vérification du fichier audio
if 'audio' not in request.files:
return jsonify({'error': 'Pas de fichier audio'}), 400
audio_file = request.files['audio']
if audio_file.filename == '':
return jsonify({'error': 'Nom de fichier vide'}), 400
# Sauvegarde temporaire
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
audio_file.save(tmp_file.name)
# ÉTAPE 1: Diarisation des interlocuteurs
logger.info("Début diarisation...")
speakers_timeline = perform_speaker_diarization(tmp_file.name)
# ÉTAPE 2: Reconnaissance d'interlocuteurs connus
if speakers_timeline:
logger.info("Reconnaissance des interlocuteurs...")
speakers_timeline = enhance_speakers_with_recognition(tmp_file.name, speakers_timeline)
logger.info(f"Diarisation terminée: {len(set([s['speaker'] for s in speakers_timeline]))} interlocuteurs détectés")
else:
logger.warning("Diarisation échouée")
# ÉTAPE 3: Transcription avec Whisper
logger.info("Début transcription...")
result = whisper_model.transcribe(tmp_file.name, language='fr')
# ÉTAPE 4: Association texte <-> interlocuteurs
enhanced_segments = assign_text_to_speakers(result['segments'], speakers_timeline)
# ÉTAPE 5: Génération des statistiques
speakers_summary = {}
if speakers_timeline:
for segment in enhanced_segments:
display_name = segment.get('display_name', 'Inconnu')
if display_name not in speakers_summary:
speakers_summary[display_name] = {
'total_time': 0,
'segments_count': 0,
'text': []
}
speakers_summary[display_name]['total_time'] += segment['end'] - segment['start']
speakers_summary[display_name]['segments_count'] += 1
speakers_summary[display_name]['text'].append(segment['text'].strip())
# Calcul des pourcentages
total_duration = max([s['end'] for s in speakers_timeline]) if speakers_timeline else 1
for speaker in speakers_summary:
speakers_summary[speaker]['percentage'] = round(
(speakers_summary[speaker]['total_time'] / total_duration) * 100, 1
)
speakers_summary[speaker]['full_text'] = ' '.join(speakers_summary[speaker]['text'])
# ÉTAPE 6: Génération du résumé automatique
logger.info("Génération du résumé automatique...")
ai_summary = generate_llm_summary(result, speakers_summary)
# Nettoyage du fichier temporaire
os.unlink(tmp_file.name)
# Sauvegarde des résultats
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
transcription_data = {
'timestamp': timestamp,
'full_text': result['text'],
'language': result['language'],
'segments': enhanced_segments,
'speakers_timeline': speakers_timeline,
'speakers_summary': speakers_summary,
'total_speakers': len(speakers_summary) if speakers_summary else 0,
'ai_summary': ai_summary
}
# Sauvegarde sur SSD
os.makedirs('/media/ssd/transcriptions', exist_ok=True)
with open(f'/media/ssd/transcriptions/transcription_{timestamp}.json', 'w') as f:
json.dump(transcription_data, f, indent=2, ensure_ascii=False)
logger.info("Transcription avec diarisation et résumé terminée")
return jsonify({
'success': True,
'text': result['text'],
'timestamp': timestamp,
'speakers_count': len(speakers_summary) if speakers_summary else 0,
'speakers_summary': speakers_summary,
'ai_summary': ai_summary
})
except Exception as e:
logger.error(f"Erreur transcription: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/register_speaker', methods=['POST'])
def register_speaker_endpoint():
"""Enregistre un nouvel interlocuteur"""
try:
data = request.json
name = data.get('name')
audio_path = data.get('audio_path')
start_time = data.get('start_time')
end_time = data.get('end_time')
if not name or not audio_path:
return jsonify({'error': 'Nom et chemin audio requis'}), 400
success = register_speaker(name, audio_path, start_time, end_time)
if success:
return jsonify({'success': True, 'message': f'Interlocuteur {name} enregistré'})
else:
return jsonify({'error': 'Erreur lors de l\'enregistrement'}), 500
except Exception as e:
logger.error(f"Erreur enregistrement interlocuteur: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/speakers', methods=['GET'])
def get_speakers():
"""Liste les interlocuteurs connus"""
return jsonify({
'speakers': list(known_speakers.keys()),
'total': len(known_speakers)
})
@app.route('/transcriptions', methods=['GET'])
def get_transcriptions():
"""Récupère la liste des transcriptions"""
try:
transcriptions = []
transcription_dir = '/media/ssd/transcriptions'
if os.path.exists(transcription_dir):
for filename in sorted(os.listdir(transcription_dir), reverse=True):
if filename.endswith('.json'):
with open(os.path.join(transcription_dir, filename), 'r') as f:
data = json.load(f)
transcriptions.append({
'timestamp': data['timestamp'],
'preview': data['full_text'][:200] + '...' if len(data['full_text']) > 200 else data['full_text'],
'speakers_count': data.get('total_speakers', 0),
'language': data['language'],
'has_summary': 'ai_summary' in data
})
return jsonify(transcriptions)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/transcription/<timestamp>', methods=['GET'])
def get_transcription_detail(timestamp):
"""Récupère le détail d'une transcription"""
try:
filepath = f'/media/ssd/transcriptions/transcription_{timestamp}.json'
if os.path.exists(filepath):
with open(filepath, 'r') as f:
return jsonify(json.load(f))
else:
return jsonify({'error': 'Transcription non trouvée'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Chargement de la base de données des interlocuteurs
load_speakers_database()
# Création du dossier de transcriptions sur SSD
os.makedirs('/media/ssd/transcriptions', exist_ok=True)
app.run(host='0.0.0.0', port=5000, debug=False)
``` data['full_text'],
'speakers_count': data.get('total_speakers', 0),
'language': data['language']
})
return jsonify(transcriptions)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/transcription/<timestamp>', methods=['GET'])
def get_transcription_detail(timestamp):
"""Récupère le détail d'une transcription"""
try:
filepath = f'/media/ssd/transcriptions/transcription_{timestamp}.json'
if os.path.exists(filepath):
with open(filepath, 'r') as f:
return jsonify(json.load(f))
else:
return jsonify({'error': 'Transcription non trouvée'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Création du dossier de transcriptions sur SSD
os.makedirs('/media/ssd/transcriptions', exist_ok=True)
app.run(host='0.0.0.0', port=5000, debug=False)
```
### Étape 4 : Configuration du service serveur
Créez `/etc/systemd/system/transcription-server.service` :
```ini
[Unit]
Description=Transcription Server
After=network.target
[Service]
ExecStart=/home/pi/whisper-env/bin/python /home/pi/transcription_server.py
Restart=always
User=pi
Group=pi
Environment=PATH=/home/pi/whisper-env/bin
WorkingDirectory=/home/pi
[Install]
WantedBy=multi-user.target
```
Activez le service :
```bash
sudo systemctl enable transcription-server.service
sudo systemctl start transcription-server.service
```
## Configuration réseau
### Sur les deux Raspberry Pi
Configurez une IP fixe pour le Pi 5 en éditant `/etc/dhcpcd.conf` :
```bash
# Configuration IP fixe pour le Pi 5
interface wlan0
static ip_address=192.168.1.100/24
static routers=192.168.1.1
static domain_name_servers=192.168.1.1 8.8.8.8
```
## Tests et validation
### Test du système complet
1. **Test du Pi Zero (Enregistreur) :**
```bash
# Vérification du service
sudo systemctl status recorder.service
# Test des boutons
python3 -c "import RPi.GPIO as GPIO; GPIO.setmode(GPIO.BCM); print('GPIO OK')"
```
2. **Test du Pi 5 (Serveur) :**
```bash
# Vérification du service
sudo systemctl status transcription-server.service
# Test API
curl -X GET http://localhost:5000/transcriptions
# Test de la diarisation (avec un fichier audio test)
curl -X POST -F "audio=@test.wav" http://localhost:5000/transcribe
```
3. **Test de communication :**
```bash
# Depuis le Pi Zero, testez la connexion au serveur
curl -X GET http://192.168.1.100:5000/transcriptions
```
### Configuration du token Hugging Face
**IMPORTANT :** Pour utiliser la diarisation, vous devez :
1. Créer un compte sur https://huggingface.co/
2. Aller sur https://huggingface.co/settings/tokens et créer un token
3. Accepter les conditions sur :
- https://huggingface.co/pyannote/speaker-diarization-3.1
- https://huggingface.co/pyannote/segmentation-3.0
4. Remplacer `"YOUR_HF_TOKEN"` dans le script par votre token
### Test de diarisation
```bash
# Test avec un fichier audio contenant plusieurs interlocuteurs
source /home/pi/whisper-env/bin/activate
python3 -c "
from pyannote.audio import Pipeline
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization-3.1', use_auth_token='YOUR_TOKEN')
diarization = pipeline('test_audio.wav')
for turn, _, speaker in diarization.itertracks(yield_label=True):
print(f'Interlocuteur {speaker}: {turn.start:.1f}s à {turn.end:.1f}s')
"
```
## Utilisation avancée avec reconnaissance et résumés
### 1. Enregistrement d'interlocuteurs connus
**Première utilisation - Entraînement du système :**
```bash
# Enregistrer Jean après une transcription
curl -X POST http://192.168.1.100:5000/register_speaker \
-H "Content-Type: application/json" \
-d '{
"name": "Jean",
"audio_path": "/media/ssd/transcriptions/transcription_20250102_143025.wav",
"start_time": 5.2,
"end_time": 15.8
}'
# Voir les interlocuteurs enregistrés
curl http://192.168.1.100:5000/speakers
```
### 2. Flux de traitement complet
1. **Enregistrement** : Bouton sur Pi Zero
2. **Diarisation automatique** : Détection SPEAKER_00, SPEAKER_01...
3. **Reconnaissance vocale** : Jean (95%), Marie (87%)
4. **Transcription** : Attribution du texte par personne
5. **Résumé IA** : Points clés, actions, résumé par personne
### 3. Format de sortie enrichi
```json
{
"timestamp": "20250102_143025",
"total_speakers": 2,
"speakers_summary": {
"Jean": {
"total_time": 125.3,
"percentage": 65.2,
"segments_count": 12,
"full_text": "Bonjour tout le monde. Je pense qu'on devrait..."
},
"Marie": {
"total_time": 67.1,
"percentage": 34.8,
"segments_count": 8,
"full_text": "Salut Jean. Je suis d'accord avec ta proposition..."
}
},
"ai_summary": {
"resume_general": "Réunion de planification projet avec définition des priorités et attribution des tâches",
"points_cles": [
"Budget approuvé pour Q1 2025",
"Nouveau timeline établi",
"Recrutement développeur validé"
],
"actions_decidees": [
{
"action": "Créer specs techniques détaillées",
"responsable": "Jean",
"echeance": "15 janvier 2025"
},
{
"action": "Organiser entretiens développeur",
"responsable": "Marie",
"echeance": "20 janvier 2025"
}
],
"decisions_prises": [
"Budget de 50k€ validé",
"Livraison prévue mars 2025"
],
"resume_par_personne": {
"Jean": {
"contribution": "Présentation du budget et timeline, proposition architecture technique",
"temps_parole": "65.2%"
},
"Marie": {
"contribution": "Validation budget, organisation recrutement, questions techniques",
"temps_parole": "34.8%"
}
}
}
}
```
## Fonctionnalités avancées possibles
- Interface web pour consultation des transcriptions
- Synchronisation cloud (Google Drive, Dropbox)
- Reconnaissance de différents locuteurs
- Export vers différents formats (PDF, Word)
- Analyse sentiment et mots-clés
## Estimation des coûts
- Composants électroniques : ~150€
- Raspberry Pi (si pas déjà possédés) : ~100€
- **Total projet : ~250€**
Soit environ 5 fois moins cher qu'une solution Plaud équivalente.