黄埔班第47节:kafka消息队列应用实战-3

2020/03/08

1. 搭建kafka的集群

kafka集群搭建的前提:jdk,zookeeper

Kafka集群只需要我们和单机一样将新的kafka加入到同一个zookeeper里即可,但broker.id需要和其他机器不一样

注意:

1、config/server.properties的log.dirs的保存消息日志的目录是支持多个磁盘目录的

2、使用springboot通过外网访问kafka需要开通一个监听如下,如果是集群的话需要给每台机器设置自己的外网IP

advertised.listeners=PLAINTEXT://39.99.195.49:9092

3、配置完一台单机的kafka,直接scp给到其他服务器节点,修改config/server.properties

现在有了3个broker节点,创建topic,有5个分区,每个leader主分区都有2个follower副本分区

kafka-topics.sh --zookeeper  127.0.0.1:2181 --create --partitions 5 --replication-factor 3 --topic topicName

# 查看topic的详情
kafka-topics.sh --zookeeper  127.0.0.1:2181 --describe --topic topicName

Topic: topicName        PartitionCount: 5       ReplicationFactor: 3    Configs: 
        Topic: topicName        Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: topicName        Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: topicName        Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: topicName        Partition: 3    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
        Topic: topicName        Partition: 4    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3

可以看到0分区的leader在broker ID 为1的broker节点上,画一下topic的分区结构图

数据是由Follower主动从Leader那里拉取过来的,当ISR中的Follower完成数据同步之后,Leader就会给Follower发送ack

2. Springboot整合Kafka进行消息收发

项目工程结构:

Producer发送端

1、pom.xml导入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

2、application.yaml配置文件

spring:
  kafka:
    bootstrap-servers: 39.99.222.44:9092 # 集群用,隔开多个kafka节点
    producer:
      retries: 3 # 发送消息的重试次数
      batch-size: 16384  # 批量
      acks: 1 # 等待partition leader落盘完成后才返回ack,发送成功 ,默认就是1
      buffer-memory: 33554432 # 设置生产者内存缓存的大小 32M
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

key-serializer的序列化参数值,可以在官方文档http://kafka.apache.org/23/documentation.html查看配置值

idea全局搜索

3、编写发送代码

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
public class KafkaProducerService {

  @Resource
  private KafkaTemplate<String,Object> kafkaTemplate;

  public void sendMessage(String topic,String key,Object data){
		// 轮询分区
    // ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, data);
    // 全部发送到topic的0分区
    ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,data);
    // acks回调
    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
      @Override
      public void onFailure(Throwable throwable) {
        System.out.println("********消息发送失败:"+throwable.toString());

      }
      @Override
      public void onSuccess(SendResult<String, Object> result) {
        System.out.println("=========消息发送成功:"+result.toString());
      }
    });
  }
}

测试代码

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.icodingedu.producter.service.KafkaProducerService;

@SpringBootTest
class KafkaProducterApplicationTests {

	@Autowired
	KafkaProducerService kafkaProducerService;

	@Test
	void contextLoads() {
		String topic = "topicfirst";
		for(int i=0;i<10;i++){
			kafkaProducerService.sendMessage(topic,"key:"+i,"hello kafka "+i);
		}
		System.out.println("======发送完成======");
	}
}

启动测试,远程kafka报错

修改kafka的配置文件server.properties

[root@helloworld config]# vim server.properties 
# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://139.199.13.139:9092

# 重新启动kafka
kafka-server-stop.sh /opt/kafka/config/server.properties
kafka-server-start.sh -daemon /opt/kafka/config/server.properties

消息发送成功,发现发送了10次

在没有设置分区和key的情况下,按照轮询方式写入数据,消费结果如下

kafka-console-consumer.sh --topic topicfirst --from-beginning --bootstrap-server 127.0.0.1:9092
# 读取的值
hello kafka 1
hello kafka 6
hello kafka 2
hello kafka 7
hello kafka 3
hello kafka 8
hello kafka 4
hello kafka 9
hello kafka 0
hello kafka 5
# 写入分区的顺序机制(没有指定partition分区与message的key的情况下)
Partition: 0    1    2    3    4 
           1 6  2 7  3 8  4 9  0 5
// 指定分区和key
ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic,0,key,data);

Consumer消费端

1、pom.xml导入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

2、application.yaml配置文件

spring:
  kafka:
    bootstrap-servers: 39.99.195.49:9092,39.99.196.208:9092,39.99.196.190:9092 # 集群多个kafka节点
    consumer:
      enable-auto-commit: false # 不自动签收,要让业务走完,才手动签收
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 发送端使用序列化,消费端就要反序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual
      concurrency: 5
