[TOC]

一、ELK+Kafka+ZooKeeper

分布式日志数据==》集中式 ==》查询和管理,故障排查

1、各组件及作用

Elasticsearch:日志信息、日志信息搜索(全文搜索引擎)(外部端口9200;内部端口9300)

Logstash:数据输入、输出、数据传输、数据处理、格式化

Kibana:针对Elasticsearch分析及可视化平台,数据整合、数据分析,简单数据导出 (端口5601)

Filebeat:轻量级日志收集器(Logstash比较占用资源但有格式转换功能)

kafka:中间件,消息队列,有效处理数据保证数据不丢失。 消息队列:处理高并发、高并列,kafka用的最多,,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统)

一般用的基础架构

通过logstash拉取业务服务信息,然后传给elasticsearch,通过kibana在网站页面展现

一般会使用filebeat替代logstash,主要logstash太占资源

加入中间件的架构(kafka+zookeeper)

1608024416592

第一层、数据采集层

最左边的是业务服务器集群,上面安装了filebeat做日志采集,同时把采集的日志分别发送给两个logstash服务。

第二层、数据处理层,数据缓存层

logstash服务把接受到的日志经过格式处理,转存到本地的kafka broker+zookeeper 集群中。

第三层、数据转发层

这个单独的Logstash节点会实时去kafka broker集群拉数据,转发至ES DataNode。

第四层、数据持久化存储

ES DataNode 会把收到的数据,写磁盘,建索引库。

第五层、数据检索,数据展示

ES Master + Kibana 主要 协调 ES集群,处理数据检索请求,数据展示。

注1:【Kafka的加入原因与作用】

整个架构加入Kafka,是为了让整个系统更好的分层,Kafka作为一个消息流处理与持久化存储软件,能够帮助我们在主节点上屏蔽掉多个从节点之间不同日志文件的差异,负责管理日志端(从节点)的人可以专注于向 Kafka里生产数据,而负责数据分析聚合端的人则可以专注于从 Kafka内消费数据。所以部署时要把Kafka加进去。

而且使用Kafka进行日志传输的原因还在于其有数据缓存的能力,并且它的数据可重复消费,Kafka本身具有高可用性,能够很好的防止数据丢失,它的吞吐量相对来说比较好并且使用广泛。可以有效防止日志丢失和防止logsthash挂掉。综合来说:它均衡了网络传输,从而降低了网络闭塞,尤其是丢失数据的可能性,

注2:【双层的Logstash作用】

这里为什么要在Kafka前面增加二台logstash呢?是因为在大量的日志数据写入时,容易导致数据的丢失和混乱,为了解决这一问题,增加二台logstash可以通过类型进行汇总分类,降低数据传输的臃肿。

如果只有一层的Logstash,它将处理来自不同客户端Filebeat收集的日志信息汇总,并且进行处理分析,在一定程度上会造成在大规模日志数据下信息的处理混乱,并严重加深负载,所以有二层的结构进行负载均衡处理,并且职责分工,一层汇聚简单分流,一层分析过滤处理信息,并且内层都有二台Logstash来保障服务的高可用性,以此提升整个架构的稳定性。

2、Elasticsearch

Elasticsearch,基于RESTful web接口。

Elasticsearch作为数据持久化存储环节,主要就是接受采集端发送过来的数据,执行写磁盘,建立索引库,最后将结构化的数据存储到ES集群上

Elasticsearch是用Java开发的,提供了一个分布式多用户能力的全文搜索引擎,设计用于云计算中,能够达到

实时搜索,稳定,可靠,快速,安装使用方便。

Elasticsearch的基础核心概念:

1、接近实时(NRT)

lasticsearch是一个接近实时的搜索平台,这意味着,从索引一个文档直到这个文档能够被搜索到有一个轻微的延迟(通常是1秒)

2、集群(cluster)

一个集群就是由一个或多个节点组织在一起,它们共同持有你整个的数据,并一起提供索引和搜索功能。其中一个节点为主节点,这个主节点是可以通过选举产生的,并提供跨节点的联合索引和搜索的功能。集群有一个唯一性标示的名字,默认是elasticsearch,集群名字很重要,每个节点是基于集群名字加入到其集群中的。因此,确保在不同环境中使用不同的集群名字。

一个集群可以只有一个节点。强烈建议在配置elasticsearch时,配置成集群模式。

3、节点(node)

节点就是一台单一的服务器,是集群的一部分,存储数据并参与集群的索引和搜索功能。像集群一样,节点也是通过名字来标识,默认是在节点启动时随机分配的字符名。当然,你可以自己定义。该名字也很重要,在集群中用于识别服务器对应的节点。

节点可以通过指定集群名字来加入到集群中。默认情况,每个节点被设置成加入到elasticsearch集群。如果启动了多个节点,假设能自动发现对方,他们将会自动组建一个名为elasticsearch的集群。

4、索引(index)

一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母的),并且当我们要对对应于这个索引中的文档进行索引、搜索、更新和删除的时候,都要使用到这个名字。在一个集群中,如果你想,可以定义任意多的索引。

●索引相对于关系型数据库的库。

5、类型(type)

在一个索引中,你可以定义一种或多种类型。一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具有一组共同字段的文档定义一个类型。比如说,我们假设你运营一个博客平台并且将你所有的数据存储到一个索引中。在这个索引中,你可以为用户数据定义一个类型,为博客数据定义另一个类型,当然,也可以为评论数据定义另一个类型。

