1. 搭建kafka的集群
kafka集群搭建的前提:jdk,zookeeper
Kafka集群只需要我们和单机一样将新的kafka加入到同一个zookeeper里即可,但broker.id需要和其他机器不一样
注意:
1、config/server.properties的log.dirs的保存消息日志的目录是支持多个磁盘目录的
2、使用springboot通过外网访问kafka需要开通一个监听如下,如果是集群的话需要给每台机器设置自己的外网IP
advertised.listeners=PLAINTEXT://39.99.195.49:9092
3、配置完一台单机的kafka,直接scp给到其他服务器节点,修改config/server.properties
现在有了3个broker节点,创建topic,有5个分区,每个leader主分区都有2个follower副本分区
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 5 --replication-factor 3 --topic topicName
# 查看topic的详情
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic topicName
Topic: topicName PartitionCount: 5 ReplicationFactor: 3 Configs:
Topic: topicName Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: topicName Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: topicName Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topicName Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: topicName Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
可以看到0分区的leader在broker ID 为1的broker节点上,画一下topic的分区结构图
数据是由Follower主动从Leader那里拉取过来的,当ISR中的Follower完成数据同步之后,Leader就会给Follower发送ack
2. Springboot整合Kafka进行消息收发
项目工程结构:
Producer发送端
1、pom.xml导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、application.yaml配置文件
spring:
kafka:
bootstrap-servers: 39.99.222.44:9092 # 集群用,隔开多个kafka节点
producer:
retries: 3 # 发送消息的重试次数
batch-size: 16384 # 批量
acks: 1 # 等待partition leader落盘完成后才返回ack,发送成功 ,默认就是1
buffer-memory: 33554432 # 设置生产者内存缓存的大小 32M
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer的序列化参数值,可以在官方文档http://kafka.apache.org/23/documentation.html查看配置值
idea全局搜索
3、编写发送代码
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
@Component
public class KafkaProducerService {
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
public void sendMessage(String topic,String key,Object data){
// 轮询分区
// ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, data);
// 全部发送到topic的0分区
ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,data);
// acks回调
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("********消息发送失败:"+throwable.toString());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("=========消息发送成功:"+result.toString());
}
});
}
}
测试代码
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.icodingedu.producter.service.KafkaProducerService;
@SpringBootTest
class KafkaProducterApplicationTests {
@Autowired
KafkaProducerService kafkaProducerService;
@Test
void contextLoads() {
String topic = "topicfirst";
for(int i=0;i<10;i++){
kafkaProducerService.sendMessage(topic,"key:"+i,"hello kafka "+i);
}
System.out.println("======发送完成======");
}
}
启动测试,远程kafka报错
修改kafka的配置文件server.properties
[root@helloworld config]# vim server.properties
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://139.199.13.139:9092
# 重新启动kafka
kafka-server-stop.sh /opt/kafka/config/server.properties
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
消息发送成功,发现发送了10次
在没有设置分区和key的情况下,按照轮询方式写入数据,消费结果如下
kafka-console-consumer.sh --topic topicfirst --from-beginning --bootstrap-server 127.0.0.1:9092
# 读取的值
hello kafka 1
hello kafka 6
hello kafka 2
hello kafka 7
hello kafka 3
hello kafka 8
hello kafka 4
hello kafka 9
hello kafka 0
hello kafka 5
# 写入分区的顺序机制(没有指定partition分区与message的key的情况下)
Partition: 0 1 2 3 4
1 6 2 7 3 8 4 9 0 5
// 指定分区和key
ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,data);
Consumer消费端
1、pom.xml导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、application.yaml配置文件
spring:
kafka:
bootstrap-servers: 39.99.195.49:9092,39.99.196.208:9092,39.99.196.190:9092 # 集群多个kafka节点
consumer:
enable-auto-commit: false # 不自动签收,要让业务走完,才手动签收
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 发送端使用序列化,消费端就要反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual
concurrency: 5
# auto-offset-reset的值有两个
# earliest: kafka出现错误重启之后,会找到未消费的offset继续消费
# latest: kafka出现错误中,如果还有数据往topic里写,只会从最新的offset开始消费
3、编写接收代码
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerService {
@KafkaListener(groupId = "group01",topics = "topicName")
public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
System.out.println("*****获取的消息: "+record.value());
acknowledgment.acknowledge();
}
}
3. kafka消费进度分析以及消费应答
# 通过命令看消费进度的
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.177:9092,192.168.0.178:9092,192.168.0.179:9092 --describe --group group01
# CURRENT-OFFSET :当前消费的offset进度
# LOG-END-OFFSET :数据文件里一共有多少位移数据
# LAG :这是就是还未消费的量
CURRENT-OFFSET + LAG = LOG-END-OFFSET
把消费端停掉,发送端发送消息,运行/kafka-consumer-groups.sh 看消费组的LAG堆积还没消费的消息,发现还有2*5=10条消息没消费
把签收注释掉,启动消费端,获取消息消费,查看LAG还是10条,因为消费端没有告诉kafka broker 确认签收消费
// acknowledgment.acknowledge();
所以业务做完后,要手工签收acknowledgment.acknowledge();
5. 在程序中consumer如何重新消费
# 重新消费在命令行的方式 加上 --from-beginning
kafka-console-consumer.sh --topic topicfirst --from-beginning --bootstrap-server 127.0.0.1:9092
重新消费需要明确两个点
- 每次消费完毕都会记录consumer的offset 消费位移
- 如果要从代码里从头消费就需要配置
- auto-offset-reset: earliest
- 更换消费者组或者将已消费的offset删除
更换消费者组重新消费
@KafkaListener(groupId = "group02",topics = "topicfirst")
public void onMessager(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
System.out.println("*******获取消息:" + record.value());
acknowledgment.acknowledge(); // 确认签收
}
注意:application.yaml要配置auto-offset-reset:earliest
发现之前使用group01已确认消费的消息重新读出来了
6. Kafka监控服务平台Eagle的使用
kafka-eagle平台监控系统(Kafka 鹰眼监控)
官网地址: http://www.kafka-eagle.org/articles/docs/changelog/changelog.html
# 下载地址 http://download.kafka-eagle.org/
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.4.8.tar.gz
# 解压
tar -zxvf kafka-eagle-bin-1.4.8.tar.gz
[root@helloworld kafka-eagle-bin-1.4.8]# tar -xzvf kafka-eagle-web-1.4.8-bin.tar.gz
# Eagle是通过JMX来拉取kafka信息
# JMX:是Java Management Extensions(Java管理扩展)的缩写,
# 首先要对Kafka开启JMX
# 1.开启kafka的JMX
cd kafka/bin
vi bin/kafka-server-start.sh
# 修改heap内容开启JMX
# 将这一行进行修改:export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
# 如果是集群需要所有机器都将这个配置修改好
# 重启kafka让配置生效
[root@helloworld bin]# kafka-server-stop.sh /opt/kafka/config/server.properties
[root@helloworld bin]# kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# 2.eagle配置到环境变量中
vi /etc/profile
export KE_HOME=/usr/local/kafka-eagle
export PATH=$KE_HOME/bin:$JAVA_HOME/bin:$PATH
# 让配置生效
source /etc/profile
# 3.给执行文件授权
cd /usr/local/kafka-eagle/bin
chmod 777 ke.sh
# 4.eagle系统的配置,eagle可以监控多套kafka集群
cd /usr/local/kafka-eagle/conf
vi system-config.properties
# 设置对应的参数
# 可以配置多个集群,这里只配置一套,多个用逗号分开
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=127.0.0.1:2181,127.0.0.1:2181,127.0.0.1:2181
# 显示的broker数量
cluster1.kafka.eagle.broker.size=20
# zookeeper客户端线程数
kafka.zk.limit.size=25
# eagle端口
kafka.eagle.webui.port=8048
# 消费的offset保存位置
cluster1.kafka.eagle.offset.storage=kafka
# 是否开启图表并保持30天内容
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30
# KSQL的查询显示条数和是否自动fix error
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false
# 从界面删除topic的token密码,相当于暗号,跟Elasticsearch head 界面上删除索引的时候要输入“删除”两个字的道理是一样的
kafka.eagle.topic.token=keadmin
# mysql保存eagle的一些元数据驱动
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
修改kafka/bin/kafka-server-start.sh开启kafka 的JMX
mysql保存eagle的一些元数据驱动的数据库kafka_eagle需要自己手动创建
注意数据库如果不在一个设备上需要提前授权
use mysql;
select host,user from user;
grant all privileges on *.* to gavin@'192.168.%' identified by 'cpcoredb';
flush privileges;
启动/关闭/查看Kafka-eagle状态
[root@helloworld bin]# ./ke.sh --help
Usage: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate}
bin/ke.sh start
bin/ke.sh stop
bin/ke.sh stats # 查看状态
# 访问地址外网,云服务器要开通安全组和防火墙
http://39.100.39.20:8048/ke
启动成功:
访问控制台:
Dashboard
7. 大数据日志平台服务搭建设计
收集日志的目的:确保系统的稳定性,查看过去的日志信息
收集日志要注意的是
- 日志信息量非常大,所以一定要有一个可以堆积的地方
- 日志会出现暴增的情况
- 日志文件–>kafka(分布式)–>Logstash(收集过滤)–>ElasticSearch(聚合计算)–>Kibana(报表展示)
8. 使用filebeat将日志收集进kafka
logstash:使用java开发的比较消耗资源
filebeat:使用go语言开发的的空间,比logstash更轻量级,占用资源更少,也是Elasticsearch的组件之一,一般在生产环境都是使用filebeat来进行日志收集的
# 下载安装,与Elasticsearch的版本保持一致
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.2-linux-x86_64.tar.gz
# 我云服务器上的elasticsearch的版本是7.6.1,所以我下载的filebeat也是7.6.1版本
# 1、解压
tar -zxvf filebeat-7.5.2-linux-x86_64.tar.gz
# 2、修改配置文件
[esuser@helloworld filebead-7.6.1]$ pwd
/home/esuser/filebead-7.6.1
vi /usr/local/filebeat/filebeat.yml
filebeat.yml的配置信息
filebeat.inputs:
# 我的这个输入项是干什么的,自己命个名标记下
- input_type: log
paths: # 你的日志从哪里取的,目录,理论上日志文件我们生产环境上运行的程序产生的,你可以把日志内容通过程序写入ES,但没有是filebeat 稳定可靠
- /var/log/myapp/error.log
document_type: "error-log" # 写入ES时的_type值,ES7.x版本 type默认为_doc
multiline:
pattern: '^\[' # 指定匹配的表达式,匹配以[开头的字符串,用来匹配寻找error日志信息
negate: true # 是否匹配到,必须要匹配到
match: after # 没有匹配到的合并到上一行的末尾
max_lines: 2000 # 未匹配到最大的行数
timeout: 2s # 如果在规定时间没有新的日志事件就不等待后面的直接output
fields:
logbiz: order-error # 业务命名
logtopic: order-error-log # kafka的topic,把错误信息给到该topic
evn: dev # 定义一个环境名
registry_file: /home/esuser/filebead-7.6.1/data/registry # 记录日志读取的位置,如果容器重启,可以从记录的位置开始取日志
- input_type: log
paths: # 收集另外一个日志文件
- /var/log/myapp/info.log
document_type: "info-log"
multiline:
pattern: '^\['
negate: true
match: after
max_lines: 2000
timeout: 2s
fields:
logbiz: order-info
logtopic: order-info-log
evn: dev
registry_file: /home/esuser/filebead-7.6.1/data/registry
output.kafka:
enabled: true
hosts: ["192.168.0.177:9092","192.168.0.178:9092","192.168.0.179:9092"] # 这里kafka是一个集群
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip # 压缩
max_message_bytes: 1000000 # 最大日志1M
required_acks: 1
logging.to_files: true
3、启动filebeat
# 启动前做一下验证,看一下配置文件是否正确,就像nginx -t 一样
[esuser@helloworld filebead-7.6.1]$ ./filebeat test config
Config OK
# 注意:在启动之前一定要将filebeat导入的topic创建好
# 后台启动filebeat
./filebeat &
准备工作
创建/var/log/myapp/error.log,放入日志内容
vi /var/log/myapp/error.log
[error 信息,系统异常,error-1]
[error 信息,系统异常,error-2]
[error 信息,系统异常,error-3]
[error 信息,系统异常,error-4]
[error 信息,系统异常,error-5]
[error 信息,系统异常,error-6]
在Kafka eagle创建topic:order-error-log
创建/var/log/myapp/info.log,放入日志内容
vi /var/log/myapp/info.log
[info 信息,系统提示,info-1]
[info 信息,系统提示,info-2]
[info 信息,系统提示,info-3]
[info 信息,系统提示,info-4]
[info 信息,系统提示,info-5]
[info 信息,系统提示,info-6]
在Kafka eagle创建topic:order-info-log
验证日志文件内容是否已收集进kafka
使用命令查看topic 消息
kafka-console-consumer.sh --topic order-error-log --from-beginning --bootstrap-server 127.0.0.1:9092
Springboot Consumer消费端监听
@Service
public class ConsumerService {
@KafkaListener(groupId = "group01",topics = "order-error-log")
public void onMessager(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
System.out.println("*******获取消息:" + record.value());
acknowledgment.acknowledge(); // 确认签收
}
}
9. 通过logstash提取kafka数据进入ES
# 0.解压安装
# 根据ElasticSearch的版本下载,确保已安装Java,配置环境变量
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.5.2.tar.gz
# 解压
tar -zxvf logstash-7.5.2.tar.gz
# 1.在根目录创建sync的,之前我已经使用过logstash同步mysql的数据到ES中
[esuser@helloworld sync]$ ls
aiko-dict-task.sql logstash-db-dict-sync.conf logstash-ik.json mysql-connector-java-5.1.47.jar track_time
[esuser@helloworld sync]$ vi logstash-log-sync.conf
# 2.在sync文件里创建logstash-log-sync.conf,配置内容如下
input {
kafka {
bootstrap_servers => "192.168.0.177:9092,192.168.0.178:9092,192.168.0.179:9092" # kafka集群的写法
topics_pattern => "order-.*" # 匹配topic
consumer_threads => 5 # 读取的线程数
decorate_events => true
codec => "json"
auto_offset_reset => "latest" # 取最新的数据
group_id => "logstash1"
}
}
output {
elasticsearch {
hosts => ["192.168.0.175:9200"] # es地址 不能使用127.0.0.1,使用内网ip或外网ip
index => "kafka-%{+YYYY.MM.dd}"
}
stdout{
codec => rubydebug ##输出到屏幕上
}
}
# 在bin目录下启动执行
./logstash -f /usr/local/logstash/sync/logstash-log-sync.conf
# nohup后台启动
nohup ./logstash -f /usr/local/logstash/logstash-7.5.2/sync/logstash-log-sync.conf > myout.file 2>&1 &
准备工作
启动elasticsearch,elasticsearch-head 访问
# 查看java运行的进程
[root@helloworld bin]# jps
11507 Elasticsearch
15445 Jps
14408 QuorumPeerMain
14489 QuorumPeerMain
14442 QuorumPeerMain
9130 Kafka
12171 Elasticsearch
16877 KafkaEagle
12077 Elasticsearch
# kill掉其中一个es
[root@helloworld bin]#
logstash 启动成功
看es现在是还没有创建kafka的索引,我们来修改日志文件/var/log/myapp/error.log
# 增加 7到11的error数据
[root@helloworld ~]# vi /var/log/myapp/error.log
[error 信息,系统异常,error-7]
[error 信息,系统异常,error-8]
[error 信息,系统异常,error-9]
[error 信息,系统异常,error-10]
[error 信息,系统异常,error-11]
保存后,马上触发了logstash的读取kafka的日志内容,在这之前,filebeat是先把日志内容放进了kafka的
这时我们来看es-head控制台,发现创建了索引kafka-2020.07.23
11条错误信息都读进来了
问题
现在我加一条错误信息数据,就导致整个文件重新读进了kafka,从而导致数据重复
# 增加一条的error数据
[root@helloworld ~]# vi /var/log/myapp/error.log
[error 信息,系统异常,error-12]
发现filebeat 层没有做过滤,数据重复写进了kafka,导致重复写进了ES
看ES的数据重复了
由于filebeat只能采集数据无法进行筛选,所以过滤层要做logstash做
注意
filebeat重复发送数据内容,是因为我们没有做换行,使用shell命令写入一行
[root@helloworld myapp]# echo '[info 信息,系统提示,info-14]' >> info.log
这样它就没有把之前的内容重新发送过来了,只发送了我新添加的一行数据