首页 技术 正文
技术 2022年11月8日
0 收藏 411 点赞 2,089 浏览 30090 个字

若advertised.host.name的值是aa,则kafka发布的服务名也要是aa

kafka

log.cleanup.policy=delete 日志清理策略log.retention.hours=        (即7天)
数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除log.retention.bytes=- topic每个分区的最大文件大小
一个topic的大小限制=分区数*log.retention.bytes -1表示没有大小限制log.retention.check.interval.ms=5minutes 文件大小检查的周期时间
/* 自动确认offset的时间间隔  */
props.put("auto.commit.interval.ms", ""); props.put("session.timeout.ms", ""); //消息发送的最长等待时间.需大于session.timeout.ms这个时间
props.put("request.timeout.ms", ""); //一次从kafka中poll出来的数据条数
//max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
props.put("max.poll.records",""); //server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
props.put("fetch.min.bytes", "");
//若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间
props.put("fetch.wait.max.ms", "");

参数配置:https://my.oschina.net/infiniteSpace/blog/312890?p=1

server.properties中所有配置参数说明(解释)如下列表:
参数
说明(解释)
broker.id =
每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
log.dirs=/data/kafka-logs
kafka数据的存放地址,多个地址的话用逗号分割 /data/kafka-logs-,/data/kafka-logs-
port =
broker server服务端口
message.max.bytes =
表示消息体的最大大小,单位是字节
num.network.threads =
broker处理消息的最大线程数,一般情况下不需要去修改
num.io.threads =
broker处理磁盘IO的线程数,数值应该大于你的硬盘数
background.threads =
一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
queued.max.requests =
等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
host.name
broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
socket.send.buffer.bytes=*
socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes =*
socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.request.max.bytes =**
socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.segment.bytes =**
topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.roll.hours =*
这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.retention.minutes=3days
数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.bytes=-
topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms=5minutes
文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable=false
是否开启日志压缩
log.cleaner.threads =
日志压缩运行的线程数
log.cleaner.io.max.bytes.per.second=None
日志压缩时候处理的最大大小
log.cleaner.dedupe.buffer.size=**
日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好
log.cleaner.io.buffer.size=*
日志清理时候用到的IO块大小一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
日志清理中hash表的扩大因子一般不需要修改
log.cleaner.backoff.ms =
检查是否处罚日志清理的间隔
log.cleaner.min.cleanable.ratio=0.5
日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day
对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.index.size.max.bytes =**
对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.interval.bytes =
当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.flush.interval.messages=None
log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.scheduler.interval.ms =
检查是否需要固化到硬盘的时间间隔
log.flush.interval.ms = None
仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
log.delete.delay.ms =
文件在索引中清除后保留的时间一般不需要去修改
log.flush.offset.checkpoint.interval.ms =
控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改
auto.create.topics.enable =true
是否允许自动创建topic,若是false,就需要通过命令创建topic
default.replication.factor =
是否允许自动创建topic,若是false,就需要通过命令创建topic
num.partitions =
每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖以下是kafka中Leader,replicas配置参数controller.socket.timeout.ms =
partition leader与replicas之间通讯时,socket的超时时间
controller.message.queue.size=
partition leader与replicas数据同步时,消息的队列尺寸
replica.lag.time.max.ms =
replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.max.messages =
如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
##到其他follower中.
##在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.socket.timeout.ms=*
follower与leader之间的socket超时时间
replica.socket.receive.buffer.bytes=*
leader复制时候的socket缓存大小
replica.fetch.max.bytes =*
replicas每次获取数据的最大大小
replica.fetch.wait.max.ms =
replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.min.bytes =
fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
num.replica.fetchers=
leader进行复制的线程数,增大这个数值会增加follower的IO
replica.high.watermark.checkpoint.interval.ms =
每个replica检查是否将最高水位进行固化的频率
controlled.shutdown.enable =false
是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.max.retries =
控制器关闭的尝试次数
controlled.shutdown.retry.backoff.ms =
每次关闭尝试的时间间隔
leader.imbalance.per.broker.percentage =
leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.check.interval.seconds =
检查leader是否不平衡的时间间隔
offset.metadata.max.bytes
客户端保留offset信息的最大空间大小
kafka中zookeeper参数配置zookeeper.connect = localhost:
zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.session.timeout.ms=
ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms =
ZooKeeper的连接超时时间
zookeeper.sync.time.ms =
ZooKeeper集群中leader和follower之间的同步实际那