●类型相对于关系型数据库的表

3、ES的数据备份和恢复

Elasticsearch 5.x 数据备份和恢复可由 snapshot 模块来完成,snapshot模块可以通过文件共享系统为单个索引或整个集群远程创建快照和进行数据恢复。

索引快照时增量的。在创建快照前es会分析已有快照仓库,只对上次备份后更改的内容进行增量备份。在创建备份时同一个集群中只能运行一个es snapshot进程。

Es 基础命令

创建快照仓库

1
2
3
4
5
6
curl -X PUT "node1:9200/_snapshot/my_backup" -H 'Content-Type: application/json' -d'{
"type": "fs",
"settings": {
"location": "sys_backup"
}
}'

查看已注册的快照仓库

1
curl -X GET "node1:9200/_snapshot/my_backup"

可以使用逗号间隔多个仓库,星号通配符匹配所有仓库名字,下面示例返回仓库名以repo开头的和包含backup的仓库信息:

1
curl -X GET "node1:9200/_snapshot/repo*,*backup*"

获取所有已注册快照仓库,省略仓库名或者使用_all

1
curl -X GET "node1:9200/_snapshot"

或者

1
curl -X GET "node1:9200/_snapshot/_all"

查看快照仓库列表

1
curl -X GET "node1:9200/_cat/repositories?v"

准备工作

文件共享系统

nfs、hdfs?

共享文件系统仓库(“type”: “fs”)使用共享文件系统存快照,如果要注册共享文件系统仓库,必须在所有master和data节点挂载相同的共享文件系统到同一个路径位置。这个路径位置(或者它的一个父目录)必须在所有master和data节点的path.repo设置上注册。

假设共享文件系统挂载到 /data/backups/es_backup ,应该在elasticsearch.yml文件中添加如下配置:

1
path.repo: ["/data/backups", "/data/longterm_backups"]

创建快照仓库

所有节点重启之后,执行下面的命令注册名字为 es_backup 的共享文件系统仓库:

1
2
3
4
5
6
7
8
9
curl -X PUT 'node1:9200/_snapshot/es_backup?verify=false' -H 'Content-Type: application/json' -d'{
"type": "fs",
"settings": {
"location": "/mount/backups/es_backup",
"compress": true,
"max_restore_bytes_per_sec": 50m,
"max_snapshot_bytes_per_sec": 30m
}
}'

如果使用相对路径,该路径将根据在path.repo中定义的第一个路径决定:

1
2
3
4
5
6
7
8
9
curl -XPUT 'http://node1:9200/_snapshot/es_backup?verify=false' -H 'Content-Type: application/json' -d '{
"type": "fs",
"settings": {
"location": "es_backup",
"compress": true,
"max_restore_bytes_per_sec": 50m,
"max_snapshot_bytes_per_sec": 30m
}
}'
参数说明
参数 含义
location 快照存储位置
compress 是否压缩源文件,默认为true
chunk_size 如果有需要,可以将大文件分解为多个小文件,默认不开启
max_restore_bytes_per_sec 指定数据恢复速度,默认为 40m/s
max_snapshot_bytes_per_sec 指定创建快照时的速度,默认为 40m/s
readonly 设置为只读仓库,默认为false
Repository Verification

在创建一个仓库时,会即刻在集群所有节点验证确保其功能在所有节点可用,verify 参数可以用来取消该验证(如果想使用验证功能,创建仓库时去掉 ?verify=false 参数即可):

1
2
3
4
curl -XPUT 'http://node1:9200/_snapshot/es_backup?verify=false' -H 'Content-Type: application/json' -d '{
"type": fs
... ...
}'

验证过程可以通过下面命令手动执行:

1
curl -X POST "node1:9200/_snapshot/es_backup/_verify"

Snapshot

创建快照

一个仓库可以拥有同一个集群的多个快照。在一个集群中快照拥有一个唯一名字作为标识。

示例: 在仓库 es_backup 中创建名字为 test_snapshot 的快照,可以通过执行下面的命令来实现。

1
curl -X PUT "node1:9200/_snapshot/es_backup/test_snapshot?wait_for_completion=true"

参数 wait_for_completion 决定请求是在快照初始化后立即返回(默认),还是等快照创建完成之后再返回。快照初始化时,所有之前的快照信息会被加载到内存,所以在一个大的仓库中改请求需要若干秒(甚至分钟)才能返回,即使参数 wait_for_completion 的值设置为 false。

默认情况下,创建一个快照会包含集群中所有打开和启动状态的索引。可以通过在创建快照的请求体中定义索引列表来改变这个默认处理:

1
2
3
4
5
6
curl -X PUT "node1:9200/_snapshot/es_backup/test_snapshot_2?wait_for_completion=true" -H 'Content-Type: application/json' -d'
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": false
}

要包含到快照中索引列表可以使用支持多个索引语法的 indices 参数来指定。快照请求也支持 ignore_unavailable 选项,该选项设置为 true 时,在创建快照时会忽略不存在的索引。默认情况下,如果选项 ignore_unavailable 没有设值,一个索引缺失,快照请求会失败。

通过设置 include_global_statefalse,可以阻止集群全局状态信息被保存为快照的一部分。默认情况下,如果如果一个快照中的一个或者多个索引没有所有主分片可用,整个快照创建会失败,该情况可以通过设置 partial 为 true 来改变。

快照名可以通过使用 date_math_expressions 来自动获得,和创建新索引时类似。注意特殊字符需要 URI 转码处理。

