confluent_kafka
核心方法详解
1. Producer.poll() - 生产者轮询
作用: - 处理已发送消息的确认和回调 - 触发 delivery_report 回调函数 - 清理内部状态
参数说明:
producer.poll(timeout)
# timeout: 等待时间(秒)
# 0 = 非阻塞,立即返回
# 1.0 = 阻塞最多1秒
为什么需要 poll?
# 错误示例 - 没有poll,回调永远不会执行
producer.produce(topic, value=msg, callback=delivery_report)
# delivery_report 永远不会被调用!
# 正确示例 - 定期poll触发回调
producer.produce(topic, value=msg, callback=delivery_report)
producer.poll(0) # 触发回调
最佳实践:
# 高频场景 - 每N条poll一次
for i in range(total):
producer.produce(topic, value=msg, callback=delivery_report)
if i % 100 == 0:
producer.poll(0) # 非阻塞触发回调
# 低频场景 - 每次发送后poll
producer.produce(topic, value=msg, callback=delivery_report)
producer.poll(1.0) # 阻塞等待确认
2. Consumer.commit(msg) - 消费者提交偏移量
作用: - 提交已处理消息的偏移量 - 确保消费者重启后从正确位置继续 - 实现精确一次处理语义
提交方式对比:
# 方式1: 自动提交(enable.auto.commit=True)
# Kafka自动定期提交,可能丢失消息或重复处理
# 方式2: 手动提交当前消息
consumer.commit(msg) # 提交特定消息的offset
# 方式3: 手动提交当前分区位置
consumer.commit() # 提交最后消费的offset
# 方式4: 异步提交
consumer.commit(asynchronous=True) # 不等待提交完成
最佳实践:
# 精确一次处理 - 每处理一条提交一次
for msg in consumer:
value = process_message(msg)
consumer.commit(msg) # 处理成功后立即提交
# 批量处理 - 每N条提交一次(性能更好)
count = 0
for msg in consumer:
value = process_message(msg)
count += 1
if count % 100 == 0:
consumer.commit() # 批量提交
3. Producer.produce() - 发送消息
producer.produce(
topic,
value=json.dumps(msg).encode("utf-8"),
callback=delivery_report
)
完整参数:
producer.produce(
topic='my-topic', # 主题名(必需)
value=b'message data', # 消息值(字节)
key=b'message-key', # 消息键(可选,用于分区)
partition=-1, # 指定分区(-1=自动)
timestamp=None, # 时间戳(None=当前时间)
headers=[('key', b'value')], # 消息头(可选)
callback=delivery_report # 发送回调
)
4. Producer.flush() - 清空缓冲区
作用: - 阻塞直到所有缓冲消息发送完成 - 确保程序退出前消息全部发送 - 常用于优雅关闭
# 优雅关闭
try:
for msg in messages:
producer.produce(topic, value=msg)
finally:
producer.flush() # 确保所有消息发送完成
producer.close() # 关闭连接
5. Consumer.poll() - 消费消息
# 消费者核心方法
msg = consumer.poll(timeout=1.0)
返回值:
msg = consumer.poll(timeout=1.0)
if msg is None:
# 超时,没有新消息
continue
if msg.error():
# 发生错误
print(f"Error: {msg.error()}")
else:
# 正常消息
print(f"Received: {msg.value()}")
6. 其他常用方法
Producer 方法
# 获取生产者配置信息
print(producer.list_topics())
# 关闭生产者
producer.close()
# 获取内部队列长度
print(len(producer)) # 缓冲消息数
Consumer 方法
# 订阅主题
consumer.subscribe(['topic1', 'topic2'])
# 取消订阅
consumer.unsubscribe()
# 获取当前分配分区
partitions = consumer.assignment()
# 手动分配分区
from confluent_kafka import TopicPartition
consumer.assign([TopicPartition('topic', 0)])
# 获取主题元数据
metadata = consumer.list_topics()
# 关闭消费者
consumer.close()
7. 完整示例对比
生产者最佳实践
from confluent_kafka import Producer
def delivery_report(err, msg):
if err:
print(f"发送失败: {err}")
else:
print(f"发送成功: {msg.topic()} [{msg.partition()}] offset={msg.offset()}")
conf = {
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'retries': 3,
'batch.size': 16384,
'linger.ms': 10,
'compression.type': 'lz4'
}
producer = Producer(conf)
try:
for i in range(10000):
producer.produce(
'my-topic',
value=f'message-{i}'.encode(),
key=f'key-{i}'.encode(),
callback=delivery_report
)
# 定期poll触发回调
if i % 100 == 0:
producer.poll(0)
finally:
# 确保所有消息发送完成
producer.flush()
producer.close()
消费者最佳实践
from confluent_kafka import Consumer, KafkaException
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # 手动提交
}
consumer = Consumer(conf)
consumer.subscribe(['my-topic'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
continue
else:
print(f"Consumer error: {msg.error()}")
break
# 处理消息
try:
value = msg.value().decode('utf-8')
print(f"Received: {value}")
# 处理成功后提交
consumer.commit(msg)
except Exception as e:
print(f"Process error: {e}")
# 处理失败,不提交offset,消息会重新消费
finally:
consumer.close()
8. 性能调优建议
# 高吞吐生产者配置
high_throughput_conf = {
'batch.size': 65536, # 64KB
'linger.ms': 20, # 20ms
'compression.type': 'lz4', # 快速压缩
'max.in.flight.requests.per.connection': 5,
'queue.buffering.max.messages': 1000000
}
# 低延迟生产者配置
low_latency_conf = {
'batch.size': 16384, # 16KB
'linger.ms': 0, # 立即发送
'compression.type': 'none', # 不压缩
'acks': 1 # 只等leader确认
}