跳转至

kafka-python

常用方法

方法 作用 考点
send(record) 异步发送消息,返回 Future(Java)或直接发送(Python 也返回 Future)。不会阻塞。 异步 vs 同步:send().get() 会阻塞变成同步。
send(record, callback) 异步发送并提供回调函数,处理发送成功/失败后的逻辑(如记录失败消息、重试)。 生产者保证可靠性的常用手段。
flush() 强制刷新缓冲区,阻塞直到所有未完成消息被确认。 优雅关闭前确保数据不丢。
close() 关闭生产者,释放资源。会先自动 flush(),然后关闭网络连接。可设置超时。 一定要记得调用,否则可能丢失缓冲中的消息。
partitionsFor(topic) 获取某个 topic 的所有分区元数据(分区数、leader 副本等)。 用于动态路由或测试。
metrics() 获取生产者内部监控指标(如发送速率、重试次数、请求延迟)。 结合 Prometheus 做监控。
pause(partitions) / resume(partitions) 暂停/恢复向特定分区发送消息(流量控制)。 高级用法,比如下游限流时暂时停止生产。

Q:flush()close() 有什么区别?**

  • flush() 只保证当前缓冲区清空,但生产者还可以继续发送消息。
  • close() 会先 flush(),然后关闭生产者,之后不能再 send。

Q:如果不调用 flush() 直接退出程序会怎样?**

  • 如果 linger.ms 较大且缓冲区未满,最后一批消息可能永远发不出去,数据丢失
  • 最佳实践:退出前调用 flush()close()

Q:flush() 会一直阻塞吗?**

  • 会阻塞直到所有待发送消息都收到 broker 确认(或超时/失败)。如果 broker 挂掉,可能永久阻塞,所以生产环境会配合 max.block.ms 设置超时。

Q:性能影响?**

  • flush() 会破坏批处理优势,降低吞吐量。只在必要时(如切换 topic、程序结束前)使用,不要每条消息都 flush。

踩坑

问题现象:生成10000条简单的文本消息,大小约1.2GB,远超过Kafka默认的消息大小限制100MB

同样消息,在没有压缩、没有批处理配置、没有缓冲优化的情况下。

改用confluent-kafka,生产者发送消息的速度明显快,发送成功

kafka-python vs confluent_kafka

特性 kafka-python confluent_kafka
性能 纯Python实现,性能较低 C库librdkafka,性能高3-5倍
内存使用 较高,Python对象开销大 较低,C级别内存管理
压缩支持 基础支持 优化的压缩算法
批处理 基础批处理 高级批处理和背压控制
可靠性 一般 企业级可靠性

建议

建议使用 confluent_kafka**,原因:

  • 处理大消息性能更好
  • 内存使用更高效
  • 内置优化的批处理和压缩
  • 更好的错误处理和重试机制