例如,在名字中使用当前日期,比如 snapshot-2018.05.11,来创建快照,可以使用如下命令完成:

1
2
# PUT /_snapshot/es_backup/<snapshot-{now/d}>
curl -X PUT "node1:9200/_snapshot/es_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E"

创建快照:

1
2
3
4
5
6
curl -X PUT "node1:9200/_snapshot/es_backup/syslog?wait_for_completion=true" -H 'Content-Type: application/json' -d'
{
"indices": "bash_history.log*,secure.log*,cron.log*",
"ignore_unavailable": true,
"include_global_state": false
}

查看快照信息

1
curl -X GET "node1:9200/_snapshot/es_backup/syslog"

这个命令返回快照的基本信息,包括开始合结束时间、创建快照的 ElasticSearch 版本、包含的索引列表、快照当前状态和快照期间产生的失败索引列表。快照的状态有:

状态 含义
IN_PROGRESS 正在创建快照
SUCCESS 快照创建成功
FAILED 快照创建完成,但是有错误,数据不会保存
PARTIAL 整个集群备份完成,但是至少有一个shard数据存贮失败,会有更具体报错信息
INCOMPATIBLE 创建快照的es版本和当前集群es版本不一致

查看某仓库下所有快照信息:

1
curl -X GET "node1:9200/_snapshot/es_backup/_all"

查看当前正在运行的快照:

1
curl -X GET "localhost:9200/_snapshot/my_backup/_current"

删除快照

从仓库中删除一个快照,使用如下命令:

1
curl -X DELETE "node1:9200/_snapshot/es_backup/test_snapshot_2"

当一个快照从仓库中删除,ElasticSearch 将删除该快照关联的但不被其他快照使用的所有文件。如果在快照创建的时候执行快照删除操作,此快照创建进程将终止且所有该进程已创建的文件也将被清理。所以,快照删除操作可以用来取消错误启动的长时间运行的快照操作。

删除仓库

可以使用下面命令注销仓库:

1
curl -X DELETE "node1:9200/_snapshot/es_backup"

数据恢复

全量恢复

快照可以通过执行以下命令恢复

1
curl -X POST "node1:9200/_snapshot/es_backup/syslog/_restore"

默认情况下,快照中的所有索引将被恢复,集群状态不被恢复。可以通过在恢复请求中使用 indices 和 include_global_state 选项来指定要恢复的索引和允许恢复集群全局状态。索引列表支持多索引语法。rename_pattern 和 rename_replacement 选项在恢复时通过正则表达式来重命名索引。设置 include_aliases 为 false 可以防止与索引关联的别名被一起恢复。

1
2
3
4
5
6
7
8
curl -X POST "localhost:9200/_snapshot/my_backup/snapshot_1/_restore" -H 'Content-Type: application/json' -d'
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": true,
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}'

恢复操作可以在正常运行的集群上执行。已存在的索引只能在关闭状态下才能恢复,并且要跟快照中索引拥有相同数目的分片。还原操作自动打开关闭状态的索引,如果被还原索引在集群不存在,将创建新索引。如果集群状态通过 include_global_state (默认是 false)选项被还原,在集群中不存在的模板会被新增,已存在的同名模板会被快照中的模板替换。持久化设置会被添加到现有的持久化设置中。

4、Logstash

缺点:很消耗内存和CPU

1)、 logstash 介绍

LogStash由JRuby语言编写,基于消息(message-based)的简单架构,并运行在Java虚拟机(JVM)上。不同于分离的代理端(agent)或主机端(server),LogStash可配置单一的代理端(agent)与其它开源软件结合,以实现不同的功能。

2)、logStash的四大组件

  • Shipper:发送事件(events)至LogStash;通常,远程代理端(agent)只需要运行这个组件即可;
  • Broker and Indexer:接收并索引化事件;
  • Search and Storage:允许对事件进行搜索和存储;
  • Web Interface:基于Web的展示界面
    正是由于以上组件在LogStash架构中可独立部署,才提供了更好的集群扩展性。

3)、LogStash主机分类

  • 代理主机(agent host):作为事件的传递者(shipper),将各种日志数据发送至中心主机;只需运行Logstash 代理(agent)程序;
  • 中心主机(central host):可运行包括中间转发器(Broker)、索引器(Indexer)、搜索和存储器(Search and Storage)、Web界面端(Web Interface)在内的各个组件,以实现对日志数据的接收、处理和存储。

4)、Logstash工作原理:

Logstash事件处理有三个阶段:inputs → filters → outputs。是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。

img

Input:输入数据到logstash。

一些常用的输入为:

1
2
3
4
5
6
7
file:从文件系统的文件中读取,类似于tail -f命令

syslog:在514端口上监听系统日志消息,并根据RFC3164标准进行解析

redis:从redis service中读取

beats:从filebeat中读取

Filters:数据中间处理,对数据进行操作。

一些常用的过滤器为:

1
2
3
4
5
6
7
8
9
grok:解析任意文本数据,Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。内置120多个解析语法。

mutate:对字段进行转换。例如对字段进行删除、替换、修改、重命名等。

drop:丢弃一部分events不进行处理。

clone:拷贝 event,这个过程中也可以添加或移除字段。

geoip:添加地理信息(为前台kibana图形化展示使用)

Outputs:outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

一些常见的outputs为:

1
2
3
4
5
6
7
8
9
10
11
12
13
elasticsearch:可以高效的保存数据,并且能够方便和简单的进行查询。

