一个现代化的 Python 库,为 Loguru 提供与 Plumelog 系统的集成功能,支持异步 Redis 日志传输。
- 🚀 异步处理: 基于
asyncio的高性能异步日志传输,对业务代码无阻塞。 - 📦 智能批量: 聚合日志记录,按数量或时间间隔批量发送,大幅减少网络开销。
- 🔒 类型安全: 完整的 Python 3.10+ 类型提示,享受现代 IDE 的智能提示与静态检查。
- 🔄 智能重试: 内置指数退避重试机制;重试耗尽后会显式记录丢弃数量,避免关闭流程无限阻塞。
- 🏊 连接池: 高效管理 Redis 连接,提升高并发场景下的性能和稳定性。
- ⚙️ 灵活配置: 基于 Pydantic 的配置模型,支持环境变量,易于在不同环境中部署。
- 🧵 线程安全: 专为多线程环境设计,保证在复杂应用中安全运行。
使用 uv 安装(推荐):
uv add plumelog-loguru使用 pip 安装:
pip install plumelog-logurufrom loguru import logger
from plumelog_loguru import create_redis_sink
# 使用默认配置添加 Redis sink
# 推荐添加 type: ignore 注释以兼容静态类型检查器
logger.add(create_redis_sink()) # type: ignore[arg-type]
# 开始记录日志,这些日志将被异步发送到 Redis
logger.info("Hello, Plumelog!")
logger.error("这是一个错误日志,包含堆栈信息。")from loguru import logger
from plumelog_loguru import create_redis_sink, PlumelogSettings
# 创建自定义配置实例
config = PlumelogSettings(
app_name="my_awesome_app",
env="production",
redis_host="redis.example.com",
redis_port=6379,
redis_password="your_secret_password",
batch_size=200, # 提高批量大小
batch_interval_seconds=1.0, # 缩短发送间隔
)
# 使用自定义配置创建 sink
redis_sink = create_redis_sink(config)
logger.add(redis_sink) # type: ignore[arg-type]
logger.info("日志已配置为生产环境。")在 asyncio 应用中,可以使用异步上下文管理器来确保所有缓冲区的日志在程序退出前被完全发送。
import asyncio
from loguru import logger
from plumelog_loguru import RedisSink, PlumelogSettings
async def main():
config = PlumelogSettings(app_name="async_app")
# 使用 async with 确保 sink 在退出时优雅关闭
async with RedisSink(config) as sink:
logger.add(sink) # type: ignore[arg-type]
logger.info("这是一条在异步环境中记录的日志。")
# 在此期间,日志在后台发送
await asyncio.sleep(0.5)
logger.warning("应用即将退出。")
# 上下文结束时,RedisSink 会自动处理并发送所有剩余日志
print("所有日志已刷新。")
asyncio.run(main())所有配置项均可通过 PLUMELOG_ 前缀的环境变量进行设置。
| 配置项 | 环境变量 | 默认值 | 说明 |
|---|---|---|---|
app_name |
PLUMELOG_APP_NAME |
"default" |
应用名称,用于日志归属分类 |
env |
PLUMELOG_ENV |
"dev" |
应用运行环境 (如: dev, test, prod) |
redis_host |
PLUMELOG_REDIS_HOST |
"localhost" |
Redis 主机地址 |
redis_port |
PLUMELOG_REDIS_PORT |
6379 |
Redis 端口 |
redis_db |
PLUMELOG_REDIS_DB |
0 |
Redis 数据库编号 |
redis_password |
PLUMELOG_REDIS_PASSWORD |
None |
Redis 密码 |
redis_key |
PLUMELOG_REDIS_KEY |
"plume_log_list" |
日志存储的 Redis List 键名 |
batch_size |
PLUMELOG_BATCH_SIZE |
100 |
触发批量发送的日志数量阈值 |
batch_interval_seconds |
PLUMELOG_BATCH_INTERVAL_SECONDS |
2.0 |
触发批量发送的时间间隔(秒) |
queue_max_size |
PLUMELOG_QUEUE_MAX_SIZE |
10000 |
内存中转队列的最大容量 |
retry_count |
PLUMELOG_RETRY_COUNT |
3 |
Redis 操作失败时的最大重试次数 |
retry_delay |
PLUMELOG_RETRY_DELAY |
2.0 |
首次重试延迟,后续按指数退避 |
socket_connect_timeout |
PLUMELOG_SOCKET_CONNECT_TIMEOUT |
5.0 |
Redis 连接建立超时 |
socket_timeout |
PLUMELOG_SOCKET_TIMEOUT |
5.0 |
Redis 命令读写超时 |
temp_buffer_max_size |
PLUMELOG_TEMP_BUFFER_MAX_SIZE |
1000 |
临时缓存最大容量 |
max_connections |
PLUMELOG_MAX_CONNECTIONS |
5 |
Redis 连接池的最大连接数 |
本库遵循现代高性能应用的设计原则,旨在提供一个可靠且对业务无侵入的日志解决方案。
- 生产者-消费者模型: 用户应用代码(生产者)与日志发送任务(消费者)通过一个异步队列完全解耦。
- 专用异步运行时:
RedisSink启动独立守护线程托管asyncio事件循环,确保多线程环境也能安全入队。 - 非阻塞 I/O: 所有网络操作(Redis 通信)均基于
asyncio,不会阻塞主业务线程。 - 批处理优化: 聚合多条日志为单次网络请求,显著降低 I/O 压力和 Redis
OPS。 - 优雅降级: 内置有限重试、线程安全临时缓存与容错机制;当 Redis 长时间不可用时,会显式记录被丢弃的日志数量,避免消费者和关闭流程无限重试。
下图展示了从日志产生到最终存储的完整数据流和核心组件。
sequenceDiagram
participant UA as 用户应用
participant RS as RedisSink
participant AQ as AsyncQueue
participant BP as Consumer Task
participant RC as AsyncRedisClient
participant RD as Redis
UA->>RS: 1 logger.info(...) 同步(非阻塞)
RS->>AQ: 2 入队(异步)
note right of AQ: 内存队列 解耦生产/消费
BP->>AQ: 拉取日志批次
BP->>RC: 4 批量发送
RC->>RD: 5 LPUSH plume_log_list
RD-->>RC: OK
RC-->>BP: 结果
处理流程解读:
- 同步调用: 你的业务代码调用
logger.info()。RedisSink会先做日志转换,再将发送任务交给专用事件循环处理。 - 异步入队: 日志记录被线程安全地投递到专用事件循环中的
asyncio.Queue。提交过程使用asyncio.run_coroutine_threadsafe;当后台积压超过上限时,会退回到有界临时缓存并打印告警。 - 后台处理 (未在图中编号): 专用线程中的后台任务(Consumer Task)持续监控队列。
- 批量发送: 当队列中的日志达到一定数量(
batch_size)或经过一定时间(batch_interval_seconds),后台任务会将它们打包。 - 持久化存储: 打包后的日志通过
AsyncRedisClient使用连接池中的连接,以LPUSH命令高效地一次性写入 Redis。如果发生网络错误,将触发智能重试。
下图深入剖析了同步区域和异步区域的交互细节。
sequenceDiagram
autonumber
participant User as 用户代码
participant Sink as RedisSink
participant Ext as FieldExtractor
participant Queue as AsyncQueue
participant Task as ConsumerTask
participant Client as RedisClient
participant Redis as Redis
User->>Sink: logger.info(msg)
Sink->>Ext: 提取运行字段
Ext-->>Sink: 字段数据
Sink->>Queue: 线程安全投递(LogRecord)
Sink-->>User: 返回 (耗时 <1ms)
rect rgb(245,245,245)
note over Task,Client: 后台异步循环
Task->>Queue: 批量获取 (数量/超时/压力)
Task->>Task: JSON 序列化
Task->>Client: send(batch)
Client->>Redis: LPUSH 批量
Redis-->>Client: OK
Client-->>Task: 成功
end
RedisSink 会在首次使用时启动一个守护线程,在线程内初始化并运行独立的 asyncio 事件循环。所有日志转换、入队和 Redis 发送任务都会调度到该事件循环执行:
- 调用线程无需管理事件循环,适配同步、多线程和异步混合场景。
- 通过
asyncio.run_coroutine_threadsafe完成线程间调度,确保提交动作具备跨线程的可见性和可靠性。 - 若后台提交积压超过上限,线程安全的
deque临时缓存会接管,并保留最近的一批日志等待关闭阶段尽力发送。
关键优势:
- 主线程保护: 专用事件循环线程承担主要异步工作。正常情况下
logger.info()只做本地转换和后台投递;当后台积压超过上限时会进入有界临时缓存并打印告警,而不是继续无界堆积任务。 - 资源效率: 批量发送大大减少了网络往返次数和系统调用,降低了 CPU 和网络资源的消耗。连接池避免了频繁创建和销毁连接的开销。
- 数据可靠性: 内存队列配合线程安全的临时缓存
deque作为双层缓冲,指数退避重试机制在网络抖动时通过逐渐增加等待时间来恢复,大大提高了日志发送的成功率。
classDiagram
class PlumelogSettings {
+str app_name
+str env
+str redis_host
+int redis_port
+int redis_db
+str|None redis_password
+str redis_key
+int batch_size
+float batch_interval_seconds
+int queue_max_size
+int retry_count
+int max_connections
}
class LogRecord {
+str app_name
+str env
+str server_name
+str method
+str content
+str log_level
+str class_name
+str thread_name
+int seq
+str date_time
+int dt_time
+to_dict()
}
class FieldExtractor {
+get_server_name() str
+get_host_name() str
+get_thread_name() str
+get_caller_info(depth) CallerInfo
+get_next_seq() int
-_seq_counter
}
class AsyncRedisClient {
+connect()
+send_log_record(record: LogRecord) bool
+send_log_records(records: list[LogRecord]) bool
+disconnect()
-_pool
}
class AsyncQueue {
+put_nowait(item)
+get() LogRecord
+qsize() int
+full() bool
+empty() bool
}
class RedisSink {
+__call__(record)
+close()
-_ensure_initialized()
-_log_consumer()
-_queue: AsyncQueue
-_client: AsyncRedisClient
-_extractor: FieldExtractor
-_buffer: list[LogRecord]
}
RedisSink --> PlumelogSettings : uses
RedisSink *-- AsyncQueue
RedisSink *-- AsyncRedisClient
RedisSink *-- FieldExtractor
RedisSink --> LogRecord : creates
AsyncRedisClient --> PlumelogSettings
FieldExtractor --> LogRecord
- 内存: 主要由异步队列 (
queue_max_size) 决定。默认10000条日志大约占用 10-20MB 内存,具体取决于日志大小。 - CPU: 非常低。大部分时间处于 I/O 等待状态,仅在日志格式化和序列化时有少量计算。
- 网络: 连接池保持少量(
max_connections)长连接,批量发送机制极大节省了网络带宽。
- Redis 连接中断:
AsyncRedisClient捕获连接错误。- 启动指数退避重试机制(例如,等待 1s, 2s, 4s... 后重试)。
- 在此期间,新的日志继续进入内存队列进行缓冲。
- 如果 Redis 在重试期间恢复,积压的日志将被发送。
- 如果达到最大重试次数后依然失败,该批次日志将被丢弃,并记录丢弃数量到标准输出,防止无限重试耗尽资源。
- 内存队列满:
- 如果日志生产速度持续高于消费速度(例如 Redis 长时间不可用),后台提交任务会先达到上限。
- 达到上限后,
RedisSink会把新日志转入有界临时缓存,并打印一条警告;临时缓存也满时会淘汰最旧日志。这是一种**有界背压(Backpressure)**机制,用于保护应用内存,防止任务无界堆积。
为了验证本组件在高并发场景下对业务应用 CPU 的影响,项目内置了纯 CPU 负载压力测试脚本。该测试Mock 掉了真实的 Redis 网络传输,专注于测试组件本身的队列调度、线程通讯、序列化等本地开销。
你可以随时在本地运行压力测试(通过 uv 会自动下载 psutil 依赖,不污染项目环境):
uv run --with psutil python scripts/benchmark_cpu.py在现代多核设备上,4 个业务并发线程的满载压测结果如下:
- 吞吐量 (QPS):
~13,000 条/秒 - 平均 CPU 开销: 约
117%(多核环境下,这意味着 4 个疯狂写入的主线程加上 1 个后台消费者线程,总计才用满了一个多一点的 CPU 核心)
💡 核心结论:在高达 1.3 万 QPS 的极限压测下,CPU 占用率表现非常优秀。在通常的实际业务应用场景(比如几十到几百 QPS)中,Plumelog-Loguru 对系统 CPU 的额外影响 几乎可以忽略不计(不到 1%),完全可以放心使用,绝不会成为性能瓶颈或导致 CPU 飙升。
# 克隆项目
git clone <repository-url>
cd plumelog-loguru
# 使用 uv (推荐) 或 pip 安装开发依赖
uv sync --dev
# 运行测试
uv run pytest
# 代码格式化
uv run ruff format .
# 类型检查与静态检查
uv run ruff check .
uv run mypy src/src/plumelog_loguru/
├── __init__.py # 主要 API 导出
├── config.py # Pydantic 配置模型
├── models.py # 日志数据模型
├── extractor.py # 上下文信息提取器
├── redis_client.py # 异步 Redis 客户端 (含连接池和重试)
└── redis_sink.py # Loguru Sink 实现 (含队列和消费者任务)
本项目基于 Apache License 2.0 许可。
欢迎通过提交 Issue 和 Pull Request 来为项目做出贡献!