首页 >后端开发 >Python教程 >在生产应用程序中集成大型语言模型

在生产应用程序中集成大型语言模型

Mary-Kate Olsen
Mary-Kate Olsen原创
2025-01-07 06:24:41890浏览

在本实用指南中,您将学习如何为您的应用程序创建具有内置 LLM 的高度可扩展的模型部署解决方案。
在您的示例中,我们将使用 Hugging Face 的 ChatGPT2 模型,但您可以轻松插入任何其他模型,包括 ChatGPT4、Claude 等。
无论您是设计具有 AI 功能的新应用程序,还是改进现有的 AI 系统,本指南都将帮助您逐步创建强大的 LLM 集成。

了解 LLM 整合基础知识

在开始编写代码之前,让我们弄清楚构建生产 LLM 集成需要什么。在构建生产就绪的 LLM 集成时,API 调用并不是您需要考虑的唯一事情,您还需要考虑可靠性、成本和稳定性等问题。您的生产应用程序必须解决服务中断、速率限制和响应时间变化等问题,同时控制成本。
这是我们将共同构建的内容:

  • 一个强大的 API 客户端,可以优雅地处理失败
  • 用于优化成本和速度的智能缓存系统
  • 适当的提示管理系统
  • 全面的错误处理和监控
  • 完整的内容审核系统作为您的示例项目

先决条件

在我们开始编码之前,请确保您拥有:

  • 您的计算机上安装了 Python 3.8 或更高版本
  • Redis云账号或本地安装
  • 基础Python编程知识
  • REST API 的基本了解
  • Hugging Face API 密钥(或任何其他 LLM 提供商密钥)

想跟随吗?完整的代码可以在您的 GitHub 存储库中找到。

设置您的开发环境

让我们首先准备好您的开发环境。我们将创建一个干净的项目结构并安装所有必要的软件包。

首先,让我们创建项目目录并设置 Python 虚拟环境。打开终端并运行:

mkdir llm_integration && cd llm_integration
python3 -m venv env
syource env/bin/activate

现在让我们设置您的项目依赖项。使用这些基本包创建一个新的requirements.txt 文件:

transformers==4.36.0
huggingface-hub==0.19.4
redis==4.6.0
pydantic==2.5.0
pydantic-settings==2.1.0
tenacity==8.2.3
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
numpy==1.24.3

让我们来分析一下为什么我们需要这些包:

  • Transformers:这是 Hugging Face 强大的库,我们将用它来与 Qwen2.5-Coder 模型进行交互。
  • Huggingface-hub:使我们能够处理模型加载和版本控制 redis:用于实现请求缓存
  • pydantic:用于数据验证和设置。
  • 坚韧:负责重试功能以提高可靠性
  • python-dotenv:用于加载环境变量
  • fastapi:使用少量代码构建您的 API 端点
  • uvicorn:用于高效运行 FastAPI 应用程序
  • torch:用于运行变压器模型和处理机器学习操作
  • numpy:用于数值计算。

使用以下命令安装所有软件包:

mkdir llm_integration && cd llm_integration
python3 -m venv env
syource env/bin/activate

让我们以干净的结构来组织您的项目。在您的项目目录中创建这些目录和文件:

transformers==4.36.0
huggingface-hub==0.19.4
redis==4.6.0
pydantic==2.5.0
pydantic-settings==2.1.0
tenacity==8.2.3
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
numpy==1.24.3

构建 LLM 客户端

让我们从您的LLM客户端开始,这是您申请中最重要的组成部分。这是我们与 ChatGPT 模型(或您喜欢的任何其他 LLM)交互的地方。将以下代码片段添加到您的 core/llm_client.py 文件中:

pip install -r requirements.txt

在 LLMClient 类的第一部分中,我们正在建立基础:

  • 我们正在使用 Transformer 库中的 AutoModelForCausalLM 和 AutoTokenizer 来加载您的模型
  • device_map="auto" 参数自动处理 GPU/CPU 分配
  • 我们使用 torch.float16 来优化内存使用,同时保持良好的性能

现在让我们添加与您的模型对话的方法:

llm_integration/
├── core/
│   ├── llm_client.py      # your main LLM interaction code
│   ├── prompt_manager.py  # Handles prompt templates
│   └── response_handler.py # Processes LLM responses
├── cache/
│   └── redis_manager.py   # Manages your caching system
├── config/
│   └── settings.py        # Configuration management
├── api/
│   └── routes.py          # API endpoints
├── utils/
│   ├── monitoring.py      # Usage tracking
│   └── rate_limiter.py    # Rate limiting logic
├── requirements.txt
└── main.py
└── usage_logs.json       