# auto-offset-reset的值有两个      
# earliest: kafka出现错误重启之后,会找到未消费的offset继续消费
# latest: kafka出现错误中,如果还有数据往topic里写,只会从最新的offset开始消费

3、编写接收代码

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    @KafkaListener(groupId = "group01",topics = "topicName")
    public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
        System.out.println("*****获取的消息: "+record.value());
        acknowledgment.acknowledge();
    }
}

3. kafka消费进度分析以及消费应答

# 通过命令看消费进度的
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.177:9092,192.168.0.178:9092,192.168.0.179:9092 --describe --group group01

# CURRENT-OFFSET :当前消费的offset进度
# LOG-END-OFFSET :数据文件里一共有多少位移数据
# LAG :这是就是还未消费的量
CURRENT-OFFSET + LAG = LOG-END-OFFSET

把消费端停掉,发送端发送消息,运行/kafka-consumer-groups.sh 看消费组的LAG堆积还没消费的消息,发现还有2*5=10条消息没消费

把签收注释掉,启动消费端,获取消息消费,查看LAG还是10条,因为消费端没有告诉kafka broker 确认签收消费

 // acknowledgment.acknowledge();

所以业务做完后,要手工签收acknowledgment.acknowledge();

5. 在程序中consumer如何重新消费

# 重新消费在命令行的方式 加上 --from-beginning
kafka-console-consumer.sh --topic topicfirst --from-beginning --bootstrap-server 127.0.0.1:9092

重新消费需要明确两个点

  • 每次消费完毕都会记录consumer的offset 消费位移
  • 如果要从代码里从头消费就需要配置
    • auto-offset-reset: earliest
    • 更换消费者组或者将已消费的offset删除

更换消费者组重新消费

@KafkaListener(groupId = "group02",topics = "topicfirst")
	public void onMessager(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
		System.out.println("*******获取消息:" + record.value());
		acknowledgment.acknowledge(); // 确认签收
	}

注意:application.yaml要配置auto-offset-reset:earliest

发现之前使用group01已确认消费的消息重新读出来了

6. Kafka监控服务平台Eagle的使用

kafka-eagle平台监控系统(Kafka 鹰眼监控)

官网地址: http://www.kafka-eagle.org/articles/docs/changelog/changelog.html

# 下载地址 http://download.kafka-eagle.org/
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.4.8.tar.gz
# 解压
tar -zxvf kafka-eagle-bin-1.4.8.tar.gz 
[root@helloworld kafka-eagle-bin-1.4.8]# tar -xzvf kafka-eagle-web-1.4.8-bin.tar.gz 


# Eagle是通过JMX来拉取kafka信息
# JMX:是Java Management Extensions(Java管理扩展)的缩写,
# 首先要对Kafka开启JMX
# 1.开启kafka的JMX
cd kafka/bin
vi bin/kafka-server-start.sh
# 修改heap内容开启JMX
# 将这一行进行修改:export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
# 如果是集群需要所有机器都将这个配置修改好
# 重启kafka让配置生效
[root@helloworld bin]# kafka-server-stop.sh /opt/kafka/config/server.properties 
[root@helloworld bin]# kafka-server-start.sh -daemon /opt/kafka/config/server.properties 

# 2.eagle配置到环境变量中
vi /etc/profile
export KE_HOME=/usr/local/kafka-eagle
export PATH=$KE_HOME/bin:$JAVA_HOME/bin:$PATH
# 让配置生效
source /etc/profile

# 3.给执行文件授权
cd /usr/local/kafka-eagle/bin
chmod 777 ke.sh

# 4.eagle系统的配置,eagle可以监控多套kafka集群
cd /usr/local/kafka-eagle/conf
vi system-config.properties
# 设置对应的参数
# 可以配置多个集群,这里只配置一套,多个用逗号分开
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=127.0.0.1:2181,127.0.0.1:2181,127.0.0.1:2181
# 显示的broker数量
cluster1.kafka.eagle.broker.size=20
# zookeeper客户端线程数
kafka.zk.limit.size=25
# eagle端口
kafka.eagle.webui.port=8048
# 消费的offset保存位置
cluster1.kafka.eagle.offset.storage=kafka
# 是否开启图表并保持30天内容
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30
# KSQL的查询显示条数和是否自动fix error
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false
# 从界面删除topic的token密码,相当于暗号,跟Elasticsearch head 界面上删除索引的时候要输入“删除”两个字的道理是一样的
kafka.eagle.topic.token=keadmin
# mysql保存eagle的一些元数据驱动
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/kafka_eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

