Maison >développement back-end >Tutoriel Python >Créer un assistant d'analyse de données financières au niveau de l'entreprise : pratique du système RAG de données multi-sources basée sur LangChain
Alors que la transformation numérique des marchés financiers continue de s’approfondir, des quantités massives de données financières sont générées quotidiennement sur les marchés mondiaux. Des rapports financiers aux actualités du marché, des cotations en temps réel aux rapports de recherche, ces données ont une valeur énorme tout en posant des défis sans précédent aux professionnels de la finance. Comment extraire rapidement et précisément des informations précieuses à partir de données complexes à l’ère de l’explosion de l’information ? Cette question préoccupe l’ensemble du secteur financier.
Au service de nos clients financiers, nous entendons souvent des analystes se plaindre : « Devoir lire autant de rapports de recherche et d'actualités, tout en traitant des données dans différents formats, c'est vraiment écrasant. » En effet, les analystes financiers modernes sont confrontés à de multiples défis :
La première est la fragmentation des données. Les rapports financiers peuvent exister au format PDF, les données de marché dans des feuilles de calcul Excel et les rapports de recherche de diverses institutions sont disponibles dans divers formats. Les analystes doivent basculer entre ces différents formats de données, comme reconstituer un puzzle, ce qui prend du temps et demande beaucoup de main d'œuvre.
Le deuxième est le défi en temps réel. Les marchés financiers évoluent rapidement et des nouvelles importantes peuvent changer la direction du marché en quelques minutes. Les méthodes d'analyse manuelle traditionnelles peuvent difficilement suivre le rythme du marché, et des opportunités sont souvent manquées une fois l'analyse terminée.
Troisièmement, il y a la question du seuil professionnel. Pour exceller dans l’analyse financière, il faut non seulement de solides connaissances financières, mais également des capacités de traitement des données, ainsi qu’une compréhension des politiques et réglementations du secteur. La formation de tels talents composés prend beaucoup de temps, coûte cher et est difficile à mettre à l'échelle.
Sur la base de ces problèmes pratiques, nous avons commencé à réfléchir : pourrions-nous utiliser les dernières technologies d'IA, en particulier les technologies LangChain et RAG, pour créer un assistant intelligent d'analyse de données financières ?
Les objectifs de ce système sont clairs : il doit fonctionner comme un analyste financier expérimenté mais avec l'efficacité et la précision d'une machine. Plus précisément :
Cela devrait abaisser le seuil d'analyse, rendant l'analyse professionnelle compréhensible pour les investisseurs ordinaires. C'est comme avoir un expert à vos côtés, prêt à répondre aux questions et à traduire des termes financiers complexes dans un langage facile à comprendre.
Cela devrait améliorer considérablement l'efficacité de l'analyse, en compressant le traitement des données qui prenait initialement des heures en quelques minutes. Le système peut intégrer automatiquement des données multi-sources et générer des rapports professionnels, permettant aux analystes de se concentrer davantage sur la réflexion stratégique.
En attendant, il doit garantir la qualité des analyses. Grâce à la validation croisée de données multi-sources et de modèles financiers professionnels, il fournit des conclusions d'analyse fiables. Chaque conclusion doit être bien étayée pour garantir la fiabilité de la décision.
Plus important encore, ce système doit contrôler efficacement les coûts. Grâce à des mécanismes intelligents de planification des ressources et de mise en cache, les coûts d'exploitation sont maintenus dans une fourchette raisonnable tout en garantissant les performances.
Lors de la conception de ce système d'analyse de données financières, notre principal défi était : comment construire une architecture à la fois flexible et stable, capable de gérer avec élégance des données hétérogènes multi-sources tout en garantissant l'évolutivité du système ?
Après des validations et des mises en pratique répétées, nous avons finalement adopté une conception d'architecture à trois couches :
La couche d'ingestion de données gère diverses sources de données, comme un traducteur multilingue, capable de comprendre et de transformer les formats de données provenant de différents canaux. Qu'il s'agisse de cotations en temps réel d'échanges ou d'actualités de sites Web financiers, tout peut être standardisé dans le système.
La couche intermédiaire de traitement d'analyse est le cerveau du système, où le moteur RAG basé sur LangChain est déployé. À l’instar des analystes expérimentés, il combine des données historiques et des informations en temps réel pour une analyse et un raisonnement multidimensionnels. Nous avons particulièrement mis l'accent sur la conception modulaire dans cette couche, facilitant l'intégration de nouveaux modèles d'analyse.
La couche de présentation d'interaction supérieure fournit des interfaces API standard et des composants de visualisation riches. Les utilisateurs peuvent obtenir des résultats d'analyse via un dialogue en langage naturel, et le système convertit automatiquement l'analyse de données complexes en graphiques et rapports intuitifs.
Sur la base de cette architecture, nous avons construit plusieurs modules fonctionnels clés :
La conception de laCouche d'acquisition de données se concentre sur la résolution des problèmes d'exhaustivité et de temps réel des données. En prenant comme exemple le traitement des rapports financiers, nous avons développé un moteur d'analyse intelligent capable d'identifier avec précision les états financiers dans différents formats et d'extraire automatiquement les indicateurs clés. Pour les actualités du marché, le système surveille plusieurs sources d'actualités via des robots d'exploration distribués pour garantir que les informations importantes sont capturées en temps réel.
LaLa couche de traitement d'analyse est le cœur du système, où nous avons apporté de nombreuses innovations :
Couche de présentation d'interaction se concentre sur l'expérience utilisateur :
Lors de la création de systèmes d'entreprise, les performances, les coûts et la qualité sont toujours les principales considérations. Sur la base d'une vaste expérience pratique, nous avons développé un ensemble complet de solutions pour ces fonctionnalités clés.
Lors du traitement des données financières, nous rencontrons souvent des rapports de recherche très longs ou de grandes quantités de données de trading historiques. Sans optimisation, il est facile d'atteindre les limites des jetons de LLM et même d'engager d'énormes coûts d'appels d'API. Pour cela, nous avons conçu un mécanisme intelligent de gestion des Tokens :
Pour les documents longs, le système effectue automatiquement une segmentation sémantique. Par exemple, un rapport annuel d’une centaine de pages sera décomposé en plusieurs segments sémantiquement connectés. Ces segments sont classés par importance, les informations de base étant traitées en premier. Parallèlement, nous avons mis en œuvre une gestion dynamique du budget des jetons, ajustant automatiquement les quotas de jetons pour chaque tâche d'analyse en fonction de la complexité et de l'importance des requêtes.
Sur les marchés financiers, chaque seconde compte. Une bonne opportunité d’analyse pourrait rapidement disparaître. Pour minimiser la latence du système :
Nous avons adopté une architecture de traitement de streaming en chaîne complète. Lorsque les utilisateurs lancent des demandes d'analyse, le système démarre immédiatement le traitement et utilise des mécanismes de réponse en continu pour permettre aux utilisateurs de voir la progression de l'analyse en temps réel. Par exemple, lors de l'analyse d'un stock, les informations de base sont renvoyées immédiatement, tandis que les résultats de l'analyse approfondie sont affichés au fur et à mesure de la progression des calculs.
Pendant ce temps, les tâches d'analyse complexes sont conçues pour une exécution asynchrone. Le système effectue une analyse approfondie et fastidieuse en arrière-plan, permettant aux utilisateurs de voir les résultats préliminaires sans attendre la fin de tous les calculs. Cette conception améliore considérablement l'expérience utilisateur tout en garantissant la qualité de l'analyse.
Les systèmes d'entreprise doivent contrôler les coûts d'exploitation dans une fourchette raisonnable tout en garantissant les performances :
Nous avons mis en œuvre des stratégies de mise en cache à plusieurs niveaux. Les données chaudes sont intelligemment mises en cache, telles que les indicateurs financiers couramment utilisés ou les résultats d'analyse fréquemment interrogés. Le système ajuste automatiquement les stratégies de mise en cache en fonction des caractéristiques d'actualité des données, garantissant à la fois la fraîcheur des données et réduisant considérablement les calculs répétés.
Pour la sélection des modèles, nous avons adopté un mécanisme de planification dynamique. Les requêtes simples peuvent nécessiter uniquement des modèles légers, tandis que les tâches d'analyse complexes nécessiteront des modèles plus puissants. Cette stratégie de traitement différenciée garantit la qualité des analyses tout en évitant le gaspillage de ressources.
Dans l'analyse financière, l'exactitude des données et la fiabilité des résultats d'analyse sont cruciales, car même une petite erreur peut entraîner des biais de décision importants. Par conséquent, nous avons construit un mécanisme d'assurance qualité rigoureux :
Dans la phase de validation des données, nous avons adopté plusieurs stratégies de vérification :
En termes de vérification des résultats, nous avons mis en place un système de validation multi-niveaux :
Nous avons notamment également introduit un mécanisme de « notation de confiance ». Le système marque les niveaux de confiance pour chaque résultat d'analyse, aidant ainsi les utilisateurs à mieux évaluer les risques liés aux décisions :
Grâce à ce système complet d'assurance qualité, nous garantissons que chaque conclusion émise par le système a fait l'objet d'une vérification stricte, permettant aux utilisateurs d'appliquer en toute confiance les résultats de l'analyse aux décisions réelles.
Dans l'analyse des données financières, les données des rapports financiers sont l'une des sources de données les plus fondamentales et les plus importantes. Nous avons développé une solution complète de traitement des données des rapports financiers :
Nous avons implémenté une interface d'analyse unifiée pour les rapports financiers dans différents formats :
class FinancialReportParser: def __init__(self): self.pdf_parser = PDFParser() self.excel_parser = ExcelParser() self.html_parser = HTMLParser() def parse(self, file_path): file_type = self._detect_file_type(file_path) if file_type == 'pdf': return self.pdf_parser.extract_tables(file_path) elif file_type == 'excel': return self.excel_parser.parse_sheets(file_path) elif file_type == 'html': return self.html_parser.extract_data(file_path)
En particulier pour les rapports financiers au format PDF, nous avons utilisé une technologie de reconnaissance de tableaux basée sur la vision par ordinateur pour extraire avec précision les données de divers états financiers.
Pour garantir la cohérence des données, nous avons établi un modèle de données financières unifié :
class FinancialDataNormalizer: def normalize(self, raw_data): # 1. Field mapping standardization mapped_data = self._map_to_standard_fields(raw_data) # 2. Value unit unification unified_data = self._unify_units(mapped_data) # 3. Time series alignment aligned_data = self._align_time_series(unified_data) # 4. Data quality check validated_data = self._validate_data(aligned_data) return validated_data
Le système peut calculer et extraire automatiquement des indicateurs financiers clés :
class FinancialMetricsCalculator: def calculate_metrics(self, financial_data): metrics = { 'profitability': { 'roe': self._calculate_roe(financial_data), 'roa': self._calculate_roa(financial_data), 'gross_margin': self._calculate_gross_margin(financial_data) }, 'solvency': { 'debt_ratio': self._calculate_debt_ratio(financial_data), 'current_ratio': self._calculate_current_ratio(financial_data) }, 'growth': { 'revenue_growth': self._calculate_revenue_growth(financial_data), 'profit_growth': self._calculate_profit_growth(financial_data) } } return metrics
Nous avons construit un système de collecte d'actualités distribué :
class NewsAggregator: def __init__(self): self.rss_sources = self._load_rss_sources() self.news_queue = Queue() def start_collection(self): for source in self.rss_sources: Thread( target=self._collect_from_source, args=(source,) ).start() def _collect_from_source(self, source): while True: news_items = self._fetch_news(source) for item in news_items: if self._is_relevant(item): self.news_queue.put(item) time.sleep(source.refresh_interval)
Mise en œuvre d'un système de classification des actualités basé sur l'apprentissage automatique :
class NewsClassifier: def __init__(self): self.model = self._load_classifier_model() self.categories = [ 'earnings', 'merger_acquisition', 'market_analysis', 'policy_regulation' ] def classify(self, news_item): # 1. Feature extraction features = self._extract_features(news_item) # 2. Predict category category = self.model.predict(features) # 3. Calculate confidence confidence = self.model.predict_proba(features).max() return { 'category': category, 'confidence': confidence }
Mise en œuvre d'une file d'attente de mise à jour en temps réel basée sur Redis :
class RealTimeNewsUpdater: def __init__(self): self.redis_client = Redis() self.update_interval = 60 # seconds def process_updates(self): while True: # 1. Get latest news news_items = self.news_queue.get_latest() # 2. Update vector store self._update_vector_store(news_items) # 3. Trigger real-time analysis self._trigger_analysis(news_items) # 4. Notify subscribed clients self._notify_subscribers(news_items)
Mise en place d'un système d'intégration de données de marché performant :
class MarketDataStreamer: def __init__(self): self.websocket = None self.buffer_size = 1000 self.data_buffer = deque(maxlen=self.buffer_size) async def connect(self, market_url): self.websocket = await websockets.connect(market_url) asyncio.create_task(self._process_stream()) async def _process_stream(self): while True: data = await self.websocket.recv() parsed_data = self._parse_market_data(data) self.data_buffer.append(parsed_data) await self._trigger_analysis(parsed_data)
Implémentation d'un framework de traitement de flux basé sur Apache Flink :
class MarketDataProcessor: def __init__(self): self.flink_env = StreamExecutionEnvironment.get_execution_environment() self.window_size = Time.seconds(10) def setup_pipeline(self): # 1. Create data stream market_stream = self.flink_env.add_source( MarketDataSource() ) # 2. Set time window windowed_stream = market_stream.window_all( TumblingEventTimeWindows.of(self.window_size) ) # 3. Aggregate calculations aggregated_stream = windowed_stream.aggregate( MarketAggregator() ) # 4. Output results aggregated_stream.add_sink( MarketDataSink() )
Mise en œuvre d'un système efficace de calcul de métriques en temps réel :
class RealTimeMetricsCalculator: def __init__(self): self.metrics_cache = LRUCache(capacity=1000) self.update_threshold = 0.01 # 1% change threshold def calculate_metrics(self, market_data): # 1. Technical indicator calculation technical_indicators = self._calculate_technical(market_data) # 2. Statistical metrics calculation statistical_metrics = self._calculate_statistical(market_data) # 3. Volatility analysis volatility_metrics = self._calculate_volatility(market_data) # 4. Update cache self._update_cache(market_data.symbol, { 'technical': technical_indicators, 'statistical': statistical_metrics, 'volatility': volatility_metrics }) return self.metrics_cache[market_data.symbol]
Grâce à la mise en œuvre de ces composants de base, nous avons réussi à construire un système d'analyse financière capable de traiter des données hétérogènes multi-sources. Le système peut non seulement analyser avec précision divers types de données financières, mais également traiter la dynamique du marché en temps réel, fournissant ainsi une base de données fiable pour une analyse et une prise de décision ultérieures.
Dans les scénarios financiers, les stratégies traditionnelles de segmentation de longueur fixe ne parviennent souvent pas à maintenir l'intégrité sémantique des documents. Nous avons conçu une stratégie de segmentation intelligente pour différents types de documents financiers :
Nous avons mis en œuvre une stratégie de segmentation sémantique pour les états financiers :
class FinancialReportChunker: def __init__(self): self.section_patterns = { 'balance_sheet': r'资产负债表|Balance Sheet', 'income_statement': r'利润表|Income Statement', 'cash_flow': r'现金流量表|Cash Flow Statement' } def chunk_report(self, report_text): chunks = [] # 1. Identify main sections of the report sections = self._identify_sections(report_text) # 2. Chunk by accounting subjects for section in sections: section_chunks = self._chunk_by_accounts(section) # 3. Add contextual information enriched_chunks = self._enrich_context(section_chunks) chunks.extend(enriched_chunks) return chunks
Pour le contenu d'actualité, nous avons mis en œuvre une stratégie de segmentation dynamique basée sur la sémantique :
class FinancialReportParser: def __init__(self): self.pdf_parser = PDFParser() self.excel_parser = ExcelParser() self.html_parser = HTMLParser() def parse(self, file_path): file_type = self._detect_file_type(file_path) if file_type == 'pdf': return self.pdf_parser.extract_tables(file_path) elif file_type == 'excel': return self.excel_parser.parse_sheets(file_path) elif file_type == 'html': return self.html_parser.extract_data(file_path)
Pour les données de trading haute fréquence, nous avons mis en œuvre une stratégie de segmentation basée sur des fenêtres temporelles :
class FinancialDataNormalizer: def normalize(self, raw_data): # 1. Field mapping standardization mapped_data = self._map_to_standard_fields(raw_data) # 2. Value unit unification unified_data = self._unify_units(mapped_data) # 3. Time series alignment aligned_data = self._align_time_series(unified_data) # 4. Data quality check validated_data = self._validate_data(aligned_data) return validated_data
Pour améliorer la qualité de la représentation sémantique dans les textes financiers, nous avons effectué une adaptation de domaine sur des modèles pré-entraînés :
class FinancialMetricsCalculator: def calculate_metrics(self, financial_data): metrics = { 'profitability': { 'roe': self._calculate_roe(financial_data), 'roa': self._calculate_roa(financial_data), 'gross_margin': self._calculate_gross_margin(financial_data) }, 'solvency': { 'debt_ratio': self._calculate_debt_ratio(financial_data), 'current_ratio': self._calculate_current_ratio(financial_data) }, 'growth': { 'revenue_growth': self._calculate_revenue_growth(financial_data), 'profit_growth': self._calculate_profit_growth(financial_data) } } return metrics
Compte tenu de la nature multilingue des données financières, nous avons mis en œuvre des capacités de récupération multilingues :
class NewsAggregator: def __init__(self): self.rss_sources = self._load_rss_sources() self.news_queue = Queue() def start_collection(self): for source in self.rss_sources: Thread( target=self._collect_from_source, args=(source,) ).start() def _collect_from_source(self, source): while True: news_items = self._fetch_news(source) for item in news_items: if self._is_relevant(item): self.news_queue.put(item) time.sleep(source.refresh_interval)
Pour garantir la rapidité des résultats de récupération, nous avons mis en place un mécanisme de mise à jour incrémentielle de l'index :
class NewsClassifier: def __init__(self): self.model = self._load_classifier_model() self.categories = [ 'earnings', 'merger_acquisition', 'market_analysis', 'policy_regulation' ] def classify(self, news_item): # 1. Feature extraction features = self._extract_features(news_item) # 2. Predict category category = self.model.predict(features) # 3. Calculate confidence confidence = self.model.predict_proba(features).max() return { 'category': category, 'confidence': confidence }
Implémentation d'un calcul de pertinence basé sur la décroissance temporelle :
class RealTimeNewsUpdater: def __init__(self): self.redis_client = Redis() self.update_interval = 60 # seconds def process_updates(self): while True: # 1. Get latest news news_items = self.news_queue.get_latest() # 2. Update vector store self._update_vector_store(news_items) # 3. Trigger real-time analysis self._trigger_analysis(news_items) # 4. Notify subscribed clients self._notify_subscribers(news_items)
Pour améliorer la précision de la récupération, nous avons mis en œuvre la récupération hybride sur plusieurs dimensions :
class MarketDataStreamer: def __init__(self): self.websocket = None self.buffer_size = 1000 self.data_buffer = deque(maxlen=self.buffer_size) async def connect(self, market_url): self.websocket = await websockets.connect(market_url) asyncio.create_task(self._process_stream()) async def _process_stream(self): while True: data = await self.websocket.recv() parsed_data = self._parse_market_data(data) self.data_buffer.append(parsed_data) await self._trigger_analysis(parsed_data)
Mise en œuvre d'un algorithme de classement par pertinence prenant en compte plusieurs facteurs :
class MarketDataProcessor: def __init__(self): self.flink_env = StreamExecutionEnvironment.get_execution_environment() self.window_size = Time.seconds(10) def setup_pipeline(self): # 1. Create data stream market_stream = self.flink_env.add_source( MarketDataSource() ) # 2. Set time window windowed_stream = market_stream.window_all( TumblingEventTimeWindows.of(self.window_size) ) # 3. Aggregate calculations aggregated_stream = windowed_stream.aggregate( MarketAggregator() ) # 4. Output results aggregated_stream.add_sink( MarketDataSink() )
Grâce à ces mesures d'optimisation, nous avons considérablement amélioré les performances du système RAG dans des scénarios financiers. Le système a démontré une excellente précision de récupération et une vitesse de réponse excellente, en particulier lors du traitement de données financières avec des exigences élevées en temps réel et une complexité professionnelle.
Avant de procéder à une analyse des données financières, un prétraitement systématique des données brutes est nécessaire. Nous avons mis en œuvre un pipeline complet de prétraitement des données :
class RealTimeMetricsCalculator: def __init__(self): self.metrics_cache = LRUCache(capacity=1000) self.update_threshold = 0.01 # 1% change threshold def calculate_metrics(self, market_data): # 1. Technical indicator calculation technical_indicators = self._calculate_technical(market_data) # 2. Statistical metrics calculation statistical_metrics = self._calculate_statistical(market_data) # 3. Volatility analysis volatility_metrics = self._calculate_volatility(market_data) # 4. Update cache self._update_cache(market_data.symbol, { 'technical': technical_indicators, 'statistical': statistical_metrics, 'volatility': volatility_metrics }) return self.metrics_cache[market_data.symbol]
class FinancialReportChunker: def __init__(self): self.section_patterns = { 'balance_sheet': r'资产负债表|Balance Sheet', 'income_statement': r'利润表|Income Statement', 'cash_flow': r'现金流量表|Cash Flow Statement' } def chunk_report(self, report_text): chunks = [] # 1. Identify main sections of the report sections = self._identify_sections(report_text) # 2. Chunk by accounting subjects for section in sections: section_chunks = self._chunk_by_accounts(section) # 3. Add contextual information enriched_chunks = self._enrich_context(section_chunks) chunks.extend(enriched_chunks) return chunks
class NewsChunker: def __init__(self): self.nlp = spacy.load('zh_core_web_lg') self.min_chunk_size = 100 self.max_chunk_size = 500 def chunk_news(self, news_text): # 1. Semantic paragraph recognition doc = self.nlp(news_text) semantic_paragraphs = self._get_semantic_paragraphs(doc) # 2. Dynamically adjust chunk size chunks = [] current_chunk = [] current_size = 0 for para in semantic_paragraphs: if self._should_start_new_chunk(current_size, len(para)): if current_chunk: chunks.append(self._create_chunk(current_chunk)) current_chunk = [para] current_size = len(para) else: current_chunk.append(para) current_size += len(para) return chunks
class MarketDataChunker: def __init__(self): self.time_window = timedelta(minutes=5) self.overlap = timedelta(minutes=1) def chunk_market_data(self, market_data): chunks = [] current_time = market_data[0]['timestamp'] end_time = market_data[-1]['timestamp'] while current_time < end_time: window_end = current_time + self.time_window # Extract data within time window window_data = self._extract_window_data( market_data, current_time, window_end ) # Calculate window statistical features window_features = self._calculate_window_features(window_data) chunks.append({ 'time_window': (current_time, window_end), 'data': window_data, 'features': window_features }) current_time += (self.time_window - self.overlap) return chunks
class FinancialEmbeddingOptimizer: def __init__(self): self.base_model = SentenceTransformer('base_model') self.financial_terms = self._load_financial_terms() def optimize_embeddings(self, texts): # 1. Identify financial terminology financial_entities = self._identify_financial_terms(texts) # 2. Enhance weights for financial terms weighted_texts = self._apply_term_weights(texts, financial_entities) # 3. Generate optimized embeddings embeddings = self.base_model.encode( weighted_texts, normalize_embeddings=True ) return embeddings
class MultilingualEmbedder: def __init__(self): self.models = { 'zh': SentenceTransformer('chinese_model'), 'en': SentenceTransformer('english_model') } self.translator = MarianMTTranslator() def generate_embeddings(self, text): # 1. Language detection lang = self._detect_language(text) # 2. Translation if necessary if lang not in self.models: text = self.translator.translate(text, target_lang='en') lang = 'en' # 3. Generate vector representation embedding = self.models[lang].encode(text) return { 'embedding': embedding, 'language': lang }
class RealTimeIndexUpdater: def __init__(self): self.vector_store = MilvusClient() self.update_buffer = [] self.buffer_size = 100 async def update_index(self, new_data): # 1. Add to update buffer self.update_buffer.append(new_data) # 2. Check if batch update is needed if len(self.update_buffer) >= self.buffer_size: await self._perform_batch_update() async def _perform_batch_update(self): try: # Generate vector representations embeddings = self._generate_embeddings(self.update_buffer) # Update vector index self.vector_store.upsert( embeddings, [doc['id'] for doc in self.update_buffer] ) # Clear buffer self.update_buffer = [] except Exception as e: logger.error(f"Index update failed: {e}")
class TemporalRetriever: def __init__(self): self.decay_factor = 0.1 self.max_age_days = 30 def retrieve(self, query, top_k=5): # 1. Basic semantic retrieval base_results = self._semantic_search(query) # 2. Apply time decay scored_results = [] for result in base_results: age_days = self._calculate_age(result['timestamp']) if age_days <= self.max_age_days: time_score = math.exp(-self.decay_factor * age_days) final_score = result['score'] * time_score scored_results.append({ 'content': result['content'], 'score': final_score, 'timestamp': result['timestamp'] }) # 3. Rerank results return sorted(scored_results, key=lambda x: x['score'], reverse=True)[:top_k]
class HybridRetriever: def __init__(self): self.semantic_weight = 0.6 self.keyword_weight = 0.2 self.temporal_weight = 0.2 def retrieve(self, query): # 1. Semantic retrieval semantic_results = self._semantic_search(query) # 2. Keyword retrieval keyword_results = self._keyword_search(query) # 3. Temporal relevance temporal_results = self._temporal_search(query) # 4. Result fusion merged_results = self._merge_results( semantic_results, keyword_results, temporal_results ) return merged_results
Ces implémentations garantissent l'exhaustivité et la fiabilité du pipeline d'analyse, du prétraitement des données à la visualisation finale. Chaque composant est soigneusement conçu et optimisé. Le système peut gérer des tâches d'analyse financière complexes et présenter les résultats de manière intuitive.
Dans les scénarios de recherche en investissement, notre système implémente des applications approfondies grâce à l'architecture de collaboration multimodèle décrite précédemment. Plus précisément :
Au niveau de la base de connaissances, nous standardisons les données non structurées telles que les rapports de recherche, les annonces et les actualités via des workflows de prétraitement des données. Grâce à des solutions de vectorisation, ces textes sont transformés en vecteurs de grande dimension stockés dans des bases de données vectorielles. Pendant ce temps, les méthodes de construction de graphes de connaissances établissent des relations entre les entreprises, les industries et le personnel clé.
Dans les applications pratiques, lorsque les analystes ont besoin de rechercher une entreprise, le système extrait d'abord avec précision les informations pertinentes de la base de connaissances via le mécanisme de récupération RAG. Ensuite, grâce à une collaboration multi-modèle, différents modèles fonctionnels sont chargés de :
Enfin, grâce au mécanisme de synthèse des résultats, les résultats d'analyse de plusieurs modèles sont intégrés dans des rapports de recherche complets.
Dans les scénarios de gestion des risques, nous utilisons pleinement les capacités de traitement en temps réel du système. Sur la base de l'architecture d'ingestion de données, le système reçoit des données de marché en temps réel, des informations sur les sentiments et des événements à risque.
Grâce à un pipeline d'analyse en temps réel, le système peut :
En particulier lors de la gestion d'événements à risque soudains, le mécanisme de traitement en continu garantit une réponse rapide du système. La conception de l'explicabilité aide le personnel de contrôle des risques à comprendre la base de décision du système.
Dans les scénarios de service aux investisseurs, notre système fournit des services précis grâce au mécanisme de gestion adaptative du dialogue conçu précédemment. Plus précisément :
Grâce aux flux de travail de traitement des données, le système maintient une base de connaissances professionnelles couvrant les produits financiers, les stratégies d'investissement et la connaissance du marché.
Lorsque les investisseurs soulèvent des questions, le mécanisme de récupération RAG localise avec précision les points de connaissances pertinents.
Grâce à une collaboration multi-modèle :
Le système personnalise également les réponses en fonction de mécanismes de profilage des utilisateurs, garantissant ainsi que la profondeur professionnelle correspond aux niveaux d'expertise des utilisateurs.
Grâce aux applications des scénarios ci-dessus, le système a obtenu des résultats significatifs dans la pratique :
Amélioration de l'efficacité de la recherche : l'efficacité du travail de recherche quotidien des analystes a augmenté de 40 %, particulièrement remarquable dans le traitement d'informations massives.
Précision du contrôle des risques : grâce à une analyse multidimensionnelle, la précision des alertes de risque a atteint plus de 85 %, soit une amélioration de 30 % par rapport aux méthodes traditionnelles.
Qualité du service : la précision de la première réponse aux demandes des investisseurs a dépassé 90 %, avec des taux de satisfaction atteignant 4,8/5.
Ces résultats valident la praticité et l'efficacité des différents modules techniques conçus dans les sections précédentes. Pendant ce temps, les commentaires recueillis lors de la mise en œuvre nous aident à optimiser en permanence l'architecture du système et les implémentations spécifiques.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!