让我们分解一下这个完成方法中发生了什么:

  • 添加了@retry装饰器方法来处理临时失败。
  • 使用 torch.no_grad() 上下文管理器通过禁用梯度计算来节省内存。
  • 跟踪输入和输出中的令牌使用情况,这对于成本计算非常重要。
  • 返回包含响应和使用统计信息的结构化字典。

创建您的 LLM 响应处理程序

接下来,我们需要添加响应处理程序来解析和构建 LLM 的原始输出。使用以下代码片段在 core/response_handler.py 文件中执行此操作:

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Dict, Optional
import logging

class LLMClient:
    def __init__(self, model_name: str = "gpt2", timeout: int = 30):
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=torch.float16
            )
        except Exception as e:
            logging.error(f"Error loading model: {str(e)}")
            # Fallback to a simpler model if the specified one fails
            self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
            self.model = AutoModelForCausalLM.from_pretrained("gpt2")

        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

添加强大的缓存系统

现在让我们创建您的缓存系统来提高应用程序性能并降低成本。将以下代码片段添加到您的cache/redis_manager.py 文件中:

 @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def complete(self, 
                      prompt: str, 
                      temperature: float = 0.7,
                      max_tokens: Optional[int] = None) -> Dict:
        """Get completion from the model with automatic retries"""
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(
                self.model.device
            )

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_tokens or 100,
                    temperature=temperature,
                    do_sample=True
                )

            response_text = self.tokenizer.decode(
                outputs[0], 
                skip_special_tokens=True
            )

            # Calculate token usage for monitoring
            input_tokens = len(inputs.input_ids[0])
            output_tokens = len(outputs[0]) - input_tokens

            return {
                'content': response_text,
                'usage': {
                    'prompt_tokens': input_tokens,
                    'completion_tokens': output_tokens,
                    'total_tokens': input_tokens + output_tokens
                },
                'model': "gpt2"
            }

        except Exception as e:
            self.logger.error(f"Error in LLM completion: {str(e)}")
            raise

在上面的代码片段中,我们创建了一个 CacheManager 类,它通过以下方式处理所有缓存操作:

  • _generate_key 方法,根据提示和参数创建唯一的缓存键
  • get_cached_response 检查我们是否有给定提示的缓存响应
  • cache_response 存储成功的响应以供将来使用

创建智能提示管理器

让我们创建您的提示管理器来管理您的 LLM 模型的提示。将以下代码添加到您的 core/prompt_manager.py 中:

mkdir llm_integration && cd llm_integration
python3 -m venv env
syource env/bin/activate

然后使用代码片段在您的提示/content_moderation.json 文件中创建用于内容审核的示例提示模板:

transformers==4.36.0
huggingface-hub==0.19.4
redis==4.6.0
pydantic==2.5.0
pydantic-settings==2.1.0
tenacity==8.2.3
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
numpy==1.24.3

现在,您的提示管理器将能够从 JSON 文件加载提示模板,并获得格式化的提示模板。

设置配置管理器

为了将所有 LLM 配置保存在一个位置并轻松地在您的应用程序中重复使用它们,让我们创建配置设置。将以下代码添加到您的 config/settings.py 文件中:

pip install -r requirements.txt

实施速率限制

接下来,让我们实施速率限制来控制用户访问应用程序资源的方式。为此,请将以下代码添加到您的 utils/rate_limiter.py 文件中:

llm_integration/
├── core/
│   ├── llm_client.py      # your main LLM interaction code
│   ├── prompt_manager.py  # Handles prompt templates
│   └── response_handler.py # Processes LLM responses
├── cache/
│   └── redis_manager.py   # Manages your caching system
├── config/
│   └── settings.py        # Configuration management
├── api/
│   └── routes.py          # API endpoints
├── utils/
│   ├── monitoring.py      # Usage tracking
│   └── rate_limiter.py    # Rate limiting logic
├── requirements.txt
└── main.py
└── usage_logs.json       

在 RateLimiter 中,我们实现了一个可重复使用的 check_rate_limit 方法,该方法可在任何路由中使用,通过简单地传递每个用户在一段时间内允许的周期和请求数量来处理速率限制。

创建您的 API 端点

现在让我们在 api/routes.py 文件中创建 API 端点,以将 LLM 集成到您的应用程序中:

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Dict, Optional
import logging

