一个消息队列连接泄漏问题浅析
- Authors
- @SLIPPERTOPIA
在一个小项目中,笔者用到了 RabbitMQ 进行任务的异步处理。 RabbitMQ 服务搭建在一个 VPS 服务上,客户端使用了 Python 的 aio_pika
库进行消息的发送和接收。
但用着用着,发现 rabbitmq 服务连接数量越来越多,有时候甚至达到了 200 多个,但实际上这个项目中只有两个消费任务, 6 个消费者在消费。
一开始我还以为是 aio_pika
库的问题,心想这也太拉胯了,开源果然是最贵的,但研究了一下发现是我使用姿势不对。
项目中的使用方式
一开始的 publisher 及 consumer 实现如下:
class ConnectionConfig(BaseModel):
pass
class BaseProducer:
def __init__(self, config: ConnectionConfig):
self.config = config
self.connection: typing.Optional[aio_pika.Connection] = None
self.channel: typing.Optional[aio_pika.Channel] = None
self.queue: typing.Optional[aio_pika.Queue] = None
self._is_connected: bool = False
async def connect(self) -> None:
try:
if not self._is_connected:
self.connection = await aio_pika.connect_robust(
self.config.url,
timeout=self.config.connection_timeout
)
self.channel = await self.connection.channel()
self.queue = await self.channel.declare_queue(self.config.queue_name)
self._is_connected = True
logger.info(f"Connected to queue: {self.config.queue_name}")
except Exception as e:
self._is_connected = False
logger.error(f"Connection error: {str(e)}")
class BaseConsumer:
def __init__(self, config: ConnectionConfig):
self.config = config
self.connection: typing.Optional[aio_pika.Connection] = None
self.channel: typing.Optional[aio_pika.Channel] = None
self.queue: typing.Optional[aio_pika.Queue] = None
self._is_connected: bool = False
async def connect(self) -> None:
try:
if not self._is_connected:
self.connection = await aio_pika.connect_robust(
self.config.url,
timeout=self.config.connection_timeout
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=self.config.prefetch_count)
self.queue = await self.channel.declare_queue(self.config.queue_name)
self._is_connected = True
logger.info(f"Connected to queue: {self.config.queue_name}")
except Exception as e:
self._is_connected = False
logger.error(f"Connection error: {str(e)}")
async def process_queue(self) -> typing.AsyncGenerator[bytes, None]:
try:
await self.connect()
async with self.queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process() as processed_message:
yield processed_message.body
except Exception as e:
self._is_connected = False
logger.error(f"Connection error: {str(e)}")
这里就有一个很明显的但笔者一开始没有意识到的问题,那就是 connection
和 channel
在 Exception
里没有主动关闭。 那在消费异常时,consumer 会不断创建新的连接,而旧的连接仍然保留着,从而导致连接数越来越多。
Q: 那为什么连接(connection)不自己关闭呢?
A: 一句话来说,应用层的异常不会影响连接层的正常运行,二者是解耦的,这是设计中的基操。
从资源管理的角度上讲,在很多编程实践中,特别是涉及网络、文件等外部资源时,资源的释放(关闭)通常需要显式地进行管理。 原因有以下几点:
- 连接状态的复杂性:网络连接有多种状态(如 ESTABLISHED、CLOSE_WAIT 等),系统无法自动判断 app 何时真正"完成"了连接交互。
- 资源管理的可预测性:显式关闭让开发者可以在最合适的时机释放资源,避免资源泄漏或过早释放。
- 连接复用需求:在高性能应用中,连接通常被重复使用(如连接池),而不是每次通信后就关闭。
- 错误处理的需要:允许开发者处理关闭过程中可能出现的错误,如文件锁或租约问题。
所以即使在消费时出现异常,比如拿到的数据不合法,也不会影响连接层的正常运行。也就是说,虽然你的代码崩了,但连接还在。
修复
因此正确的做法是,在应用层(消费时)出现问题时,把异常报出来了,然后继续处理下一个消息。
对于连接的创建,要么通过资源管理器(async with
)自动关闭连接,要么使用连接池进行连接复用。
第一种适合短期、单次使用的连接需求。比如 publish 消息时,如果频率不是很高,消息发送后,连接就可以关闭了,下次使用时再重新创建,这样可以减少资源占用。
async def publish(self, message_list: list[bytes]) -> None:
async with self.connection:
async with self.channel:
for message in message_list:
await self.channel.default_exchange.publish(
aio_pika.Message(body=message),
routing_key=self.config.queue_name
)
第二种适合适合需要频繁创建连接的高并发应用,可以显著减少连接建立的延迟和资源消耗。同时也可以限制最大连接数,防止资源耗尽。
connection_pool = Pool(get_connection, max_size=2)
channel_pool = Pool(get_channel, max_size=2)
async with connection_pool, channel_pool:
async for message in self.queue.iterator():
async with message.process() as processed_message:
yield processed_message.body
小结
资源管理是一个容易被忽视的问题,特别是在异步编程中。在开发时,还是需要多注意一下的,不然就会像笔者一样,被坑了才发现。