修改kafka/bin/kafka-server-start.sh开启kafka 的JMX

mysql保存eagle的一些元数据驱动的数据库kafka_eagle需要自己手动创建

注意数据库如果不在一个设备上需要提前授权

use mysql;
select host,user from user;
grant all privileges on *.* to gavin@'192.168.%' identified by 'cpcoredb';
flush privileges;

启动/关闭/查看Kafka-eagle状态

[root@helloworld bin]# ./ke.sh --help
Usage: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate}

bin/ke.sh start
bin/ke.sh stop
bin/ke.sh stats  # 查看状态

# 访问地址外网,云服务器要开通安全组和防火墙
http://39.100.39.20:8048/ke

启动成功:

访问控制台:

Dashboard

7. 大数据日志平台服务搭建设计

收集日志的目的:确保系统的稳定性,查看过去的日志信息

收集日志要注意的是

  • 日志信息量非常大,所以一定要有一个可以堆积的地方
  • 日志会出现暴增的情况
  • 日志文件–>kafka(分布式)–>Logstash(收集过滤)–>ElasticSearch(聚合计算)–>Kibana(报表展示)

8. 使用filebeat将日志收集进kafka

logstash:使用java开发的比较消耗资源

filebeat:使用go语言开发的的空间,比logstash更轻量级,占用资源更少,也是Elasticsearch的组件之一,一般在生产环境都是使用filebeat来进行日志收集的

# 下载安装,与Elasticsearch的版本保持一致
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.2-linux-x86_64.tar.gz
# 我云服务器上的elasticsearch的版本是7.6.1,所以我下载的filebeat也是7.6.1版本
# 1、解压
tar -zxvf filebeat-7.5.2-linux-x86_64.tar.gz
# 2、修改配置文件
[esuser@helloworld filebead-7.6.1]$ pwd
/home/esuser/filebead-7.6.1
vi /usr/local/filebeat/filebeat.yml

filebeat.yml的配置信息

filebeat.inputs:
# 我的这个输入项是干什么的,自己命个名标记下
- input_type: log
  paths:  # 你的日志从哪里取的,目录,理论上日志文件我们生产环境上运行的程序产生的,你可以把日志内容通过程序写入ES,但没有是filebeat 稳定可靠
    - /var/log/myapp/error.log
  document_type: "error-log"    # 写入ES时的_type值,ES7.x版本 type默认为_doc 
  multiline:
    pattern: '^\['              # 指定匹配的表达式,匹配以[开头的字符串,用来匹配寻找error日志信息
    negate: true                # 是否匹配到,必须要匹配到
    match: after                # 没有匹配到的合并到上一行的末尾
    max_lines: 2000             # 未匹配到最大的行数
    timeout: 2s                 # 如果在规定时间没有新的日志事件就不等待后面的直接output
  fields: 
    logbiz: order-error         # 业务命名
    logtopic: order-error-log   # kafka的topic,把错误信息给到该topic
    evn: dev                    # 定义一个环境名
  registry_file: /home/esuser/filebead-7.6.1/data/registry  # 记录日志读取的位置,如果容器重启,可以从记录的位置开始取日志

- input_type: log
  paths:   # 收集另外一个日志文件
    - /var/log/myapp/info.log
  document_type: "info-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields: 
    logbiz: order-info
    logtopic: order-info-log
    evn: dev
  registry_file: /home/esuser/filebead-7.6.1/data/registry  

output.kafka: 
   enabled: true
   hosts: ["192.168.0.177:9092","192.168.0.178:9092","192.168.0.179:9092"]  # 这里kafka是一个集群
   topic: '%{[fields.logtopic]}'
   partition.hash: 
     reachable_only: true
   compression: gzip   # 压缩
   max_message_bytes: 1000000   # 最大日志1M
   required_acks: 1
logging.to_files: true

3、启动filebeat

# 启动前做一下验证,看一下配置文件是否正确,就像nginx -t 一样
[esuser@helloworld filebead-7.6.1]$ ./filebeat test config
Config OK
# 注意:在启动之前一定要将filebeat导入的topic创建好
# 后台启动filebeat
./filebeat &

准备工作

创建/var/log/myapp/error.log,放入日志内容

vi /var/log/myapp/error.log
[error 信息,系统异常,error-1]
[error 信息,系统异常,error-2]
[error 信息,系统异常,error-3]
[error 信息,系统异常,error-4]
[error 信息,系统异常,error-5]
[error 信息,系统异常,error-6]

