kafka-python
常用方法
| 方法 | 作用 | 考点 |
|---|---|---|
send(record) |
异步发送消息,返回 Future(Java)或直接发送(Python 也返回 Future)。不会阻塞。 |
异步 vs 同步:send().get() 会阻塞变成同步。 |
send(record, callback) |
异步发送并提供回调函数,处理发送成功/失败后的逻辑(如记录失败消息、重试)。 | 生产者保证可靠性的常用手段。 |
flush() |
强制刷新缓冲区,阻塞直到所有未完成消息被确认。 | 优雅关闭前确保数据不丢。 |
close() |
关闭生产者,释放资源。会先自动 flush(),然后关闭网络连接。可设置超时。 |
一定要记得调用,否则可能丢失缓冲中的消息。 |
partitionsFor(topic) |
获取某个 topic 的所有分区元数据(分区数、leader 副本等)。 | 用于动态路由或测试。 |
metrics() |
获取生产者内部监控指标(如发送速率、重试次数、请求延迟)。 | 结合 Prometheus 做监控。 |
pause(partitions) / resume(partitions) |
暂停/恢复向特定分区发送消息(流量控制)。 | 高级用法,比如下游限流时暂时停止生产。 |
Q:flush() 和 close() 有什么区别?**
flush()只保证当前缓冲区清空,但生产者还可以继续发送消息。close()会先flush(),然后关闭生产者,之后不能再 send。
Q:如果不调用 flush() 直接退出程序会怎样?**
- 如果
linger.ms较大且缓冲区未满,最后一批消息可能永远发不出去,数据丢失。 - 最佳实践:退出前调用
flush()或close()。
Q:flush() 会一直阻塞吗?**
- 会阻塞直到所有待发送消息都收到 broker 确认(或超时/失败)。如果 broker
挂掉,可能永久阻塞,所以生产环境会配合
max.block.ms设置超时。
Q:性能影响?**
flush()会破坏批处理优势,降低吞吐量。只在必要时(如切换 topic、程序结束前)使用,不要每条消息都 flush。
踩坑
问题现象:生成10000条简单的文本消息,大小约1.2GB,远超过Kafka默认的消息大小限制100MB
同样消息,在没有压缩、没有批处理配置、没有缓冲优化的情况下。
改用confluent-kafka,生产者发送消息的速度明显快,发送成功
kafka-python vs confluent_kafka
| 特性 | kafka-python | confluent_kafka |
|---|---|---|
| 性能 | 纯Python实现,性能较低 | C库librdkafka,性能高3-5倍 |
| 内存使用 | 较高,Python对象开销大 | 较低,C级别内存管理 |
| 压缩支持 | 基础支持 | 优化的压缩算法 |
| 批处理 | 基础批处理 | 高级批处理和背压控制 |
| 可靠性 | 一般 | 企业级可靠性 |
建议
建议使用 confluent_kafka**,原因:
- 处理大消息性能更好
- 内存使用更高效
- 内置优化的批处理和压缩
- 更好的错误处理和重试机制