kafka相关实践(demo版本)

发布时间:2026/7/3 12:24:20
kafka相关实践(demo版本) 我们来重新过一遍kafka如果从实操的角度来说运用kafka我们说一下大致的流程1.使用docker拉取kafka的文件夹并且运行下去旧的斑斑需要zookeeper2.创建一个springboot引入依赖yaml。做配置链接kafka3.使用代码弄一个生产者往kafka里面赛数据4.在kafka的dashboard里面查看赛的数据5.在创建一个springboot消费来自kafka的数据或者在原来的springboot里面接收kafka的数据然后消费第一步环境准备Docker 拉起 Kafka “全家桶”你说的“拉取kafka的文件夹”实操中通常指一个docker-compose.yml文件。我们直接上KRaft 模式去掉老旧的 Zookeeper并配上Kafdrop可视化界面这就是你的 Dashboard。新建文件夹kafka-lab创建docker-compose.ymlversion:3services:kafka:image:bitnami/kafka:latestports:-9092:9092environment:# KRaft 模式配置KAFKA_CFG_NODE_ID:0KAFKA_CFG_PROCESS_ROLES:controller,brokerKAFKA_CFG_CONTROLLER_QUORUM_VOTERS:0kafka:9093KAFKA_CFG_LISTENERS:PLAINTEXT://:9092,CONTROLLER://:9093KAFKA_CFG_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP:PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXTKAFKA_CFG_CONTROLLER_LISTENER_NAMES:CONTROLLERKAFKA_CFG_INTER_BROKER_LISTENER_NAME:PLAINTEXTKAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE:true# 允许生产者自动创建主题新手友好ALLOW_PLAINTEXT_LISTENER:yesKAFKA_KRAFT_CLUSTER_ID:abcdefghijklmnopqrstuvhealthcheck:test:[CMD,kafka,broker,healthchecks]interval:10stimeout:5sretries:5kafdrop:# 可视化 Dashboardimage:obsidiandynamics/kafdropports:-9000:9000environment:KAFKA_BROKERCONNECT:kafka:9092JVM_OPTS:-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:UseStringDeduplication -noverifydepends_on:kafka:condition:service_healthy运行docker-compose up -d验证浏览器打开http://localhost:9000能看到 Kafka 集群信息说明环境 OK。第二步创建 SpringBoot 项目生产者新建一个 SpringBoot 项目引入核心依赖pom.xmldependencies!-- Spring Kafka 核心 --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- 为了发 JSON 对象引入 Jackson --dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId!-- 方便用 Controller 触发 --/dependency/dependenciesapplication.yml配置spring:kafka:bootstrap-servers:localhost:9092producer:key-serializer:org.apache.kafka.common.serialization.StringSerializervalue-serializer:org.springframework.kafka.support.serializer.JsonSerializer# 重要生产环境推荐 acksall等待所有副本确认测试用 1 即可acks:1第三步编写生产者代码往 Kafka 塞数据我们定义一个实体UserEvent.java发送 JSONpublicrecordUserEvent(StringuserId,StringuserName,Longtimestamp){}写一个 Service 负责发送ServiceSlf4jpublicclassKafkaProducerService{AutowiredprivateKafkaTemplateString,ObjectkafkaTemplate;publicvoidsendMessage(Stringtopic,Stringkey,Objectmessage){// 发送并获取 Future添加回调监听成功/失败实操必须加varfuturekafkaTemplate.send(topic,key,message);future.addCallback(result-{varmetadataresult.getRecordMetadata();log.info(✅ 发送成功 - 主题: {}, 分区: {}, 偏移量: {},metadata.topic(),metadata.partition(),metadata.offset());},failure-{log.error(❌ 发送失败: ,failure);});}}写一个 Controller 触发发送方便测试RestControllerRequestMapping(/kafka)publicclassKafkaController{AutowiredprivateKafkaProducerServiceproducerService;GetMapping(/send)publicStringsend(RequestParamStringmsg){vareventnewUserEvent(u-123,msg,System.currentTimeMillis());// 指定主题名称如果不存在且 auto.create 为 trueKafka 会自动创建producerService.sendMessage(my-topic,event.userId(),event);return消息已发送: msg;}}启动生产者 SpringBoot 服务端口设为 8080访问http://localhost:8080/kafka/send?msghello控制台会打印成功日志说明数据已写入 Broker。第四步在 Kafka DashboardKafdrop里查看数据回到http://localhost:9000点击Topics- 找到my-topic自动创建的分区数默认 1副本数 1。点击my-topic进入详情找到View Messages或Browse Messages。选择 Partition 0点击Browse就能看到你刚发送的那条 JSON 数据包含 userId、userName 和时间戳。这一步证明了数据确实落盘了且可视化工具能反序列化读取。第五步创建消费者消费数据你有两种选择新建一个独立 SpringBoot 项目模拟微服务解耦或在现有项目加一个监听器模拟内部处理。这里我强烈建议新建一个消费者项目端口 8081因为实际生产中生产者与消费者往往独立部署。新项目的依赖和 YAML 一模一样但application.yml需要额外配置消费者spring:kafka:bootstrap-servers:localhost:9092consumer:group-id:my-consumer-group# 消费者组非常重要决定消费进度key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.springframework.kafka.support.serializer.JsonDeserializerproperties:# 允许反序列化不信任的包因为我们的类在 com.example.demo 下spring.json.trusted.packages:*# 新消费者从哪里开始消费earliest从最开始 / latest最新 / noneauto-offset-reset:earliest# 手动提交还是自动提交测试用自动生产推荐手动enable-auto-commit:true写一个简单的消息监听器核心代码ComponentSlf4jpublicclassKafkaMessageListener{KafkaListener(topicsmy-topic,groupIdmy-consumer-group)publicvoidlisten(UserEventevent,Header(KafkaHeaders.RECEIVED_PARTITION)intpartition,Header(KafkaHeaders.OFFSET)longoffset){log.info( 消费到数据: {}, 来自分区: {}, 偏移量: {},event,partition,offset);// 在这里写你的业务逻辑入库、调用下游等}}启动消费者服务端口 8081你会立刻看到之前那条hello消息被消费打印出来。再次调用生产者的发送接口消费者会实时打印新消息。进阶避坑 核心概念串讲这一步帮你彻底弄懂走过流程后必须搞懂这几个关键点主题Topic、分区Partition与偏移量Offsetmy-topic是一个逻辑通道。Kafdrop 里你能看到它默认只有 1 个分区p0。如果生产环境创建主题时要指定分区数如 3 个分区数决定了消费者的并行度一个分区只能被同一个消费者组里的一个实例消费。消费者组Group ID我们在消费者 YAML 里配了group-id: my-consumer-group。如果再启动一个消费者实例端口 8082且 group.id 相同它们会负载均衡消费分区如果my-topic有 2 个分区则各消费一个如果只有 1 个则只有一个实例能消费另一个闲置作为备份。如果 group.id不同比如my-consumer-group-2那么两个消费者都会完整消费所有数据相当于广播模式。生产者 Acks 与可靠性acks1Leader 收到消息就确认可能丢数据。acksall或-1所有 ISR同步副本确认才返回最强可靠性但延迟高。线上订单、支付类必须用all。偏移量提交Offset Commit上面我们用了enable-auto-commit: true自动提交。风险是业务逻辑还没处理完自动提交已触发消费者挂了会导致消息丢失。生产实操改为enable-auto-commit: false在listen方法里手动调用Acknowledgment.acknowledge()确保业务处理完再提交偏移量。KafkaListener(topicsmy-topic)publicvoidlisten(UserEventevent,Acknowledgmentack){try{// 业务处理...ack.acknowledge();// 手动提交}catch(Exceptione){// 不提交等待重试}}关于“同一个 SpringBoot 里既生产又消费”完全可行你只需要在启动类加上EnableKafka然后把上述KafkaListener和KafkaTemplate放在同一个项目即可。但解耦是微服务核心原则建议分开。终极大礼包生产级启动脚本命令你的第一步“运行下去”还可以用命令创建主题替代自动创建# 进入 Kafka 容器dockerexec-itkafka-lab-kafka-1bash# 创建主题3分区2副本注意副本数不能超过 broker 数量kafka-topics.sh--create--topicorder-topic--partitions3--replication-factor1--bootstrap-server localhost:9092# 查看主题详情kafka-topics.sh--describe--topicorder-topic --bootstrap-server localhost:9092按这个流程走一遍你就不是“调 API”而是真正实操了一遍 Kafka 的核心链路。如果中间遇到反序列化报错比如JsonDeserializer找不到类记得在消费者 YAML 里配spring.kafka.consumer.properties.spring.json.trusted.packages*这是新手最容易卡的坑。有任何一步报错直接贴日志给我我帮你精准排错。继续吧