在Kafka eagle创建topic:order-error-log

创建/var/log/myapp/info.log,放入日志内容

vi /var/log/myapp/info.log
[info 信息,系统提示,info-1]
[info 信息,系统提示,info-2]
[info 信息,系统提示,info-3]
[info 信息,系统提示,info-4]
[info 信息,系统提示,info-5]
[info 信息,系统提示,info-6]

在Kafka eagle创建topic:order-info-log

验证日志文件内容是否已收集进kafka

使用命令查看topic 消息

kafka-console-consumer.sh --topic order-error-log --from-beginning --bootstrap-server 127.0.0.1:9092

Springboot Consumer消费端监听

@Service
public class ConsumerService {

	@KafkaListener(groupId = "group01",topics = "order-error-log")
	public void onMessager(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
		System.out.println("*******获取消息:" + record.value());
		acknowledgment.acknowledge(); // 确认签收
	}
}

9. 通过logstash提取kafka数据进入ES

# 0.解压安装
# 根据ElasticSearch的版本下载,确保已安装Java,配置环境变量
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.5.2.tar.gz
# 解压
tar -zxvf logstash-7.5.2.tar.gz
# 1.在根目录创建sync的,之前我已经使用过logstash同步mysql的数据到ES中
[esuser@helloworld sync]$ ls
aiko-dict-task.sql  logstash-db-dict-sync.conf  logstash-ik.json  mysql-connector-java-5.1.47.jar  track_time
[esuser@helloworld sync]$ vi logstash-log-sync.conf
# 2.在sync文件里创建logstash-log-sync.conf,配置内容如下
input {
    kafka {
        bootstrap_servers => "192.168.0.177:9092,192.168.0.178:9092,192.168.0.179:9092"  # kafka集群的写法
        topics_pattern  => "order-.*"  # 匹配topic
        consumer_threads => 5   # 读取的线程数
        decorate_events => true 
        codec => "json"
        auto_offset_reset => "latest"  # 取最新的数据
        group_id => "logstash1" 
    }
}
output {
    elasticsearch {
        hosts => ["192.168.0.175:9200"]  # es地址 不能使用127.0.0.1,使用内网ip或外网ip
        index => "kafka-%{+YYYY.MM.dd}"
    }
    stdout{
        codec => rubydebug     ##输出到屏幕上
    }
}
# 在bin目录下启动执行
./logstash -f /usr/local/logstash/sync/logstash-log-sync.conf
# nohup后台启动
nohup ./logstash -f /usr/local/logstash/logstash-7.5.2/sync/logstash-log-sync.conf > myout.file 2>&1 &

准备工作

启动elasticsearch,elasticsearch-head 访问

# 查看java运行的进程
[root@helloworld bin]# jps
11507 Elasticsearch
15445 Jps
14408 QuorumPeerMain
14489 QuorumPeerMain
14442 QuorumPeerMain
9130 Kafka
12171 Elasticsearch
16877 KafkaEagle
12077 Elasticsearch
# kill掉其中一个es
[root@helloworld bin]# 

logstash 启动成功

看es现在是还没有创建kafka的索引,我们来修改日志文件/var/log/myapp/error.log

# 增加 7到11的error数据
[root@helloworld ~]# vi /var/log/myapp/error.log
[error 信息,系统异常,error-7]
[error 信息,系统异常,error-8]
[error 信息,系统异常,error-9]
[error 信息,系统异常,error-10]
[error 信息,系统异常,error-11]

保存后,马上触发了logstash的读取kafka的日志内容,在这之前,filebeat是先把日志内容放进了kafka的

这时我们来看es-head控制台,发现创建了索引kafka-2020.07.23

11条错误信息都读进来了

问题

现在我加一条错误信息数据,就导致整个文件重新读进了kafka,从而导致数据重复

# 增加一条的error数据
[root@helloworld ~]# vi /var/log/myapp/error.log
[error 信息,系统异常,error-12]

发现filebeat 层没有做过滤,数据重复写进了kafka,导致重复写进了ES

看ES的数据重复了

由于filebeat只能采集数据无法进行筛选,所以过滤层要做logstash做

注意

filebeat重复发送数据内容,是因为我们没有做换行,使用shell命令写入一行

[root@helloworld myapp]# echo '[info 信息,系统提示,info-14]' >> info.log 

这样它就没有把之前的内容重新发送过来了,只发送了我新添加的一行数据

10.kafka知识点总结

Post Directory