参数配置介绍:http://blog.csdn.net/lizhitao/article/details/25667831

参数 说明(解释)
broker.id   =0 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
log.dirs=/data/kafka-logs kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能  /data/kafka-logs-1,/data/kafka-logs-2
port   =9092 broker server服务端口
message.max.bytes   =6525000 表示消息体的最大大小,单位是字节
num.network.threads   =4 broker处理消息的最大线程数,一般情况下数量为cpu核数
num.io.threads   =8 broker处理磁盘IO的线程数,数值为cpu核数2倍
background.threads   =4 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
queued.max.requests   =500 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
host.name broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
socket.send.buffer.bytes=100*1024 socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes   =100*1024 socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.request.max.bytes   =100*1024*1024 socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.segment.bytes   =1024*1024*1024 topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.roll.hours   =24*7 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.retention.minutes=300或者log.retention.hours=24 数据文件保留多长时间,存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略。
    log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除。
log.cleanup.policy   = delete 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.retention.bytes=-1 topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms=5minutes 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable=false 是否开启日志清理
log.cleaner.threads   = 2 日志清理运行的线程数
log.cleaner.io.max.bytes.per.second=None 日志清理时候处理的最大大小
log.cleaner.dedupe.buffer.size=500*1024*1024 日志清理去重时候的缓存空间,在空间允许的情况下,越大越好
log.cleaner.io.buffer.size=512*1024 日志清理时候用到的IO块大小一般不需要修改
log.cleaner.io.buffer.load.factor   =0.9 日志清理中hash表的扩大因子一般不需要修改
log.cleaner.backoff.ms   =15000 检查是否处罚日志清理的间隔
log.cleaner.min.cleanable.ratio=0.5 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms   =1day 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.index.size.max.bytes   =10*1024*1024 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.interval.bytes   =4096 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.flush.interval.messages=None
    例如log.flush.interval.messages=1000
   
log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性”的必要手段,所以此参数的设置,需要在”数据可靠性”与”性能”之间做必要的权衡.如果此值过大,将会导致每次”fsync”的时间较长(IO阻塞),如果此值过小,将会导致”fsync”的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.scheduler.interval.ms   =3000 检查是否需要固化到硬盘的时间间隔
log.flush.interval.ms   = None
    例如:log.flush.interval.ms=1000
    表示每间隔1000毫秒flush一次数据到磁盘
   
仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制”fsync”的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
log.delete.delay.ms   =60000 文件在索引中清除后保留的时间一般不需要去修改
log.flush.offset.checkpoint.interval.ms   =60000 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改
auto.create.topics.enable   =true 是否允许自动创建topic,若是false,就需要通过命令创建topic
default.replication.factor =1 是否允许自动创建topic,若是false,就需要通过命令创建topic
num.partitions   =1 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
以下是kafka中Leader,replicas配置参数  
controller.socket.timeout.ms   =30000 partition leader与replicas之间通讯时,socket的超时时间
controller.message.queue.size=10 partition leader与replicas数据同步时,消息的队列尺寸
replica.lag.time.max.ms   =10000 replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.max.messages   =4000 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
    ##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
    ##到其他follower中.##在broker数量较少,或者网络不足的环境中,建议提高此值.
   
    ##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
replica.socket.timeout.ms=30*1000 follower与leader之间的socket超时时间
replica.socket.receive.buffer.bytes=64*1024 leader复制时候的socket缓存大小
replica.fetch.max.bytes   =1024*1024 replicas每次获取数据的最大大小
replica.fetch.wait.max.ms   =500 replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.min.bytes   =1 fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
num.replica.fetchers=1 leader进行复制的线程数,增大这个数值会增加follower的IO
replica.high.watermark.checkpoint.interval.ms   =5000 每个replica检查是否将最高水位进行固化的频率
controlled.shutdown.enable   =false 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.max.retries   =3 控制器关闭的尝试次数
controlled.shutdown.retry.backoff.ms   =5000 每次关闭尝试的时间间隔
leader.imbalance.per.broker.percentage   =10 leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.check.interval.seconds   =300 检查leader是否不平衡的时间间隔
offset.metadata.max.bytes 客户端保留offset信息的最大空间大小
kafka中zookeeper参数配置  
zookeeper.connect   = localhost:2181 zookeeper集群的地址,可以是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.session.timeout.ms=6000 ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms   =6000 ZooKeeper的连接超时时间
zookeeper.sync.time.ms   =2000 ZooKeeper集群中leader和follower之间的同步实际那

