Technology, November 21, 2022

1. Kafka.scala

Kafka's main entry point starts up a KafkaServerStartable, which is just a thin wrapper around KafkaServer.

    val kafkaServerStartble = new KafkaServerStartable(serverConfig)
    kafkaServerStartble.startup

    package kafka.server

    class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
      private var server : KafkaServer = null

      private def init() {
        server = new KafkaServer(serverConfig)
      }

      def startup() {
        try {
          server.startup()
        }
        catch {...}
      }
    }

2. KafkaServer

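KafkaServer represents a single Kafka broker and is the core of Kafka.
Looking at which modules its startup method brings up shows what work a broker does; each module is analyzed in turn below.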

    package kafka.server

    /**
     * Represents the lifecycle of a single Kafka broker. Handles all functionality required
     * to start up and shutdown a single Kafka node.
     */
    class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
      var socketServer: SocketServer = null
      var requestHandlerPool: KafkaRequestHandlerPool = null
      var logManager: LogManager = null
      var kafkaHealthcheck: KafkaHealthcheck = null
      var topicConfigManager: TopicConfigManager = null
      var replicaManager: ReplicaManager = null
      var apis: KafkaApis = null
      var kafkaController: KafkaController = null
      val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      var zkClient: ZkClient = null

      /**
       * Start up API for bringing up a single instance of the Kafka server.
       * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
       */
      def startup() {
        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkClient = initZk()

        /* start log manager */
        logManager = createLogManager(zkClient)
        logManager.startup()

        socketServer = new SocketServer(config.brokerId,
                                        config.hostName,
                                        config.port,
                                        config.numNetworkThreads,
                                        config.queuedMaxRequests,
                                        config.socketSendBufferBytes,
                                        config.socketReceiveBufferBytes,
                                        config.socketRequestMaxBytes)
        socketServer.startup()

        replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
        kafkaController = new KafkaController(config, zkClient)

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

        replicaManager.startup()

        kafkaController.startup()

        topicConfigManager = new TopicConfigManager(zkClient, logManager)
        topicConfigManager.startup()

        /* tell everyone we are alive */
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
        kafkaHealthcheck.startup()
      }

2.1 KafkaScheduler

KafkaScheduler runs tasks in the background and is implemented on top of ScheduledThreadPoolExecutor.

    package kafka.utils

    /**
     * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
     *
     * It has a pool of kafka-scheduler- threads that do the actual work.
     *
     * @param threads The number of threads in the thread pool
     * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
     * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
     */
    @threadsafe
    class KafkaScheduler(val threads: Int,
                         val threadNamePrefix: String = "kafka-scheduler-",
                         daemon: Boolean = true) extends Scheduler with Logging {
      @volatile private var executor: ScheduledThreadPoolExecutor = null

      override def startup() {
        this synchronized {
          executor = new ScheduledThreadPoolExecutor(threads) // create the ScheduledThreadPoolExecutor
          executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
          executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
          executor.setThreadFactory(new ThreadFactory() {
                                      def newThread(runnable: Runnable): Thread =
                                        Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
                                    })
        }
      }

      def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
        val runnable = new Runnable { // wrap fun in a Runnable
          def run() = {
            try {
              fun()
            } catch {...}
            finally {...}
          }
        }
        if(period >= 0) // periodic task: run repeatedly on the pool after the initial delay
          executor.scheduleAtFixedRate(runnable, delay, period, unit)
        else            // one-shot task: run once after the delay
          executor.schedule(runnable, delay, unit)
      }
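The scheduling pattern above can be tried out in isolation. The following is a minimal sketch, not the Kafka class itself: `MiniScheduler`, `SchedulerDemo`, the thread name prefix and the error handling are all hypothetical names and choices made for illustration. Unlike the excerpt, the sketch treats only a strictly positive period as periodic, because `ScheduledThreadPoolExecutor.scheduleAtFixedRate` rejects a period of 0 with an IllegalArgumentException.

```scala
import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified stand-in for KafkaScheduler (illustration only).
class MiniScheduler(threads: Int,
                    threadNamePrefix: String = "mini-scheduler-",
                    daemon: Boolean = true) {
  private val threadId = new AtomicInteger(0)

  private val executor = new ScheduledThreadPoolExecutor(threads, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, threadNamePrefix + threadId.getAndIncrement())
      t.setDaemon(daemon)
      t
    }
  })
  // Drop pending periodic and delayed tasks once shutdown() has been called.
  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

  /** A positive period repeats at a fixed rate; any other period runs the task once. */
  def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1,
               unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = {
    val runnable = new Runnable {
      def run(): Unit =
        try fun()
        catch {
          // Assumption: log and swallow, so a failing run does not cancel a periodic task.
          case e: Throwable => System.err.println("Uncaught exception in scheduled task '" + name + "': " + e)
        }
    }
    if (period > 0) executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else executor.schedule(runnable, delay, unit)
  }

  def shutdown(): Unit = {
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.SECONDS)
  }
}

object SchedulerDemo {
  /** Runs a task every 10 ms and reports whether it fired three times within 2 s. */
  def run(): Boolean = {
    val scheduler = new MiniScheduler(threads = 1)
    val latch = new CountDownLatch(3)
    scheduler.schedule("tick", () => latch.countDown(), delay = 0, period = 10)
    val fired = latch.await(2, TimeUnit.SECONDS)
    scheduler.shutdown()
    fired
  }
}
```

Catching exceptions inside the Runnable matters for periodic tasks: if a run throws, the executor suppresses all later runs of that task.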

2.2 Zookeeper Client

Kafka relies on ZooKeeper for configuration management, so the broker creates a ZkClient to communicate with the ZooKeeper cluster.

2.3 logManager

The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
See also: Apache Kafka Source Code Analysis – Log Management

2.4 ReplicaManager

The replica-related module, newly added in 0.8. See also:

Apache Kafka Replication Design – High level
kafka Detailed Replication Design V3
Apache Kafka Source Code Analysis – ReplicaManager

2.5 Kafka Socket Server

First and foremost, the broker is a socket server: every interaction with a broker happens by sending a request to its socket port.

    socketServer = new SocketServer(config.brokerId...)

KafkaApis
This class encapsulates the handling logic for every request type.

    /**
     * Logic to handle the various Kafka requests
     */
    class KafkaApis(val requestChannel: RequestChannel,
                    val replicaManager: ReplicaManager,
                    val zkClient: ZkClient,
                    val brokerId: Int,
                    val config: KafkaConfig,
                    val controller: KafkaController) extends Logging {
      /**
       * Top-level method that handles all requests and multiplexes to the right api
       */
      def handle(request: RequestChannel.Request) {
        try {
          request.requestId match {
            case RequestKeys.ProduceKey => handleProducerRequest(request)
            case RequestKeys.FetchKey => handleFetchRequest(request)
            case RequestKeys.OffsetsKey => handleOffsetRequest(request)
            case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
            case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
            case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
            case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
            case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
            case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
            case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
            case requestId => throw new KafkaException("Unknown api code " + requestId)
          }
        } catch {
          // error handling is elided in this excerpt
        } finally
          request.apiLocalCompleteTimeMs = SystemTime.milliseconds
      }
    }
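The multiplexing done by handle is an ordinary pattern match on the request key. Below is a minimal sketch of that idea under stated assumptions: `MiniRequestKeys`, `MiniRequest`, `MiniApis` and the string results are all hypothetical, and the key values are chosen for illustration rather than taken from Kafka's RequestKeys.

```scala
// Hypothetical request keys; the values are illustrative only.
object MiniRequestKeys {
  val ProduceKey: Int = 0
  val FetchKey: Int = 1
}

// Hypothetical request type carrying a key and an opaque payload.
final case class MiniRequest(requestId: Int, payload: String)

class MiniApis {
  /** Top-level entry point: route each request to the handler for its key. */
  def handle(request: MiniRequest): String =
    request.requestId match {
      // Qualified, capitalized names are stable identifiers, so they match by value.
      case MiniRequestKeys.ProduceKey => handleProduce(request)
      case MiniRequestKeys.FetchKey   => handleFetch(request)
      case other => throw new IllegalArgumentException("Unknown api code " + other)
    }

  private def handleProduce(r: MiniRequest): String = "produced:" + r.payload
  private def handleFetch(r: MiniRequest): String = "fetched:" + r.payload
}
```

Keeping one entry point means cross-cutting work, such as the timing recorded in the finally clause of the excerpt, is written once rather than in every handler.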

KafkaRequestHandler
KafkaRequestHandler instances run on a pool of threads.

    /**
     * A thread that answers kafka requests.
     */
    class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
      def run() {
        while(true) {
          try {
            val req = requestChannel.receiveRequest() // take the next request from the request channel
            if(req eq RequestChannel.AllDone) {
              debug("Kafka request handler %d on broker %d received shut down command".format(
                id, brokerId))
              return
            }
            req.requestDequeueTimeMs = SystemTime.milliseconds
            apis.handle(req) // let KafkaApis process the request
          } catch {
            case e: Throwable => error("Exception when handling request", e)
          }
        }
      }

      def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
    }

    class KafkaRequestHandlerPool(val brokerId: Int,
                                  val requestChannel: RequestChannel,
                                  val apis: KafkaApis,
                                  numThreads: Int) extends Logging {
      val threads = new Array[Thread](numThreads) // the thread pool
      val runnables = new Array[KafkaRequestHandler](numThreads)
      for(i <- 0 until numThreads) {
        runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
        threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
        threads(i).start()
      }

      def shutdown() {
        info("shutting down")
        for(handler <- runnables)
          handler.shutdown
        for(thread <- threads)
          thread.join
        info("shut down completely")
      }
    }
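The shutdown protocol above is the classic "poison pill" pattern: one sentinel per handler thread is pushed onto the shared queue, and each thread exits when it dequeues one. The sketch below reproduces the pattern with a plain LinkedBlockingQueue. `PoolMessage`, `Work`, `AllDone` (standing in for RequestChannel.AllDone), `MiniHandlerPool` and `HandlerPoolDemo` are hypothetical names, not Kafka APIs.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

// Hypothetical message types; AllDone plays the role of RequestChannel.AllDone.
sealed trait PoolMessage
final case class Work(payload: String) extends PoolMessage
case object AllDone extends PoolMessage

class MiniHandlerPool(numThreads: Int, handle: String => Unit) {
  private val queue = new LinkedBlockingQueue[PoolMessage]()

  // Each worker blocks on the queue until it receives work or the sentinel.
  private val threads: Seq[Thread] = (0 until numThreads).map { i =>
    val worker = new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case AllDone => running = false
            case Work(payload) =>
              try handle(payload)
              catch { case e: Throwable => System.err.println("Exception when handling request: " + e) }
          }
        }
      }
    }
    val t = new Thread(worker, "mini-request-handler-" + i)
    t.setDaemon(true)
    t.start()
    t
  }

  def submit(payload: String): Unit = queue.put(Work(payload))

  /** Enqueue one sentinel per worker, then wait for every worker to exit. */
  def shutdown(): Unit = {
    threads.foreach(_ => queue.put(AllDone))
    threads.foreach(_.join())
  }
}

object HandlerPoolDemo {
  /** Submits ten requests to three workers and returns how many were handled. */
  def run(): Int = {
    val seen = new ConcurrentLinkedQueue[String]()
    val pool = new MiniHandlerPool(numThreads = 3, handle = p => seen.add(p))
    (1 to 10).foreach(i => pool.submit("req-" + i))
    pool.shutdown()
    seen.size()
  }
}
```

Because the queue is FIFO and the sentinels are enqueued last, every request submitted before shutdown is dequeued before any worker sees a sentinel, so no pending work is dropped.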
 

2.6 offsetManager

    offsetManager = createOffsetManager()

It periodically purges expired offset data, i.e. the compact operation:

    scheduler.schedule(name = "offsets-cache-compactor",
                       fun = compact,
                       period = config.offsetsRetentionCheckIntervalMs,
                       unit = TimeUnit.MILLISECONDS)

It also handles some consumer-related offset operations, which I won't go into here because we don't use the high-level consumer.

2.7 KafkaController

    kafkaController = new KafkaController(config, zkClient, brokerState)

See also: Apache Kafka Source Code Analysis – Controller

Since 0.8, in order to manage replicas, one broker acts as the master, i.e. the controller, which coordinates replica consistency.

2.8 TopicConfigManager


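    topicConfigManager = new TopicConfigManager(zkClient, logManager)

TopicConfigManager handles changes to topic configs. Besides the global configuration, Kafka also has topic-level configuration:

    > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
        --config max.message.bytes=128000

You can set an override like this, for example. So how do these topic configs take effect?

By default, topic-level config is stored under

    /brokers/topics/<topic_name>/config

With many topics, however, watching each of these nodes would create too many watchers, so a separate directory is used to signal config changes:

    /brokers/config_changes

The command above therefore does two things: it writes the config to the topic's config node, and it adds a notification that tells the watcher which topic's config has changed:

    /brokers/config_changes/config_change_13321

Each notification carries a numeric suffix, which is used to tell whether it has already been processed.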

    /**
     * Process the given list of config changes
     */
    private def processConfigChanges(notifications: Seq[String]) {
      if (notifications.size > 0) {
        info("Processing config change notification(s)...")
        val now = time.milliseconds
        val logs = logManager.logsByTopicPartition.toBuffer
        val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
        for (notification <- notifications) {
          val changeId = changeNumber(notification)
          if (changeId > lastExecutedChange) { // not processed yet
            val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
            val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
            if(jsonOpt.isDefined) {
              val json = jsonOpt.get
              val topic = json.substring(1, json.length - 1) // hacky way to dequote; yields the topic name from the notification
              if (logsByTopic.contains(topic)) {
                /* combine the default properties with the overrides in zk to create the new LogConfig */
                val props = new Properties(logManager.defaultConfig.toProps)
                props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
                val logConfig = LogConfig.fromProps(props)
                for (log <- logsByTopic(topic))
                  log.config = logConfig // this is where the log's config is actually updated
                info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
                purgeObsoleteNotifications(now, notifications) // delete expired notifications (older than 10 minutes)
              }
            }
            lastExecutedChange = changeId
          }
        }
      }
    }

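Failover is not a problem here either: applying a config more than once is harmless, and on every startup all notifications that have not yet expired are processed again.

Moreover, a restarted broker loads the complete configuration from ZooKeeper anyway, so that case is fine too. This mechanism mainly exists to update topic configs in real time.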
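The suffix check that decides whether a notification still needs processing can be isolated into a few pure functions. This is a minimal sketch, assuming notification names of the form config_change_<number> as in the znode example above; `ConfigChangeSketch` and `pending` are hypothetical names, and sorting by suffix is a choice of the sketch, not something the excerpt shows.

```scala
object ConfigChangeSketch {
  // Prefix taken from the znode name shown in the text (config_change_13321).
  val Prefix = "config_change_"

  /** Extract the numeric suffix of a notification name. */
  def changeNumber(name: String): Long = name.substring(Prefix.length).toLong

  /**
   * Return the notifications that still have to be applied, ordered by suffix,
   * together with the new value for lastExecutedChange.
   */
  def pending(notifications: Seq[String], lastExecutedChange: Long): (Seq[String], Long) = {
    val todo = notifications
      .sortBy(changeNumber)
      .filter(n => changeNumber(n) > lastExecutedChange)
    val newLast = if (todo.isEmpty) lastExecutedChange else changeNumber(todo.last)
    (todo, newLast)
  }
}
```

Feeding the returned lastExecutedChange back into the next call makes reprocessing a no-op. That is the same property that makes replaying unexpired notifications after a restart harmless.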

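2.9 KafkaHealthcheck

    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)

This one is simple. As the comment in the code says, it tells everyone that we are alive.

It does so by registering an ephemeral znode at

    /brokers/[0...N] --> advertisedHost:advertisedPort

and registering it again whenever the session expires (SessionExpired), which is a typical ZooKeeper usage pattern.
Watching this path is therefore enough to know whether a broker is still alive.

2.10 ControlledShutdown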

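Before 0.8, broker startup and shutdown were both simple: initialize the components above, or stop them.

Since 0.8 added replicas, however, a broker can no longer just shut itself down. It must first notify the controller, and only after the controller has done its work, such as migrating partition leaders or taking replicas offline, can the broker shut down.

    private def controlledShutdown()

The method is fairly long. The logic is: find the controller, send it a ControlledShutdownRequest, and wait for the response; if that fails, the result is an unclean shutdown.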
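The control flow just described can be sketched without any Kafka classes. In the sketch below, `sendRequest` is a hypothetical stand-in for "find the controller, send a ControlledShutdownRequest and wait for the reply". The retry count and backoff are assumptions of the sketch, since the text above only says that a failure leads to an unclean shutdown.

```scala
object ControlledShutdownSketch {
  /**
   * Try a controlled shutdown up to maxRetries times.
   * Returns true if the (hypothetical) controller acknowledged the request;
   * false means the caller has to fall back to an unclean shutdown.
   */
  def controlledShutdown(maxRetries: Int, backoffMs: Long)(sendRequest: () => Boolean): Boolean = {
    var remaining = maxRetries
    var succeeded = false
    while (!succeeded && remaining > 0) {
      remaining -= 1
      succeeded =
        try sendRequest()
        catch { case _: java.io.IOException => false } // assumption: an I/O error counts as a failed attempt
      // Back off before the next attempt, but not after the last one.
      if (!succeeded && remaining > 0) Thread.sleep(backoffMs)
    }
    succeeded
  }
}
```

A caller would stop the remaining components either way; the boolean only records whether the controller had a chance to move partition leadership first.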