file:将event数据保存到文件中。

graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。

**Codecs:codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置**。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。

一些常见的codecs:

json:使用json格式对数据进行编码/解码。

multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
input {
syslog {
type => "system-syslog"
host => "192.168.56.11"
port => "514"
}
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/nginx/access_json.log"
codec => json
start_position => "beginning"
type => "nginx-log"
}
file {
path => "/var/log/elasticsearch/chuck-cluster.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [type] == "system" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
if [type] == "nginx-log" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "nginx-log-%{+YYYY.MM.dd}"
}
}
if [type] == "system-syslog" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
}

5、Filebeat

Filebeat工作原理:

Filebeat由两个主要组件组成:prospectors 和 harvesters。这两个组件协同工作将文件变动发送到指定的输出中。

img

Harvester(收割机):

负责读取单个文件内容。每个文件会启动一个Harvester,每个Harvester会逐行读取各个文件,并将文件内容发送到制定输出中。Harvester负责打开和关闭文件,意味在Harvester运行的时候,文件描述符处于打开状态,如果文件在收集中被重命名或者被删除,Filebeat会继续读取此文件。所以在Harvester关闭之前,磁盘不会被释放。默认情况filebeat会保持文件打开的状态,直到达到close_inactive(如果此选项开启,filebeat会在指定时间内将不再更新的文件句柄关闭,时间从harvester读取最后一行的时间开始计时。若文件句柄被关闭后,文件发生变化,则会启动一个新的harvester。关闭文件句柄的时间不取决于文件的修改时间,若此参数配置不当,则可能发生日志不实时的情况,由scan_frequency参数决定,默认10s。Harvester使用内部时间戳来记录文件最后被收集的时间。例如:设置5m,则在Harvester读取文件的最后一行之后,开始倒计时5分钟,若5分钟内文件无变化,则关闭文件句柄。默认5m)。

Prospector(勘测者):负责管理Harvester并找到所有读取源。

1
`filebeat.prospectors:``- input_type: log`` ``paths:``  ``- /apps/logs/*/info.log`