参数

默认值

说明(解释)

broker.id =0

 

每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况

log.dirs=/data/kafka-logs

 

kafka数据的存放地址,多个地址的话用逗号分割/data/kafka-logs-1,/data/kafka-logs-2

port =9092

 

broker   server服务端口

message.max.bytes   =6525000

 

表示消息体的最大大小,单位是字节

num.network.threads   =4

 

broker处理消息的最大线程数,一般情况下不需要去修改

num.io.threads =8

 

broker处理磁盘IO的线程数,数值应该大于你的硬盘数

background.threads   =4

 

一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改

queued.max.requests   =500

 

等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。

host.name

 

broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

socket.send.buffer.bytes=100*1024

 

socket的发送缓冲区,socket的调优参数SO_SNDBUFF

socket.receive.buffer.bytes   =100*1024

 

socket的接受缓冲区,socket的调优参数SO_RCVBUFF

socket.request.max.bytes   =100*1024*1024

 

socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖

log.segment.bytes   =1024*1024*1024

 

topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖

log.roll.hours =24*7

 

这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖

log.cleanup.policy   = delete

 

日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖

log.retention.minutes=3days

 

数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据

log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

log.retention.bytes=-1

 

topic每个分区的最大文件大小,一个topic的大小限制 =分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

log.retention.check.interval.ms=5minutes

 

文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略

log.cleaner.enable=false

 

是否开启日志压缩

log.cleaner.threads   = 2

 

日志压缩运行的线程数

log.cleaner.io.max.bytes.per.second=None

 

日志压缩时候处理的最大大小

log.cleaner.dedupe.buffer.size=500*1024*1024

 

日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好

log.cleaner.io.buffer.size=512*1024

 

日志清理时候用到的IO块大小一般不需要修改

log.cleaner.io.buffer.load.factor   =0.9

 

日志清理中hash表的扩大因子一般不需要修改

log.cleaner.backoff.ms   =15000

 

检查是否处罚日志清理的间隔

log.cleaner.min.cleanable.ratio=0.5

 

日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖

log.cleaner.delete.retention.ms   =1day

 

对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖

log.index.size.max.bytes   =10*1024*1024

 

对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖

log.index.interval.bytes   =4096

 

当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数

log.flush.interval.messages=None

 

log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性”的必要手段,所以此参数的设置,需要在数据可靠性与”性能”之间做必要的权衡.如果此值过大,将会导致每次”fsync”的时间较长(IO阻塞),如果此值过小,将会导致“fsync”的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.

log.flush.scheduler.interval.ms   =3000

 

检查是否需要固化到硬盘的时间间隔

log.flush.interval.ms   = None

 

仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制“fsync”的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.

log.delete.delay.ms   =60000

 

文件在索引中清除后保留的时间一般不需要去修改

log.flush.offset.checkpoint.interval.ms   =60000

 

控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改

auto.create.topics.enable   =true

 

是否允许自动创建topic,若是false,就需要通过命令创建topic

default.replication.factor =1

 

是否允许自动创建topic,若是false,就需要通过命令创建topic

num.partitions =1

 

每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖

     

以下是kafka中Leader,replicas配置参数

   

controller.socket.timeout.ms   =30000

 

partition leader与replicas之间通讯时,socket的超时时间

controller.message.queue.size=10

 

partition leader与replicas数据同步时,消息的队列尺寸

replica.lag.time.max.ms   =10000

 

replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中

replica.lag.max.messages   =4000

 

如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效

##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后

##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移

##到其他follower中.

##在broker数量较少,或者网络不足的环境中,建议提高此值.

replica.socket.timeout.ms=30*1000

 

follower与leader之间的socket超时时间

replica.socket.receive.buffer.bytes=64*1024

 

leader复制时候的socket缓存大小

replica.fetch.max.bytes   =1024*1024

 

replicas每次获取数据的最大大小

replica.fetch.wait.max.ms   =500

 