class LLMClient:
    def __init__(self, model_name: str = "gpt2", timeout: int = 30):
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=torch.float16
            )
        except Exception as e:
            logging.error(f"Error loading model: {str(e)}")
            # Fallback to a simpler model if the specified one fails
            self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
            self.model = AutoModelForCausalLM.from_pretrained("gpt2")

        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

这里我们在 APIRouter 类中定义了一个 /moderate 端点,它负责组织 API 路由。 @lru_cache 装饰器应用于依赖项注入函数(get_llm_client、get_response_handler、get_cache_manager 和 get_prompt_manager),以确保 LLMClient、CacheManager 和 PromptManager 的实例被缓存以获得更好的性能。用 @router.post 修饰的moderate_content函数定义了一个用于内容审核的POST路由,并利用FastAPI的Depends机制来注入这些依赖项。在函数内部,RateLimiter 类使用设置中的速率限制设置进行配置,强制执行请求限制。

最后,让我们更新您的 main.py 以将所有内容整合在一起:

 @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def complete(self, 
                      prompt: str, 
                      temperature: float = 0.7,
                      max_tokens: Optional[int] = None) -> Dict:
        """Get completion from the model with automatic retries"""
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(
                self.model.device
            )

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_tokens or 100,
                    temperature=temperature,
                    do_sample=True
                )

            response_text = self.tokenizer.decode(
                outputs[0], 
                skip_special_tokens=True
            )

            # Calculate token usage for monitoring
            input_tokens = len(inputs.input_ids[0])
            output_tokens = len(outputs[0]) - input_tokens

            return {
                'content': response_text,
                'usage': {
                    'prompt_tokens': input_tokens,
                    'completion_tokens': output_tokens,
                    'total_tokens': input_tokens + output_tokens
                },
                'model': "gpt2"
            }

        except Exception as e:
            self.logger.error(f"Error in LLM completion: {str(e)}")
            raise

在上面的代码中,我们使用 /api/v1 前缀下的 api.routes 创建了一个 FastAPI 应用程序和路由器。启用日志记录以显示带有时间戳的信息消息。该应用程序将使用 Uvicorn 运行 localhost:8000,并启用热重载。

运行您的应用程序

现在所有组件都已就位,让我们开始启动并运行您的应用程序。首先,在项目根目录中创建一个 .env 文件并添加您的 HUGGINGFACE_API_KEY 和 REDIS_URL:

mkdir llm_integration && cd llm_integration
python3 -m venv env
syource env/bin/activate

然后确保 Redis 正在您的计算机上运行。在大多数基于 Unix 的系统上,您可以使用以下命令启动它:

transformers==4.36.0
huggingface-hub==0.19.4
redis==4.6.0
pydantic==2.5.0
pydantic-settings==2.1.0
tenacity==8.2.3
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
numpy==1.24.3

现在您可以开始申请:

pip install -r requirements.txt

您的 FastAPI 服务器将开始在 http://localhost:8000 上运行。自动 API 文档将在 http://localhost:8000/docs 上提供 - 这对于测试您的端点非常有帮助!

Integrating Large Language Models in Production Applications

测试您的内容审核 API

让我们用真实的请求来测试您新创建的 API。打开一个新终端并运行以下curl命令:

llm_integration/
├── core/
│   ├── llm_client.py      # your main LLM interaction code
│   ├── prompt_manager.py  # Handles prompt templates
│   └── response_handler.py # Processes LLM responses
├── cache/
│   └── redis_manager.py   # Manages your caching system
├── config/
│   └── settings.py        # Configuration management
├── api/
│   └── routes.py          # API endpoints
├── utils/
│   ├── monitoring.py      # Usage tracking
│   └── rate_limiter.py    # Rate limiting logic
├── requirements.txt
└── main.py
└── usage_logs.json       

您应该在终端上看到如下响应:

Integrating Large Language Models in Production Applications

添加监控和分析

现在让我们添加一些监控功能来跟踪应用程序的执行情况以及正在使用的资源量。将以下代码添加到您的 utils/monitoring.py 文件中:

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Dict, Optional
import logging

class LLMClient:
    def __init__(self, model_name: str = "gpt2", timeout: int = 30):
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=torch.float16
            )
        except Exception as e:
            logging.error(f"Error loading model: {str(e)}")
            # Fallback to a simpler model if the specified one fails
            self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
            self.model = AutoModelForCausalLM.from_pretrained("gpt2")

        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