Prospector会找到/apps/logs/*目录下的所有info.log文件,并为每个文件启动一个Harvester。Prospector会检查每个文件,看Harvester是否已经启动,是否需要启动,或者文件是否可以忽略。若Harvester关闭,只有在文件大小发生变化的时候Prospector才会执行检查。只能检测本地的文件。

Filebeat如何记录文件状态:

将文件状态记录在文件中(默认在/var/lib/filebeat/registry)。此状态可以记住Harvester收集文件的偏移量。若连接不上输出设备,如ES等,filebeat会记录发送前的最后一行,并再可以连接的时候继续发送。Filebeat在运行的时候,Prospector状态会被记录在内存中。Filebeat重启的时候,利用registry记录的状态来进行重建,用来还原到重启之前的状态。每个Prospector会为每个找到的文件记录一个状态,对于每个文件,Filebeat存储唯一标识符以检测文件是否先前被收集。

Filebeat如何保证事件至少被输出一次:

Filebeat之所以能保证事件至少被传递到配置的输出一次,没有数据丢失,是因为filebeat将每个事件的传递状态保存在文件中。在未得到输出方确认时,filebeat会尝试一直发送,直到得到回应。若filebeat在传输过程中被关闭,则不会再关闭之前确认所有时事件。任何在filebeat关闭之前为确认的时间,都会在filebeat重启之后重新发送。这可确保至少发送一次,但有可能会重复。可通过设置shutdown_timeout 参数来设置关闭之前的等待事件回应的时间(默认禁用)。

Filebeat输出到logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

[admin@ris-1 filebeat]$ sudo cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
- /var/log/messages
fields:
service : "ris-1-systemlog-filebeat"
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
output.logstash:
hosts: ["10.6.75.171:5044"] #logstash地址可以是多个,我这里是本机,其实没必要
#hosts:["192.168.108.191:5044", "192.168.108.87:5044"] # 发往二台Logstash-collect
loadbalance: true
#loadbalance: false # 消息只是往一个logstash里发,如果这个logstash挂了,就会自动将数据发到另一个logstash中。(主备模式)
#loadbalance: true # 如果为true,则将数据均分到各个logstash中,挂了就不发了,往存活的logstash里面发送。
worker: 2 #线程数
#compression_level: 3 #压缩级别
#output.file:
# path: "/tmp/"
# filename: filebeat

二、Kafka和ZooKeeper

1、架构图

1608022885638

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

2、Kafka中的术语

  • broker:中间的kafka cluster,存储消息,是由多个server组成的集群。
  • topic:kafka给消息提供的分类方式。broker用来存储不同topic的消息数据。
  • producer:往broker中某个topic里面生产数据。
  • consumer:从broker中某个topic获取数据。

1608023542181

3、topic与消息

kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。

d85277e929d653e92841f190092440ee.png

这样,消息就以一个个id的方式,组织起来。

  • producer选择一个topic,生产消息,消息会通过分配策略append到某个partition末尾。
  • consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。

上面的id在kafka中称为offset,这种组织和处理策略提供了如下好处:

  • 消费者可以根据需求,灵活指定offset消费。

  • 保证了消息不变性,为并发消费提供了线程安全的保证

每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题。

  • 消息访问的并行高效性

每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力。

  • 增加消息系统的可伸缩性

每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配。

  • 保证消息可靠性

消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失。

  • 灵活的持久化策略

可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间。

  • 备份高可用性。

消息以partition为单位分配到多个server,并以partition为单位进行备份。备份策略为:1个leader和N个followers,leader接受读写请求,followers被动复制leader。leader和followers会在集群中打散,保证partition高可用。

4、Partitions

​ 每个Topics划分为一个或者多个Partition,并且Partition中的每条消息都被标记了一个sequential id ,也就是offset,并且存储的数据是可配置存储时间的

cff253038e988e8473593bc42f401e1b.png

5、producer

producer生产消息需要如下参数:

  • topic:往哪个topic生产消息。
  • partition:往哪个partition生产消息。
  • key:根据该key将消息分区到不同partition。
  • message:消息。

img

6、consumer

传统消息系统有两种模式:

  • 队列
  • 发布订阅

kafka通过consumer group将两种模式统一处理:每个consumer将自己标记consumer group名称,之后系统会将consumer group按名称分组,将消息复制并分发给所有分组,每个分组只有一个consumer能消费这条消息。如下图:

img

于是推理出两个极端情况:

  • 当所有consumer的consumer group相同时,系统变成队列模式
  • 当每个consumer的consumer group都不相同时,系统变成发布订阅

注意:

​ 1、Consumer Groups 提供了topics和partitions的隔离, 如上图Consumer Group A中的consumer-C2挂掉,consumer-C1会接收P1,P2,即一个consumer Group中有其他consumer挂掉后能够重新平衡。如下图:

bf6adebeae181d8860c2f3478ccb3df1.png

​ 2、多consumer并发消费消息时,容易导致消息乱序,通过限制消费者为同步,可以保证消息有序,但是这大大降低了程序的并发性。

kafka通过partition的概念,保证了partition内消息有序性,缓解了上面的问题。partition内消息会复制分发给所有分组,每个分组只有一个consumer能消费这条消息。这个语义保证了某个分组消费某个分区的消息,是同步而非并发的。如果一个topic只有一个partition,那么这个topic并发消费有序,否则只是单个partition有序。

一般消息系统,consumer存在两种消费模型

  • push:优势在于消息实时性高。劣势在于没有考虑consumer消费能力和饱和情况,容易导致producer压垮consumer。
  • pull:优势在可以控制消费速度和消费数量,保证consumer不会出现饱和。劣势在于当没有数据,会出现空轮询,消耗cpu。

kafka采用pull,并采用可配置化参数保证当存在数据并且数据量达到一定量的时候,consumer端才进行pull操作,否则一直处于block状态。kakfa采用整数值consumer position来记录单个分区的消费状态,并且单个分区单个消息只能被consumer group内的一个consumer消费,维护简单开销小。消费完成,broker收到确认,position指向下次消费的offset。由于消息不会删除,在完成消费,position更新之后,consumer依然可以重置offset重新消费历史消息。

消息发送语义

producer视角

  • 消息最多发送一次:producer异步发送消息,或者同步发消息但重试次数为0。
  • 消息至少发送一次:producer同步发送消息,失败、超时都会重试。
  • 消息发且仅发一次:后续版本支持。

consumer视角

  • 消息最多消费一次:consumer先读取消息,再确认position,最后处理消息。
  • 消息至少消费一次:consumer先读取消息,再处理消息,最后确认position。
  • 消息消费且仅消费一次。

注意

  • 如果消息处理后的输出端(如db)能保证消息更新幂等性,则多次消费也能保证exactly once语义。
  • 如果输出端能支持两阶段提交协议,则能保证确认position和处理输出消息同时成功或者同时失败。
  • 在消息处理的输出端存储更新后的position,保证了确认position和处理输出消息的原子性(简单、通用)。

可用性

在kafka中,正常情况下所有node处于同步中状态,当某个node处于非同步中状态,也就意味着整个系统出问题,需要做容错处理。

同步中代表了:

  • 该node与zookeeper能连通。
  • 该node如果是follower,那么consumer position与leader不能差距太大(差额可配置)。

某个分区内同步中的node组成一个集合,即该分区的ISR。

kafka通过两个手段容错:

  • 数据备份:以partition为单位备份,副本数可设置。当副本数为N时,代表1个leader,N-1个followers,followers可以视为leader的consumer,拉取leader的消息,append到自己的系统中
  • failover
  1. 当leader处于非同步中时,系统从followers中选举新leader

  2. 当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入 ISR。

另外,kafka有个保障:当producer生产消息时,只有当消息被所有ISR确认时,才表示该消息提交成功。只有提交成功的消息,才能被consumer消费。

因此,当有N个副本时,N个副本都在ISR中,N-1个副本都出现异常时,系统依然能提供服务。

持久化

基于以下几点事实,kafka重度依赖磁盘而非内存来存储消息。

  • 硬盘便宜,内存贵
  • 顺序读+预读取操作,能提高缓存命中率
  • 在持久化数据结构的选择上,kafka采用了queue而不是Btree
  • kafka只有简单的根据offset读和append操作,所以基于queue操作的时间复杂度为O(1),而基于Btree操作的时间复杂度为O(logN)

7、解释Kafka的Zookeeper是什么?我们可以在没有Zookeeper的情况下使用Kafka吗?

Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。

不,不可能越过Zookeeper,直接联系Kafka broker。一旦Zookeeper停止工作,它就不能服务客户端请求。

Zookeeper主要用于在集群中不同节点之间进行通信

在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取

除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

2181 对Client端提供服务的端口。
3888 选举Leader。
2888 集群内的机器通讯使用。(Leader使用此端口)

8、kafka如何彻底删除topic及数据?

删除kafka topic及其数据,严格来说并不是很难的操作。但是,往往给kafka 用者带来诸多问题。项目组之前接触过多个开发者,发现都会偶然出现无法彻底删除kafka的情况。本文总结多个删除kafka topic的应用场景,总结一套删除kafka topic的标准操作方法。

step1:

如果需要被删除topic 此时正在被程序 produce和consume,则这些生产和消费程序需要停止。

因为如果有程序正在生产或者消费该topic,则该topic的offset信息会一直在broker更新。调用kafka delete命令则无法删除该topic。

同时,需要设置 auto.create.topics.enable = false,默认设置为true。如果设置为true,则produce或者fetch 不存在的topic也会自动创建这个topic。这样会给删除topic带来很多意向不到的问题。

所以,这一步很重要,必须设置auto.create.topics.enable = false,并认真把生产和消费程序彻底全部停止。

step2:

server.properties配置文件 设置 delete.topic.enable=true

如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)

step3:

调用命令删除topic:

./bin/kafka-topics.sh –delete –zookeeper 【zookeeper server:port】 –topic 【topic name】

step4:

删除kafka存储目录(server.properties文件log.dirs配置,默认为”/data/kafka-logs”)相关topic的数据目录。

注意:如果kafka有多个 broker,且每个broker配置了多个数据盘(比如 /data/kafka-logs,/data1/kafka-logs …),且topic也有多个分区和replica,则需要对所有broker的所有数据盘进行扫描,删除该topic的所有分区数据。

step5:

完成之后,调用命令:

./bin/kafka-topics.sh –list –zookeeper 【zookeeper server:port】

查看现在kafka的topic信息。正常情况下删除的topic就不会再显示。

但是,如果还能够查询到删除的topic,则重启zk和kafka即可。

三、优化:

1、为什么要消息队列

数据首先通过Filebeat来收集数据,然后经过 Output 插件将数据投递到 Kafka 集群中,这样当遇到 Filebeat接收数据的能力超过 Elasticsearch 集群处理能力的时候,就可以通过队列起到削峰填谷的作用, Elasticsearch 集群就不存在丢失数据的问题。

这种架构适合较大集群的应用部署,通过消息队列解决了消息丢失、数据堆积的问题。

2、kafka和redis对比

目前业界在日志服务场景中,使用比较多的两种消息队列为 :Kafka VS Redis。尽管 ELK Stack 官网建议使用 Redis 来做消息队列,但是我们建议采用 Kafka 。主要从下面两个方面考虑:

1.数据丢失:Redis 队列多用于实时性较高的消息推送,并不保证可靠。Kafka保证可靠但有点延时

2.数据堆积:Redis 队列容量取决于机器内存大小,如果超过设置的Max memory,数据就会抛弃。Kafka 的堆积能力取决于机器硬盘大小

综合上述的理由,我们决定采用 Kafka 来缓冲队列。

3、RabbitMQ和kafka对比:

RabbitMQ单机吞吐量:2.6w/s;kafka单机吞吐量:10多w/s;

RabbitMQ支持简单集群,’复制’模式,对高级集群模式支持不好;kafka天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave

Kafka的优势在于专为超高吞吐量的实时日志采集;

4、ELK优化

Elasticsearch很耗内存、cpu。

标准的建议是把50%的内存给elasticsearch,剩下的50%给Lucene(官方建议:https://www.elastic.co/guide/en/elasticsearch/guide/current/heap-sizing.html)。

单个es实例内存分配不要超过32G。jdk1.8配置成-Xms32766m -Xmx32766m

ES的jvm堆内存不足会有怎样的错误?

内存的使用和同一时间内的请求数量有关系的,尤其需要评分和排序的时候。

比如说,我分配了10G内存,当请求量不大的时候不会有什么问题,但当请求量大的时候,会怎样呢?

oom,进程直接挂掉;ES节点 JVM挂掉。

ES所在操作系统的内存优化

优化设置:禁用swap·分区

通过减低swap分区的使用积极性,永久生效;

/dev/mapper/centos-swap swap swap defaults 0 0 #进入/etc/fstab/将其注释

临时生效直接swapoff -a即可

降低swap分区使用积极性,可以控制系统的内存使用空间的阀值;swappiness=0表示最大限度使用物理内存,也就是说,当物理内存使用100%之后,才去使用swap交换分区;

比如说,我们现在需要设置系统内存大小阀值,当物理内存使用90%的时候,只剩10%的物理内存,再去使用swap空间

100-10=90%

# vim /etc/sysctl.conf

vm.swappiness = 10

最近发现kibana的日志传的很慢,常常查不到日志,由于所有的日志收集都只传输到了一个logstash进行收集和过滤,所以需要提高logstash的吞吐量。

Logstash性能优化(logstash的吞吐量存在瓶颈)

结果:ES的吞吐由每秒9817/s提升到41183/s

logstash.yml配置文件优化:

1、pipline.workers:因为logstash中的grok正则及其消耗系统计算字眼,同时filter也会存在瓶颈,此时增加工作线程,以提高性能

#工作线程数,官方建议是等于CPU内核数

pipeline.workers: 24

# 实际output时的线程数

pipeline.output.workers: 24

#查询一下ES当前的线程情况:

GET _nodes/stats/thread_pool?pretty

其中:”bulk”模板的线程数24,当前活跃的线程数24,证明所有的线程是busy的状态,queue队列214,rejected为30804543。那么问题就找到了,所有的线程都在忙,队列堵满后再有进程写入就会被拒绝,而当前拒绝数为30804543。

优化方案

●问题找到了,如何优化呢。官方的建议是提高每次批处理的数量,调节传输间歇时间。当batch.size增大,es处理的事件数就会变少,写入也就顺畅了

2、pipeline.batch.size:批量执行event的最大值,该值用于input批量处理事件值,

再打包发送给filter和output.可以提高性能,但是会增加额外的内存开销

# 每次发送的事件数

pipeline.batch.size: 3000 #后来最优的值为10000

3、pipeline.batch.delay:批量处理事件的最大等待值

(input需要按照batch处理的最大发送到消息队列,需要设置一个超时事件)

# 发送延时

pipeline.batch.delay: 5 #改成10

Filebeat优化

还记得我们为什么要使用filebeat采集日志数据吗?那是因为Logstash功能虽然强大,但是它依赖于java,在海量日志环境中,logstash进程会消耗更多的系统资源,这将严重的影响业务系统的性能,

而我们说的filebeat是基于go语言,没有任何依赖,配置简单,占用系统资源少,

比logstash更加的轻量级;但是有点还是需要注意。在日志量比较大的情况下或者日志异常突发时,

filebeat也会占用大量的系统内存开销,所以说这方面的优化,也是至关重要的

内存优化,Filebeat内存受到两种模式的限制,一种是内存模式,第二种是文件缓存模式,任选其一即可

Kafka的性能优化

##每天8个G日志量##

1.Kafka提供两种策略删除旧数据,否则磁盘不够用。

一是基于时间,二是基于Partition文件大小。

可以通过配置server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示:

# The minimum age of a log file to be eligible for deletion

log.retention.hours=96 (168小时=一周,默认是一周,可以调小一点)

log.retention.bytes=1GB

2.borker进行I/O处理的线程数;一般为磁盘个数(默认为8,根据情况调大)num.io.threads=8

3.borker进行网络处理的线程数;默认为3,可以调为cpu核数+1;num.network.threads=3

4.follow从leader拉取消息进行同步数据的最大字节数:replica.fetch.max.bytes=5242880(默认1MB,太小了,改为5MB)

既然我们在ELK中用到了Kafka,那么优化也是必须的,先来回顾一下,kafka是一个高吞吐分布式消息系统,并且提供了持久化,高性能主要表现在以下两点:

第一,磁盘的连续读写性能远远高于随机读写;

第二:拆分一个topic主题分配多个partition分区,这样可以提供并发和吞吐量;

另外,我们的kafka消息读写为什么这么高效?原因何在?

我们要知道linux系统内核为文件设置一个缓存机制,所有对文件读写的数据内容都会存在着缓存中,称之为:page cache(页缓存)

缓存机制:

当一个文件发生读操作时,系统会先去page cache页缓存中读取,如果找到,便会直接返回,缓存中没有需要读取的数据内容,那么会去磁盘中读取,此时系统写入一份到缓存中。最终返回数据;

当有写操作时,亦是如此,数据会首先写入缓存并进行标识,

等待批量保存到文件系统,减少了磁盘的操作次数和系统额外开销

我们的kafka就是依赖于这种机制,数据的读写交互便是在缓存中完成接力,

不会因为kafka写入磁盘数据影响吞吐量,这就是为什么kafka非常高效的根本原因

topic的拆分:

kafka读写单位是partition,将一个topic分配到多个partition可以提高系统的吞吐量,但前提是将不同的partition分配到不同的磁盘上,如果多个partition位于一个磁盘上就会出现多个进程同时对磁盘上多个文件进行读写,这样造成了磁盘的频繁调度,破坏了磁盘读写的连续性

如何实现将不同的partition分配到不同的磁盘上呢?

我们可以将磁盘上的多个目录配置到broker的log.dirs上

# vim /usr/local/kafka/config/server.properties

log.dirs=/disk1/logs,/disk2/logs/,/disk3/logs #kafaka在新建partition时,会将partition分布在paritition最少的目录上面,因此,不能将同一个磁盘上的多个目录设置到logs.dirs上

(1) kafka配置参数优化:

num.network.threads=3 #broker处理消息的最大线程数

num.io.threads=8 #broker处理磁盘IO的线程数

一般num.network.threads主要就是处理网络IO,读写缓冲区数据,基本没有IO等待,配置线程数量为CPU核数n+1

num.io.threads主要进行磁盘IO操作,高峰期可以能有些等待,因此配置较大一点,配置线程数量为CPU核数的2~3倍即可

(2) 日志保留策略优化:

kafka被大量的写入日志消息后,会生成大量的数据文件,也就是日志消息,这样会占用大量的磁盘空间。

减少日志保留时间,通过log.retention.hours设置,单位是小时 log.retention.hours=72

#保留日志数据的时间范围,过后便会删除

段文件大小优化:

段文件配置大小为1GB,这样有利于快速的回收磁盘空间,重启kafka加载也会更快,如果说文件过小,那么文件数量就会较多,kafka启动的时候回单线扫描(log.dir)下的所有文件,文件较多启动较慢,会影响性能,

log.segment.bytes=1073741824 #段文件最大大小,超过该阀值,会自动创建新的日志段

(3) Logs数据文件写盘策略优化

为了大幅度提高producer写入吞吐量,需要制定定期批量写入文件磁盘的计划

每当producer写入10000条消息事,便会将数据写入磁盘,

#log.flush.interval.messages=10000 #强行将数据刷新到磁盘之前所能接受的消息数

#log.flush.interval.ms=1000 #在强制刷新之前,消息可以停留在日志中最长的时间(单位毫秒,每间隔1秒时间,刷数据到磁盘中)

ZooKeeper的配置优化:

目的:实现日志自动清理

这两个参数都是在zoo.cfg中配置的:

autopurge.purgeInterval 这个参数指定了清理频率,单位是小时,可以填小一点,比如1个小时

四、Logstash过滤+Kafka为什么这么快

Java堆栈日志怎么过滤?

我看我同事写的,用grok插件正则匹配,日志的第一个字段是”@timestamp”,从这一个时间点开始,到下一个时间点结束。

multiline 插件,匹配多行日志,

codec=> multiline {

​ pattern => “^[“

​ negate => true

​ what => “previous”

​ }

对 multiline 插件来说,有三个设置比较重要:negate、pattern 和 what。

negate

  • 默认为 false

否定正则表达式(如果没有匹配的话)。

pattern

  • 类型为 string
  • 没有默认值

要匹配的正则表达式。

what

  • 可以为 previous 或 next
  • 没有默认值

kafka为什么那么快?

写入数据:

Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile 。

顺序写入:

磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,某些优化场景磁盘的读写速度可以和内存持平。

因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。

而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等。如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:

磁盘顺序读写速度超过内存随机读写;

JVM的GC效率低,内存占用大。使用磁盘可以避免这一问题;

系统冷启动后,磁盘缓存依然可用

五、故障及解决方案:

1、启动filebeat将nginx日志文件信息传递给kafka集群时出现如下报错:

[root@localhost filebeat-6.5.4-linux-x86_64]# ./filebeat -e -c filebeat.yml

img

filebeat先连接14.0.0.20:9092,再连接node2:9092,如果未做本地解析就会无法连接

img

在kafka的配置文件/usr/local/kafka/config/server.properties中找到了答案:

img

===========================================================================

2、kafka数据堆积: (消费超时死循环)

监控kafka的topic消息,里面的消息没有被及时消费,出现报警信息,怎么解决?

看官方文档上写的kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结构没办法及时处理完成,会有什么后果呢?服务端约定了和客户端max.poll.interval.ms,两次poll最大间隔。如果客户端处理一批消息花费的时间超过了这个限制时间,服务端可能就会把消费者客户端移除掉,并触发rebalance。

解决方案:

1、kafka消费者 默认此间隔时长为300s,本次故障是600s都没处理完成,于是改成3600s

max.poll.interval.ms=3600000

2、根据逻辑,当处理数据失败后,进行rebalance,跳出该轮回,进行下一项任务,这样也可以解决该问题, 但遗留部分数据异常可能性。

============================================================================

3、kafka之消费超时死循环

遇见该报错信息及解决

07-28 14:34:46.111 -ERROR 279920[skynet.stream.kfk.consumer-2] skynet.boot.stream.kafka.MyConsumer [188]: kafka consumer poll msg error:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

前提背景:

服务正常输出结果,但是后续任务无法输出结果

根据服务日志发现该报错信息:

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死去。

原因分析问题:

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms,

该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结构没办法及时处理完成,会有什么后果呢?服务端约定了和客户端max.poll.interval.ms,两次poll最大间隔。如果客户端处理一批消息花费的时间超过了这个限制时间,服务端可能就会把消费者客户端移除掉,并触发rebalance。

kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。

如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。

所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。

解决方案:

1、kafka消费者默认此间隔时长为300s,本次故障是600s都没处理完成,于是改成3600s

max.poll.interval.ms=3600000

2、根据逻辑,当处理数据失败后,进行rebalance,跳出该轮回,进行下一项任务,这样也可以解决该问题,但遗留部分数据异常可能性。

=================================================================

4、如何创建topic?

###创建topic (只写一个节点就可以,集群之间会自动同步) #

1
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper  14.0.0.10:2181 --replication-factor 3 --partitions 3 --topic topicTest Created topic "topicTest"

###参数解释

1
2
3
4
5
--topic : 创建topic名为: topicTest 

--replication-factor : 复制因子为3

--partitions : 3个分区

查看现有的topic:

1
2
3
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper  

14.0.0.10:2181,14.0.0.20:2181,14.0.0.30:2181 topicTest

查看topic详细信息

1
2
3
 /usr/local/kafka/bin/kafka-topics.sh --describe  --zookeeper  14.0.0.10:2181,14.0.0.20:2181,14.0.0.30:2181  --topic topicTest Topic:topicTest PartitionCount:3        

ReplicationFactor:3 Configs: Topic: topicTest Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: topicTest Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: topicTest Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1

返回值解释 # Partition: 分区 # Leader : 负责读写指定分区的节点 # Replicas : 复制该分区log的节点列表 # Isr : 当前活跃的副本列表

5、如何生产消息?

1
[root@node1 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list  14.0.0.10:9092,14.0.0.20:9092,14.0.0.30:9092 --topic ConsumerTest >hello kafka And zookeeper >My name is baiyongjie >My blog address is baiyongjie.com >Welcome to visit! 

6、如何消费消息?

1
[root@node2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh  --bootstrap-server 14.0.0.10:9092,14.0.0.20:9092,14.0.0.30:9092 --topic ConsumerTest --from-beginning hello kafka And zookeeper My name is baiyongjie My blog address is baiyongjie.com Welcome to visit

!