replicas同leader之间通信的最大等待时间,失败了会重试

replica.fetch.min.bytes   =1

 

fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件

num.replica.fetchers=1

 

leader进行复制的线程数,增大这个数值会增加follower的IO

replica.high.watermark.checkpoint.interval.ms   =5000

 

每个replica检查是否将最高水位进行固化的频率

controlled.shutdown.enable   =false

 

是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker

controlled.shutdown.max.retries   =3

 

控制器关闭的尝试次数

controlled.shutdown.retry.backoff.ms   =5000

 

每次关闭尝试的时间间隔

leader.imbalance.per.broker.percentage   =10

 

leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡

leader.imbalance.check.interval.seconds   =300

 

检查leader是否不平衡的时间间隔

offset.metadata.max.bytes

 

客户端保留offset信息的最大空间大小

kafka中zookeeper参数配置

   

zookeeper.connect   = localhost:2181

 

zookeeper集群的地址,可以是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3

zookeeper.session.timeout.ms=6000

 

ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大

zookeeper.connection.timeout.ms   =6000

 

ZooKeeper的连接超时时间

zookeeper.sync.time.ms   =2000

 

ZooKeeper集群中leader和follower之间的同步实际那

broker.id=  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port= #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0..1有个bug,DNS解析问题,失败率的问题。
num.network.threads= #这个是borker进行网络处理的线程数
num.io.threads= #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes= #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes= #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes= #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions= #默认的分区数,一个topic默认1个分区数
log.retention.hours= #默认消息的最大持久化时间,168小时,7天
message.max.byte= #消息保存的最大值5M
default.replication.factor= #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes= #取消息的最大直接数
log.segment.bytes= #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms= #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours= ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:,192.168.7.101:,192.168.7.107: #设置zookeeper的连接端口

=====================

==========================

zookeeper

#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 *= 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

==============一个人的集群配置==========

master

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id= #每个broker id############################# Socket Server Settings #############################listeners=PLAINTEXT://:9092# The port the socket server listens on
port=# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=master# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=master# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients># The number of threads handling network requests
num.network.threads=# The number of threads doing disk I/O
num.io.threads=# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=#cache的大小,存储這么多就开始发送# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=#1m# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=#请求信息的最大数,不能超过java堆栈的大小############################# Log Basics ############################## A comma seperated list of directories under which to store log files
log.dirs=/usr/local/kafka/logs#可<span style="font-family: Arial, Helvetica, sans-serif;">多个,中间</span><span style="font-family: Arial, Helvetica, sans-serif;">用逗号分开,新建的topic存储的时候是看那个更少就存那个</span># The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# . Durability: Unflushed data may be lost if you are not using replication.
# . Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# . Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion
log.retention.hours=#保存7天# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=#默认消息不可以超出的大小# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=#消息持久化文件的最大化大小。超过会新起一个# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=#多长时间检查一次连接情况############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:,worker1:,worker2:#zookeeper的接口
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=

work1

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=############################# Socket Server Settings #############################listeners=PLAINTEXT://:9092# The port the socket server listens on
port=# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=worker1# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=worker1# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients># The number of threads handling network requests
num.network.threads=# The number of threads doing disk I/O
num.io.threads=# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=############################# Log Basics ############################## A comma seperated list of directories under which to store log files
log.dirs=/usr/local/kafka/logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# . Durability: Unflushed data may be lost if you are not using replication.
# . Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# . Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion
log.retention.hours=# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:,worker1:,worker2:
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=

work2

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.
broker.id=############################# Socket Server Settings #############################listeners=PLAINTEXT://:9092# The port the socket server listens on
port=# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=worker2# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=worker2# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients># The number of threads handling network requests
num.network.threads=# The number of threads doing disk I/O
num.io.threads=# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=############################# Log Basics ############################## A comma seperated list of directories under which to store log files
log.dirs=/usr/local/kafka/logs# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=############################# Log Flush Policy ############################## Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# . Durability: Unflushed data may be lost if you are not using replication.
# . Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# . Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=############################# Log Retention Policy ############################## The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.# The minimum age of a log file to be eligible for deletion
log.retention.hours=# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=############################# Zookeeper ############################## Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=master:,worker1:,worker2:
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=

==============

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,113
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,586
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,432
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,203
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,838
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,922