UsageMonitor 类将执行以下操作:

  • 使用时间戳跟踪每个 API 请求
  • 记录代币使用情况以进行成本监控
  • 测量响应时间
  • 将所有内容存储在结构化日志文件中(在将应用程序部署到生产环境之前将其替换为数据库)

接下来,添加一个新的方法来计算使用情况统计:

 @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def complete(self, 
                      prompt: str, 
                      temperature: float = 0.7,
                      max_tokens: Optional[int] = None) -> Dict:
        """Get completion from the model with automatic retries"""
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(
                self.model.device
            )

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_tokens or 100,
                    temperature=temperature,
                    do_sample=True
                )

            response_text = self.tokenizer.decode(
                outputs[0], 
                skip_special_tokens=True
            )

            # Calculate token usage for monitoring
            input_tokens = len(inputs.input_ids[0])
            output_tokens = len(outputs[0]) - input_tokens

            return {
                'content': response_text,
                'usage': {
                    'prompt_tokens': input_tokens,
                    'completion_tokens': output_tokens,
                    'total_tokens': input_tokens + output_tokens
                },
                'model': "gpt2"
            }

        except Exception as e:
            self.logger.error(f"Error in LLM completion: {str(e)}")
            raise

更新您的API以添加UsageMonitor类中的监控功能:

from typing import Dict
import logging

class ResponseHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def parse_moderation_response(self, raw_response: str) -> Dict:
        """Parse and structure the raw LLM response for moderation"""
        try:
            # Default response structure
            structured_response = {
                "is_appropriate": True,
                "confidence_score": 0.0,
                "reason": None
            }

            # Simple keyword-based analysis
            lower_response = raw_response.lower()

            # Check for inappropriate content signals
            if any(word in lower_response for word in ['inappropriate', 'unsafe', 'offensive', 'harmful']):
                structured_response["is_appropriate"] = False
                structured_response["confidence_score"] = 0.9
                # Extract reason if present
                if "because" in lower_response:
                    reason_start = lower_response.find("because")
                    structured_response["reason"] = raw_response[reason_start:].split('.')[0].strip()
            else:
                structured_response["confidence_score"] = 0.95

            return structured_response

        except Exception as e:
            self.logger.error(f"Error parsing response: {str(e)}")
            return {
                "is_appropriate": True,
                "confidence_score": 0.5,
                "reason": "Failed to parse response"
            }

    def format_response(self, raw_response: Dict) -> Dict:
        """Format the final response with parsed content and usage stats"""
        try:
            return {
                "content": self.parse_moderation_response(raw_response["content"]),
                "usage": raw_response["usage"],
                "model": raw_response["model"]
            }
        except Exception as e:
            self.logger.error(f"Error formatting response: {str(e)}")
            raise

现在,通过运行以下curl 命令来测试您的 /stats 端点:

import redis
from typing import Optional, Any
import json
import hashlib

class CacheManager:
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def _generate_key(self, prompt: str, params: dict) -> str:
        """Generate a unique cache key"""
        cache_data = {
            'prompt': prompt,
            'params': params
        }
        serialized = json.dumps(cache_data, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()

    async def get_cached_response(self, 
                                prompt: str, 
                                params: dict) -> Optional[dict]:
        """Retrieve cached LLM response"""
        key = self._generate_key(prompt, params)
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None

    async def cache_response(self, 
                           prompt: str, 
                           params: dict, 
                           response: dict) -> None:
        """Cache LLM response"""
        key = self._generate_key(prompt, params)
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(response)
        )

上面的命令将向您显示 /moderate 端点上的请求的统计信息,如下面的屏幕截图所示:

Integrating Large Language Models in Production Applications

结论

通过本教程,我们学习了如何在生产应用程序中使用大型语言模型。您实现了 API 客户端、缓存、提示管理和错误处理等功能。作为这些概念的示例,您开发了一个内容审核系统。

现在您已经有了坚实的基础,您可以通过以下方式增强您的系统:

  • 实时应用程序的流式响应
  • A/B 测试以立即改进
  • 基于网络的界面来管理提示
  • 自定义模型微调
  • 与第三方监控服务集成

请记住,在示例中您使用了 ChatGPT2 模型,但您可以调整该系统以与任何 LLM 提供商合作。因此,请选择满足您的要求且在预算之内的型号。

如果您有疑问或想告诉我您正在使用此系统构建什么,请随时与我联系。

编码愉快! ?

以上是在生产应用程序中集成大型语言模型的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn