Appearance
我们将借助一个实际部署于生产环境的服务来讲解Ray的使用,服务描述:基于BERT模型的语义识别服务,服务于超大规模的用户查询请求,日均调用量在亿级别以上。
这是一个非常经典且适合使用 Ray 来解决的场景。将基于BERT的语义识别服务部署到亿级调用量的生产环境中,挑战主要在于:
- 模型推理成本高:BERT模型大,单次推理耗时较长,对计算资源(尤其是GPU)需求高。
- 高并发和吞吐量:亿级日均调用量意味着峰值QPS(每秒查询数)可能达到数千甚至上万。
- 弹性伸缩:用户查询量通常有波峰波谷,需要服务能自动扩缩容以节省成本。
- 低延迟:用户体验要求毫秒级的响应。
Ray,特别是其内置的 Ray Serve 库,就是为了解决这些问题而设计的。它能让你用纯Python代码,轻松地将机器学习模型转化为一个可水平扩展、高可用、低延迟的在线服务。
核心思路:利用 Ray Serve 的三大法宝
- Actor模型 & 水平扩展 (Horizontal Scaling):Ray Serve 会将你的模型服务封装成一个或多个副本 (Replica)。每个副本都是一个独立的Ray Actor(一个有状态的Python进程),可以独立处理请求。你可以轻松增加副本数量,将它们分布在集群的多台机器、多个GPU上,从而实现水平扩展,提高总吞吐量。
- 动态请求批处理 (Dynamic Request Batching):这是针对BERT这类模型进行GPU优化的关键。单个请求无法充分利用GPU的并行计算能力。Ray Serve 的
@serve.batch
装饰器能自动将短时间内到达的多个独立请求打包成一个批次(Batch),然后一次性送入GPU进行推理,极大提升GPU利用率和吞吐量。 - 自动扩缩容 (Autoscaling):根据实时的请求负载(如排队请求数),Ray Serve可以自动增加或减少副本数量,实现资源的高效利用。高峰期自动扩容,低谷期自动缩容,为你节省大量成本。
Step-by-Step 部署指南
第一步:环境准备
首先,你需要安装必要的库。
bash
# 安装 Ray 及其服务部署库 Ray Serve
pip install "ray[serve]"
# 安装 PyTorch 和 Transformers 库
pip install torch transformers
# 如果你有NVIDIA GPU,请确保安装了CUDA版本的PyTorch
# 访问 https://pytorch.org/get-started/locally/ 获取正确的安装命令
第二步:编写模型服务代码
我们将创建一个Python类来封装模型的加载和推理逻辑。这是整个服务的核心。
python
# deploy_bert.py
import ray
from ray import serve
from starlette.requests import Request
from typing import List
from transformers import pipeline
# 1. 使用 @serve.deployment 装饰器定义一个服务
# - num_replicas: 初始副本数,可以根据负载调整
# - ray_actor_options: 为每个副本(Actor)分配资源,这里是关键!我们为每个副本分配一个GPU。
@serve.deployment(
num_replicas=2, # 启动2个副本
ray_actor_options={"num_gpus": 1} # 每个副本使用1个GPU
)
class SemanticBERT:
def __init__(self):
# 构造函数 __init__ 在每个副本启动时仅运行一次
# 这是加载大模型的最佳位置,避免了每次请求都加载
print("Initializing model...")
self.model = pipeline(
"text-classification",
model="bert-base-uncased", # 你可以换成你自己的模型
device=0 # Ray Serve 会自动将该副本分配到一个有GPU的节点上,device=0即可
)
print("Model initialized.")
# 2. 定义推理逻辑,并使用 @serve.batch 进行批处理优化
# - max_batch_size: 每个批次的最大请求数
# - batch_wait_timeout_s: 等待收集请求以形成批次的最长时间(秒)
# 这两个参数需要根据你的模型和负载进行调优
@serve.batch(max_batch_size=32, batch_wait_timeout_s=0.2)
async def recognize_batch(self, queries: List[str]) -> List[dict]:
# 这个方法现在接收的是一个字符串列表 (List[str])
print(f"Processing batch of size: {len(queries)}")
# 使用pipeline进行批量推理,效率远高于循环单次推理
results = self.model(queries)
return results
# 3. 定义一个对外的API接口
# Starlette的Request对象可以让你获取请求的各种信息
async def __call__(self, request: Request) -> List[dict]:
# 可以是单个查询,也可以是JSON列表
if await request.body():
queries = await request.json()
else:
return [{"error": "No query provided in request body."}]
if isinstance(queries, str):
queries = [queries]
# 这里调用的是我们上面定义的批处理方法
# Ray Serve 会自动处理请求的打包和分发
results = await self.recognize_batch(queries)
return results
# 4. 将你的部署绑定到一个应用,这样Ray Serve才能运行它
app = SemanticBERT.bind()
代码要点解释:
@serve.deployment
: 这是定义一个Ray Serve部署单元的核心。num_replicas
: 静态地指定启动多少个模型副本。对于亿级流量,你会需要一个更大的数字,或者使用下面的自动扩缩容。ray_actor_options={"num_gpus": 1}
: 这是告诉Ray,每个SemanticBERT
副本都需要1个GPU。Ray集群的调度器会自动在有可用GPU的节点上启动这些副本。这是实现GPU加速的关键。
__init__(self)
: 模型加载等一次性、重量级的操作都放在这里。它只在副本创建时执行一次。@serve.batch
: Ray Serve的性能优化神器。它会自动拦截进入recognize_batch
的单个请求,将它们凑成一个最大max_batch_size
的批次,或者在等待batch_wait_timeout_s
秒后,将当前收集到的所有请求作为一个批次进行处理。你只需要编写处理批次数据的逻辑即可。__call__(self, ...)
: 这是服务处理HTTP请求的入口点。它接收一个请求,解析出查询,然后调用内部的批处理方法。Ray Serve会自动将__call__
的调用路由到recognize_batch
。
第三步:启动和测试服务
启动Ray集群: 在本地测试时,Ray会自动启动一个临时集群。在生产中,你需要在一个多节点集群上启动Ray(见后文)。
运行服务: 在终端中,使用
serve run
命令来启动你的服务。bashserve run deploy_bert:app
服务默认会启动在
http://127.0.0.1:8000
。测试服务:
bashcurl -X POST -H "Content-Type: application/json" -d '["This is a great movie!", "I did not like the book."]' http://127.0.0.1:8000/
迈向生产:处理亿级流量的架构
对于日均亿级的调用量,你需要一个真正的分布式集群和更高级的配置。
1. 生产级集群管理:KubeRay
在生产环境中,强烈建议使用 Kubernetes 来管理你的计算资源,并使用 KubeRay 来在Kubernetes上部署和管理Ray集群。
- KubeRay是什么:一个让你在K8s上轻松部署和管理Ray集群的工具(Operator)。
- 为什么用它:
- 生命周期管理:自动处理Ray集群的创建、删除和升级。
- 容错性:当某个运行Ray的节点(Pod)失败时,KubeRay会自动在其他地方重启它。
- 生态整合:与Kubernetes的日志(Logging)、监控(Monitoring, e.g., Prometheus)、服务发现等生态无缝集成。
2. 配置自动扩缩容 (Autoscaling)
对于有流量波峰波谷的业务,静态设置num_replicas
是不经济的。你需要自动扩缩容。
修改@serve.deployment
的配置:
python
# deploy_bert_autoscaling.py
# ... (imports and other code are the same)
@serve.deployment(
autoscaling_config={
"min_replicas": 8, # 至少保留8个副本,应对突发流量
"max_replicas": 100, # 最多扩展到100个副本(根据你的集群规模设定)
"target_num_ongoing_requests_per_replica": 50, # 每个副本处理50个并发请求时就考虑扩容
},
ray_actor_options={"num_gpus": 1}
)
class SemanticBERT:
# ... (the rest of the class is the same)
# ...
app = SemanticBERT.bind()
autoscaling_config
解释:
min_replicas
: 流量低谷时最少保留的副本数。max_replicas
: 流量高峰时最多可以扩展到的副本数。这个值受限于你的集群总资源(总GPU数)。target_num_ongoing_requests_per_replica
: 这是扩缩容决策的核心指标。当平均每个副本正在处理的请求数超过这个阈值时,Ray Serve就会开始增加新的副本。当低于这个值时,就会考虑缩减副本。
3. 模型性能优化
为了进一步提升性能和降低成本,可以考虑:
- 模型量化/剪枝:使用技术减小模型大小和计算量。
- 使用更快的推理引擎:将你的PyTorch模型转换为ONNX格式,并使用ONNX Runtime或TensorRT进行推理。这些引擎通常比原生的PyTorch有更高的性能。Ray Serve可以轻松集成这些引擎。
- 使用优化过的模型:如
DistilBERT
,它比BERT-base
更小更快,同时在许多任务上保持了相当的精度。
最终架构图
你的生产架构看起来会是这样:
User Request
|
v
[ Kubernetes Ingress / Load Balancer ] <-- (Nginx, Traefik, etc.)
|
v
[ Ray Serve HTTP Proxy Actor ] <-- (Ray Serve automatically creates and manages)
| (Request Routing)
|-------------------------------------------------|
| | |
v v v
[ Replica 1 ] [ Replica 2 ] ... [ Replica N ] (Running on different nodes/GPUs)
(SemanticBERT Actor) (SemanticBERT Actor) (SemanticBERT Actor)
- GPU 1 - GPU 2 - GPU N
- Model loaded - Model loaded - Model loaded
- Processes batches - Processes batches - Processes batches
^ ^ ^
| (Autoscaling) | |
|-------------------------------------------------|
|
[ Ray Autoscaler ] <-- (Monitor queue length, decide to add or remove Replica)
|
v
[ Ray Cluster on Kubernetes (managed by KubeRay) ]
总结
使用Ray/Ray Serve部署你的大规模BERT服务,最佳实践流程如下:
- 代码层面:
- 将模型加载逻辑放在
__init__
中。 - 使用
@serve.batch
开启动态批处理,并调优max_batch_size
和batch_wait_timeout_s
。 - 在
@serve.deployment
中为每个副本通过ray_actor_options
申请GPU资源。
- 将模型加载逻辑放在
- 部署层面:
- 在生产环境中,使用KubeRay在Kubernetes上部署和管理Ray集群,以获得高可用性和可维护性。
- 使用
autoscaling_config
来配置弹性伸缩,以应对流量波动并节约成本。
- 优化层面:
- 考虑使用TensorRT/ONNX等推理引擎加速模型。
- 选择或训练更轻量的模型,如DistilBERT。
通过这套方案,你可以构建一个既能满足亿级调用量需求,又具备高可用和成本效益的顶级AI服务。