回顾一下前面提到的发送消息的时序图,上一节说到了Kafka相关的元数据信息以及消息的封装,消息封装完成之后就开始将消息发送出去,这个任务由Sender线程来实现。
1. Sender线程
找到KafkaProducer这个对象,KafkaProducer的构造函数中有这样几行代码。
- this.accumulator=new
- RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
- this.totalMemorySize,
- this.compressionType,
- config.getLong(ProducerConfig.LINGER_MS_CONFIG),
- retryBackoffMs,
- metrics,
- time);
构造了RecordAccumulator对象,设置了该对象中每个消息批次的大小、缓冲区大小、压缩格式等等。
紧接着就构建了一个非常重要的组件NetworkClient,用作发送消息的载体。
- NetworkClientclient=newNetworkClient(
- newSelector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),this.metrics,time,"producer",channelBuilder),
- this.metadata,
- clientId,
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
- config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
- config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
- config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
- this.requestTimeoutMs,time);
对于构建的NetworkClient,有几个重要的参数要注意一下:
√ connections.max.idle.ms: 表示一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接,默认是9分钟。
√ max.in.flight.requests.per.connection:表示每个网络连接可以容忍 producer端发送给broker 消息然后消息没有响应的个数,默认是5个。(ps:producer向broker发送数据的时候,其实是存在多个网络连接)
√ send.buffer.bytes:socket发送数据的缓冲区的大小,默认值是128K。
√ receive.buffer.bytes:socket接受数据的缓冲区的大小,默认值是32K。
构建好消息发送的网络通道直到启动Sender线程,用于发送消息。
- this.sender=newSender(client,
- this.metadata,
- this.accumulator,
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)==1,
- config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
- (short)parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
- config.getInt(ProducerConfig.RETRIES_CONFIG),
- this.metrics,
- newSystemTime(),
- clientId,
- this.requestTimeoutMs);
- //默认的线程名前缀为kafka-producer-network-thread,其中clientId是生产者的id
- StringioThreadName="kafka-producer-network-thread"+(clientId.length()>0?"|"+clientId:"");
- //创建了一个守护线程,将Sender对象传进去。
- this.ioThread=newKafkaThread(ioThreadName,this.sender,true);
- //启动线程
- this.ioThread.start();
看到这里就非常明确了,既然是线程,那么肯定有run()方法,我们重点关注该方法中的实现逻辑,在这里要补充一点值得我们借鉴的线程使用模式,可以看到在创建sender线程后,并没有立即启动sender线程,而且新创建了KafkaThread线程,将sender对象给传进去了,然后再启动KafkaThread线程,相信有不少小伙伴会有疑惑,我们进去KafkaThread这个类看一下其中的内容。
- /**
- *AwrapperforThreadthatsetsthingsupnicely
- */
- publicclassKafkaThreadextendsThread{
- privatefinalLoggerlog=LoggerFactory.getLogger(getClass());
- publicKafkaThread(finalStringname,Runnablerunnable,booleandaemon){
- super(runnable,name);
- //设置为后台守护线程
- setDaemon(daemon);
- setUncaughtExceptionHandler(newThread.UncaughtExceptionHandler(){
- publicvoiduncaughtException(Threadt,Throwablee){
- log.error("Uncaughtexceptionin"+name+":",e);
- }
- });
- }
- }
发现KafkaThread线程其实只是启动了一个守护线程,那么这样做的好处是什么呢?答案是可以将业务代码和线程本身解耦,复杂的业务逻辑可以在KafkaThread这样的线程中去实现,这样在代码层面上Sender线程就非常的简洁,可读性也比较高。
先看一下Sender这个对象的构造。
/**
* 主要的功能就是用处理向 Kafka 集群发送生产请求的后台线程,更新元数据信息,以及将消息发送到合适的节点
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
- publicclassSenderimplementsRunnable{
- privatestaticfinalLoggerlog=LoggerFactory.getLogger(Sender.class);
- //kafka网络通信客户端,主要用于与broker的网络通信
- privatefinalKafkaClientclient;
- //消息累加器,包含了批量的消息记录
- privatefinalRecordAccumulatoraccumulator;
- //客户端元数据信息
- privatefinalMetadatametadata;
- /*theflagindicatingwhethertheproducershouldguaranteethemessageorderonthebrokerornot.*/
- //保证消息的顺序性的标记
- privatefinalbooleanguaranteeMessageOrder;
- /*themaximumrequestsizetoattempttosendtotheserver*/
- //对应的配置是max.request.size,代表调用send()方法发送的最大请求大小
- privatefinalintmaxRequestSize;
- /*thenumberofacknowledgementstorequestfromtheserver*/
- //用于保证消息发送状态,分别有-1,0,1三种选项
- privatefinalshortacks;
- /*thenumberoftimestoretryafailedrequestbeforegivingup*/
- //请求失败重试的次数
- privatefinalintretries;
- /*theclockinstanceusedforgettingthetime*/
- //时间工具,计算时间,没有特殊含义
- privatefinalTimetime;
- /*truewhilethesenderthreadisstillrunning*/
- //表示线程状态,true则表示running
- privatevolatilebooleanrunning;
- /*truewhenthecallerwantstoignoreallunsent/inflightmessagesandforceclose.*/
- //强制关闭消息发送的标识,一旦设置为true,则不管消息有没有发送成功都会忽略
- privatevolatilebooleanforceClose;
- /*metrics*/
- //发送指标收集
- privatefinalSenderMetricssensors;
- /*paramclientIdoftheclient*/
- //生产者客户端id
- privateStringclientId;
- /*themaxtimetowaitfortheservertorespondtotherequest*/
- //请求超时时间
- privatefinalintrequestTimeout;
- //构造器
- publicSender(KafkaClientclient,
- Metadatametadata,
- RecordAccumulatoraccumulator,
- booleanguaranteeMessageOrder,
- intmaxRequestSize,
- shortacks,
- intretries,
- Metricsmetrics,
- Timetime,
- StringclientId,
- intrequestTimeout){
- this.client=client;
- this.accumulator=accumulator;
- this.metadata=metadata;
- this.guaranteeMessageOrder=guaranteeMessageOrder;
- this.maxRequestSize=maxRequestSize;
- this.running=true;
- this.acks=acks;
- this.retries=retries;
- this.time=time;
- this.clientId=clientId;
- this.sensors=newSenderMetrics(metrics);
- this.requestTimeout=requestTimeout;
- }
- ….
- }
大概了解完Sender对象的初始化参数之后,开始步入正题,找到Sender对象中的run()方法。
- publicvoidrun(){
- log.debug("StartingKafkaproducerI/Othread.");
- //sender线程启动起来了以后就是处于一直运行的状态
- while(running){
- try{
- //核心代码
- run(time.milliseconds());
- }catch(Exceptione){
- log.error("UncaughterrorinkafkaproducerI/Othread:",e);
- }
- }
- log.debug("BeginningshutdownofKafkaproducerI/Othread,sendingremainingrecords.");
- //okaywestoppedacceptingrequestsbuttheremaystillbe
- //requestsintheaccumulatororwaitingforacknowledgment,
- //waituntilthesearecompleted.
- while(!forceClose&&(this.accumulator.hasUnsent()||this.client.inFlightRequestCount()>0)){
- try{
- run(time.milliseconds());
- }catch(Exceptione){
- log.error("UncaughterrorinkafkaproducerI/Othread:",e);
- }
- }
- if(forceClose){
- //Weneedtofailalltheincompletebatchesandwakeupthethreadswaitingon
- //thefutures.
- this.accumulator.abortIncompleteBatches();
- }
- try{
- this.client.close();
- }catch(Exceptione){
- log.error("Failedtoclosenetworkclient",e);
- }
- log.debug("ShutdownofKafkaproducerI/Othreadhascompleted.");
- }
以上的run()方法中,出现了两个while判断,本意都是为了保持线程的不间断运行,将消息发送到broker,两处都调用了另外的一个带时间参数的run(xx)重载方法,第一个run(ts)方法是为了将消息缓存区中的消息发送给broker,第二个run(ts)方法会先判断线程是否强制关闭,如果没有强制关闭,则会将消息缓存区中未发送出去的消息发送完毕,然后才退出线程。
/**
* Run a single iteration of sending
*
* @param now
* The current POSIX time in milliseconds
*/
- voidrun(longnow){
- //第一步,获取元数据
- Clustercluster=metadata.fetch();
- //getthelistofpartitionswithdatareadytosend
- //第二步,判断哪些partition满足发送条件
- RecordAccumulator.ReadyCheckResultresult=this.accumulator.ready(cluster,now);
- /**
- *第三步,标识还没有拉取到元数据的topic
- */
- if(!result.unknownLeaderTopics.isEmpty()){
- //Thesetoftopicswithunknownleadercontainstopicswithleaderelectionpendingaswellas
- //topicswhichmayhaveexpired.Addthetopicagaintometadatatoensureitisincluded
- //andrequestmetadataupdate,sincetherearemessagestosendtothetopic.
- for(Stringtopic:result.unknownLeaderTopics)
- this.metadata.add(topic);
- this.metadata.requestUpdate();
- }
- //removeanynodeswearen'treadytosendto
- Iterator<Node>iter=result.readyNodes.iterator();
- longnotReadyTimeout=Long.MAX_VALUE;
- while(iter.hasNext()){
- Nodenode=iter.next();
- /**
- *第四步,检查与要发送数据的主机的网络是否已经建立好。
- */
- //如果返回的是false
- if(!this.client.ready(node,now)){
- //移除result里面要发送消息的主机。
- //所以我们会看到这儿所有的主机都会被移除
- iter.remove();
- notReadyTimeout=Math.min(notReadyTimeout,this.client.connectionDelay(node,now));
- }
- }
/**
* 第五步,有可能我们要发送的partition有很多个,这种情况下,有可能会存在这样的情况
* 部分partition的leader partition分布在同一台服务器上面。
*
*
*/
- Map<Integer,List<RecordBatch>>batches=this.accumulator.drain(cluster,
- result.readyNodes,
- this.maxRequestSize,
- now);
- if(guaranteeMessageOrder){
- //Muteallthepartitionsdrained
- //如果batches空的话,跳过不执行。
- for(List<RecordBatch>batchList:batches.values()){
- for(RecordBatchbatch:batchList)
- this.accumulator.mutePartition(batch.topicPartition);
- }
- }
/**
* 第六步,处理超时的批次
*
*/
- List<RecordBatch>expiredBatches=this.accumulator.abortExpiredBatches(this.requestTimeout,now);
- //updatesensors
- for(RecordBatchexpiredBatch:expiredBatches)
- this.sensors.recordErrors(expiredBatch.topicPartition.topic(),expiredBatch.recordCount);
- sensors.updateProduceRequestMetrics(batches);
- /**
* 第七步,创建发送消息的请求,以批的形式发送,可以减少网络传输成本,提高吞吐
*/
- List<ClientRequest>requests=createProduceRequests(batches,now);
- //Ifwehaveanynodesthatarereadytosend+havesendabledata,pollwith0timeoutsothiscanimmediately
- //loopandtrysendingmoredata.Otherwise,thetimeoutisdeterminedbynodesthathavepartitionswithdata
- //thatisn'tyetsendable(e.g.lingering,backingoff).Notethatthisspecificallydoesnotincludenodes
- //withsendabledatathataren'treadytosendsincetheywouldcausebusylooping.
- longpollTimeout=Math.min(result.nextReadyCheckDelayMs,notReadyTimeout);
- if(result.readyNodes.size()>0){
- log.trace("Nodeswithdatareadytosend:{}",result.readyNodes);
- log.trace("Created{}producerequests:{}",requests.size(),requests);
- pollTimeout=0;
- }
- //发送请求的操作
- for(ClientRequestrequest:requests)
- //绑定op_write
- client.send(request,now);
- //ifsomepartitionsarealreadyreadytobesent,theselecttimewouldbe0;
- //otherwiseifsomepartitionalreadyhassomedataaccumulatedbutnotreadyyet,
- //theselecttimewillbethetimedifferencebetweennowanditslingerexpirytime;
- //otherwisetheselecttimewillbethetimedifferencebetweennowandthemetadataexpirytime;
- /**
* 第八步,真正执行网络操作的都是这个NetWordClient这个组件
* 包括:发送请求,接受响应(处理响应)
- this.client.poll(pollTimeout,now);
以上的run(long)方法执行过程总结为下面几个步骤:
1. 获取集群元数据信息
2. 调用RecordAccumulator的ready()方法,判断当前时间戳哪些partition是可以进行发送,以及获取partition 的leader partition的元数据信息,得知哪些节点是可以接收消息的
3. 标记还没有拉到元数据的topic,如果缓存中存在标识为unknownLeaderTopics的topic信息,则将这些topic添加到metadata中,然后调用metadata的requestUpdate()方法,请求更新元数据
4. 将不需要接收消息的节点从按步骤而返回的结果中删除,只对准备接收消息的节点readyNode进行遍历,检查与要发送的节点的网络是否已经建立好,不符合发送条件的节点都会从readyNode中移除掉
5. 针对以上建立好网络连接的节点集合,调用RecordAccumulator的drain()方法,得到等待发送的消息批次集合
6. 处理超时发送的消息,调用RecordAccumulator的addExpiredBatches()方法,循环遍历RecordBatch,判断其中的消息是否超时,如果超时则从队列中移除,释放资源空间
7. 创建发送消息的请求,调用createProducerRequest方法,将消息批次封装成ClientRequest对象,因为批次通常是多个的,所以返回一个List集合
8. 调用NetworkClient的send()方法,绑定KafkaChannel的op_write操作
9. 调用NetworkClient的poll()方法拉取元数据信息,建立连接,执行网络请求,接收响应,完成消息发送
以上就是Sender线程对消息以及集群元数据所发生的核心过程。其中就涉及到了另外一个核心组件NetworkClient。
2. NetworkClient
NetworkClient是消息发送的介质,不管是生产者发送消息,还是消费者接收消息,都需要依赖于NetworkClient建立网络连接。同样的,我们先了解NetworkClient的组成部分,主要涉及NIO的一些知识,有兴趣的童鞋可以看看NIO的原理和组成。
- /**
- *Anetworkclientforasynchronousrequest/responsenetworki/o.Thisisaninternalclassusedtoimplementthe
- *user-facingproducerandconsumerclients.
- *<p>
- *Thisclassisnotthread-safe!
- */
- publicclassNetworkClientimplementsKafkaClient
- {
- privatestaticfinalLoggerlog=LoggerFactory.getLogger(NetworkClient.class);
- /*theselectorusedtoperformnetworki/o*/
- //javaNIOSelector
- privatefinalSelectableselector;
- privatefinalMetadataUpdatermetadataUpdater;
- privatefinalRandomrandOffset;
- /*thestateofeachnode'sconnection*/
- privatefinalClusterConnectionStatesconnectionStates;
- /*thesetofrequestscurrentlybeingsentorawaitingaresponse*/
- privatefinalInFlightRequestsinFlightRequests;
- /*thesocketsendbuffersizeinbytes*/
- privatefinalintsocketSendBuffer;
- /*thesocketreceivesizebufferinbytes*/
- privatefinalintsocketReceiveBuffer;
- /*theclientidusedtoidentifythisclientinrequeststotheserver*/
- privatefinalStringclientId;
- /*thecurrentcorrelationidtousewhensendingrequeststoservers*/
- privateintcorrelation;
- /*maxtimeinmsfortheproducertowaitforacknowledgementfromserver*/
- privatefinalintrequestTimeoutMs;
- privatefinalTimetime;
- ……
- }
可以看到NetworkClient实现了KafkaClient接口,包括了几个核心类Selectable、MetadataUpdater、ClusterConnectionStates、InFlightRequests。
2.1 Selectable
其中Selectable是实现异步非阻塞网络IO的接口,通过类的注释可以知道Selectable可以使用单个线程来管理多个网络连接,包括读、写、连接等操作,这个和NIO是一致的。
我们先看看Selectable的实现类Selector,是org.apache.kafka.common.network包下的,源码内容比较多,挑相对比较重要的看。
- publicclassSelectorimplementsSelectable{
- publicstaticfinallongNO_IDLE_TIMEOUT_MS=-1;
- privatestaticfinalLoggerlog=LoggerFactory.getLogger(Selector.class);
- //这个对象就是javaNIO里面的Selector
- //Selector是负责网络的建立,发送网络请求,处理实际的网络IO。
- //可以算是最核心的一个组件。
- privatefinaljava.nio.channels.SelectornioSelector;
- //broker和KafkaChannel(SocketChnnel)的映射
- //这儿的kafkaChannel大家暂时可以理解为就是SocketChannel
- //维护NodeId和KafkaChannel的映射关系
- privatefinalMap<String,KafkaChannel>channels;
- //记录已经完成发送的请求
- privatefinalList<Send>completedSends;
- //记录已经接收到的,并且处理完了的响应。
- privatefinalList<NetworkReceive>completedReceives;
- //已经接收到了,但是还没来得及处理的响应。
- //一个连接,对应一个响应队列
- privatefinalMap<KafkaChannel,Deque<NetworkReceive>>stagedReceives;
- privatefinalSet<SelectionKey>immediatelyConnectedKeys;
- //没有建立连接或者或者端口连接的主机
- privatefinalList<String>disconnected;
- //完成建立连接的主机
- privatefinalList<String>connected;
- //建立连接失败的主机。
- privatefinalList<String>failedSends;
- privatefinalTimetime;
- privatefinalSelectorMetricssensors;
- privatefinalStringmetricGrpPrefix;
- privatefinalMap<String,String>metricTags;
- //用于创建KafkaChannel的Builder
- privatefinalChannelBuilderchannelBuilder;
- privatefinalintmaxReceiveSize;
- privatefinalbooleanmetricsPerConnection;
- privatefinalIdleExpiryManageridleExpiryManager;
发起网络请求的第一步是连接、注册事件、发送、消息处理,涉及几个核心方法
1. 连接connect()方法
- /**
- *BeginconnectingtothegivenaddressandaddtheconnectiontothisnioSelectorassociatedwiththegivenid
- *number.
- *<p>
- *Notethatthiscallonlyinitiatestheconnection,whichwillbecompletedonafuture{@link#poll(long)}
- *call.Check{@link#connected()}toseewhich(ifany)connectionshavecompletedafteragivenpollcall.
- *@paramidTheidforthenewconnection
- *@paramaddressTheaddresstoconnectto
- *@paramsendBufferSizeThesendbufferforthenewconnection
- *@paramreceiveBufferSizeThereceivebufferforthenewconnection
- *@throwsIllegalStateExceptionifthereisalreadyaconnectionforthatid
- *@throwsIOExceptionifDNSresolutionfailsonthehostnameorifthebrokerisdown
- */
- @Override
- publicvoidconnect(Stringid,InetSocketAddressaddress,intsendBufferSize,intreceiveBufferSize)throwsIOException{
- if(this.channels.containsKey(id))
- thrownewIllegalStateException("Thereisalreadyaconnectionforid"+id);
- //获取到SocketChannel
- SocketChannelsocketChannel=SocketChannel.open();
- //设置为非阻塞的模式
- socketChannel.configureBlocking(false);
- Socketsocket=socketChannel.socket();
- socket.setKeepAlive(true);
- //设置网络参数,如发送和接收的buffer大小
- if(sendBufferSize!=Selectable.USE_DEFAULT_BUFFER_SIZE)
- socket.setSendBufferSize(sendBufferSize);
- if(receiveBufferSize!=Selectable.USE_DEFAULT_BUFFER_SIZE)
- socket.setReceiveBufferSize(receiveBufferSize);
- //这个的默认值是false,代表要开启Nagle的算法
- //它会把网络中的一些小的数据包收集起来,组合成一个大的数据包,再进行发送
- //因为它认为如果网络中有大量的小的数据包在传输则会影响传输效率
- socket.setTcpNoDelay(true);
- booleanconnected;
- try{
- //尝试去服务器去连接,因为这儿非阻塞的
- //有可能就立马连接成功,如果成功了就返回true
- //也有可能需要很久才能连接成功,返回false。
- connected=socketChannel.connect(address);
- }catch(UnresolvedAddressExceptione){
- socketChannel.close();
- thrownewIOException("Can'tresolveaddress:"+address,e);
- }catch(IOExceptione){
- socketChannel.close();
- throwe;
- }
- //SocketChannel往Selector上注册了一个OP_CONNECT
- SelectionKeykey=socketChannel.register(nioSelector,SelectionKey.OP_CONNECT);
- //根据SocketChannel封装出一个KafkaChannel
- KafkaChannelchannel=channelBuilder.buildChannel(id,key,maxReceiveSize);
- //把key和KafkaChannel关联起来
- //我们可以根据key就找到KafkaChannel
- //也可以根据KafkaChannel找到key
- key.attach(channel);
- //缓存起来
- this.channels.put(id,channel);
- //如果连接上了
- if(connected){
- //OP_CONNECTwon'ttriggerforimmediatelyconnectedchannels
- log.debug("Immediatelyconnectedtonode{}",channel.id());
- immediatelyConnectedKeys.add(key);
- //取消前面注册OP_CONNECT事件。
- key.interestOps(0);
- }
- }
2. 注册register()
- /**
- *RegisterthenioSelectorwithanexistingchannel
- *Usethisonserver-side,whenaconnectionisacceptedbyadifferentthreadbutprocessedbytheSelector
- *Notethatwearenotcheckingiftheconnectionidisvalid-sincetheconnectionalreadyexists
- */
- publicvoidregister(Stringid,SocketChannelsocketChannel)throwsClosedChannelException{
- //往自己的Selector上面注册OP_READ事件
- //这样的话,Processor线程就可以读取客户端发送过来的连接。
- SelectionKeykey=socketChannel.register(nioSelector,SelectionKey.OP_READ);
- //kafka里面对SocketChannel封装了一个KakaChannel
- KafkaChannelchannel=channelBuilder.buildChannel(id,key,maxReceiveSize);
- //key和channel
- key.attach(channel);
- //所以我们服务端这儿代码跟我们客户端的网络部分的代码是复用的
- //channels里面维护了多个网络连接。
- this.channels.put(id,channel);
- }
3. 发送send()
- /**
- *Queuethegivenrequestforsendinginthesubsequent{@link#poll(long)}calls
- *@paramsendTherequesttosend
- */
- publicvoidsend(Sendsend){
- //获取到一个KafakChannel
- KafkaChannelchannel=channelOrFail(send.destination());
- try{
- //重要方法
- channel.setSend(send);
- }catch(CancelledKeyExceptione){
- this.failedSends.add(send.destination());
- close(channel);
- }
- }
4. 消息处理poll()
- @Override
- publicvoidpoll(longtimeout)throwsIOException{
- if(timeout<0)
- thrownewIllegalArgumentException("timeoutshouldbe>=0");
- //将上一次poll()方法返回的结果清空
- clear();
- if(hasStagedReceives()||!immediatelyConnectedKeys.isEmpty())
- timeout=0;
- /*checkreadykeys*/
- longstartSelect=time.nanoseconds();
- //从Selector上找到有多少个key注册了,等待I/O事件发生
- intreadyKeys=select(timeout);
- longendSelect=time.nanoseconds();
- this.sensors.selectTime.record(endSelect-startSelect,time.milliseconds());
- //上面刚刚确实是注册了一个key
- if(readyKeys>0||!immediatelyConnectedKeys.isEmpty()){
- //处理I/O事件,对这个Selector上面的key要进行处理
- pollSelectionKeys(this.nioSelector.selectedKeys(),false,endSelect);
- pollSelectionKeys(immediatelyConnectedKeys,true,endSelect);
- }
- //对stagedReceives里面的数据要进行处理
- addToCompletedReceives();
- longendIo=time.nanoseconds();
- this.sensors.ioTime.record(endIo-endSelect,time.milliseconds());
- //weusethetimeattheendofselecttoensurethatwedon'tcloseanyconnectionsthat
- //havejustbeenprocessedinpollSelectionKeys
- //完成处理后,关闭长链接
- maybeCloseOldestConnection(endSelect);
- }
5. 处理Selector上面的key
- //用来处理OP_CONNECT,OP_READ,OP_WRITE事件,同时负责检测连接状态
- privatevoidpollSelectionKeys(Iterable<SelectionKey>selectionKeys,
- booleanisImmediatelyConnected,
- longcurrentTimeNanos){
- //获取到所有key
- Iterator<SelectionKey>iterator=selectionKeys.iterator();
- //遍历所有的key
- while(iterator.hasNext()){
- SelectionKeykey=iterator.next();
- iterator.remove();
- //根据key找到对应的KafkaChannel
- KafkaChannelchannel=channel(key);
- //registerallper-connectionmetricsatonce
- sensors.maybeRegisterConnectionMetrics(channel.id());
- if(idleExpiryManager!=null)
- idleExpiryManager.update(channel.id(),currentTimeNanos);
- try{
- /*completeanyconnectionsthathavefinishedtheirhandshake(eithernormallyorimmediately)*/
- //处理完成连接和OP_CONNECT的事件
- if(isImmediatelyConnected||key.isConnectable()){
- //完成网络的连接。
- if(channel.finishConnect()){
- //网络连接已经完成了以后,就把这个channel添加到已连接的集合中
- this.connected.add(channel.id());
- this.sensors.connectionCreated.record();
- SocketChannelsocketChannel=(SocketChannel)key.channel();
- log.debug("CreatedsocketwithSO_RCVBUF={},SO_SNDBUF={},SO_TIMEOUT={}tonode{}",
- socketChannel.socket().getReceiveBufferSize(),
- socketChannel.socket().getSendBufferSize(),
- socketChannel.socket().getSoTimeout(),
- channel.id());
- }else
- continue;
- }
- /*ifchannelisnotreadyfinishprepare*/
- //身份认证
- if(channel.isConnected()&&!channel.ready())
- channel.prepare();
- /*ifchannelisreadyreadfromanyconnectionsthathavereadabledata*/
- if(channel.ready()&&key.isReadable()&&!hasStagedReceive(channel)){
- NetworkReceivenetworkReceive;
- //处理OP_READ事件,接受服务端发送回来的响应(请求)
- //networkReceive代表的就是一个服务端发送回来的响应
- while((networkReceive=channel.read())!=null)
- addToStagedReceives(channel,networkReceive);
- }
- /*ifchannelisreadywritetoanysocketsthathavespaceintheirbufferandforwhichwehavedata*/
- //处理OP_WRITE事件
- if(channel.ready()&&key.isWritable()){
- //获取到要发送的那个网络请求,往服务端发送数据
- //如果消息被发送出去了,就会移除OP_WRITE
- Sendsend=channel.write();
- //已经完成响应消息的发送
- if(send!=null){
- this.completedSends.add(send);
- this.sensors.recordBytesSent(channel.id(),send.size());
- }
- }
- /*cancelanydefunctsockets*/
- if(!key.isValid()){
- close(channel);
- this.disconnected.add(channel.id());
- }
- }catch(Exceptione){
- Stringdesc=channel.socketDescription();
- if(einstanceofIOException)
- log.debug("Connectionwith{}disconnected",desc,e);
- else
- log.warn("Unexpectederrorfrom{};closingconnection",desc,e);
- close(channel);
- //添加到连接失败的集合中
- this.disconnected.add(channel.id());
- }
- }
- }
2.2 MetadataUpdater
是NetworkClient用于请求更新集群元数据信息并检索集群节点的接口,是一个非线程安全的内部类,有两个实现类,分别是DefaultMetadataUpdater和ManualMetadataUpdater,NetworkClient用到的是DefaultMetadataUpdater类,是NetworkClient的默认实现类,同时是NetworkClient的内部类,从源码可以看到,如下。
- if(metadataUpdater==null){
- if(metadata==null)
- thrownewIllegalArgumentException("`metadata`mustnotbenull");
- this.metadataUpdater=newDefaultMetadataUpdater(metadata);
- }else{
- this.metadataUpdater=metadataUpdater;
- }
- ……
- classDefaultMetadataUpdaterimplementsMetadataUpdater
- {
- /*thecurrentclustermetadata*/
- //集群元数据对象
- privatefinalMetadatametadata;
- /*trueifthereisametadatarequestthathasbeensentandforwhichwehavenotyetreceivedaresponse*/
- //用来标识是否已经发送过MetadataRequest,若是已发送,则无需重复发送
- privatebooleanmetadataFetchInProgress;
- /*thelasttimestampwhennobrokernodeisavailabletoconnect*/
- //记录没有发现可用节点的时间戳
- privatelonglastNoNodeAvailableMs;
- DefaultMetadataUpdater(Metadatametadata){
- this.metadata=metadata;
- this.metadataFetchInProgress=false;
- this.lastNoNodeAvailableMs=0;
- }
- //返回集群节点集合
- @Override
- publicList<Node>fetchNodes(){
- returnmetadata.fetch().nodes();
- }
- @Override
- publicbooleanisUpdateDue(longnow){
- return!this.metadataFetchInProgress&&this.metadata.timeToNextUpdate(now)==0;
- }
- //核心方法,判断当前集群保存的元数据是否需要更新,如果需要更新则发送MetadataRequest请求
- @Override
- publiclongmaybeUpdate(longnow){
- //shouldweupdateourmetadata?
- //获取下次更新元数据的时间戳
- longtimeToNextMetadataUpdate=metadata.timeToNextUpdate(now);
- //获取下次重试连接服务端的时间戳
- longtimeToNextReconnectAttempt=Math.max(this.lastNoNodeAvailableMs+metadata.refreshBackoff()-now,0);
- //检测是否已经发送过MetadataRequest请求
- longwaitForMetadataFetch=this.metadataFetchInProgress?Integer.MAX_VALUE:0;
- //ifthereisnonodeavailabletoconnect,backoffrefreshingmetadata
- longmetadataTimeout=Math.max(Math.max(timeToNextMetadataUpdate,timeToNextReconnectAttempt),
- waitForMetadataFetch);
- if(metadataTimeout==0){
- //Bewarethatthebehaviorofthismethodandthecomputationoftimeoutsforpoll()are
- //highlydependentonthebehaviorofleastLoadedNode.
- //找到负载最小的节点
- Nodenode=leastLoadedNode(now);
- //创建MetadataRequest请求,待触发poll()方法执行真正的发送操作。
- maybeUpdate(now,node);
- }
- returnmetadataTimeout;
- }
- //处理没有建立好连接的请求
- @Override
- publicbooleanmaybeHandleDisconnection(ClientRequestrequest){
- ApiKeysrequestKey=ApiKeys.forId(request.request().header().apiKey());
- if(requestKey==ApiKeys.METADATA&&request.isInitiatedByNetworkClient()){
- Clustercluster=metadata.fetch();
- if(cluster.isBootstrapConfigured()){
- intnodeId=Integer.parseInt(request.request().destination());
- Nodenode=cluster.nodeById(nodeId);
- if(node!=null)
- log.warn("Bootstrapbroker{}:{}disconnected",node.host(),node.port());
- }
- metadataFetchInProgress=false;
- returntrue;
- }
- returnfalse;
- }
- //解析响应信息
- @Override
- publicbooleanmaybeHandleCompletedReceive(ClientRequestreq,longnow,Structbody){
- shortapiKey=req.request().header().apiKey();
- //检查是否为MetadataRequest请求
- if(apiKey==ApiKeys.METADATA.id&&req.isInitiatedByNetworkClient()){
- //处理响应
- handleResponse(req.request().header(),body,now);
- returntrue;
- }
- returnfalse;
- }
- @Override
- publicvoidrequestUpdate(){
- this.metadata.requestUpdate();
- }
- //处理MetadataRequest请求响应
- privatevoidhandleResponse(RequestHeaderheader,Structbody,longnow){
- this.metadataFetchInProgress=false;
- //因为服务端发送回来的是一个二进制的数据结构
- //所以生产者这儿要对这个数据结构要进行解析
- //解析完了以后就封装成一个MetadataResponse对象。
- MetadataResponseresponse=newMetadataResponse(body);
- //响应里面会带回来元数据的信息
- //获取到了从服务端拉取的集群的元数据信息。
- Clustercluster=response.cluster();
- //checkifanytopicsmetadatafailedtogetupdated
- Map<String,Errors>errors=response.errors();
- if(!errors.isEmpty())
- log.warn("Errorwhilefetchingmetadatawithcorrelationid{}:{}",header.correlationId(),errors);
- //don'tupdatetheclusteriftherearenovalidnodes…thetopicwewantmaystillbeintheprocessofbeing
- //createdwhichmeanswewillgeterrorsandnonodesuntilitexists
- //如果正常获取到了元数据的信息
- if(cluster.nodes().size()>0){
- //更新元数据信息。
- this.metadata.update(cluster,now);
- }else{
- log.trace("Ignoringemptymetadataresponsewithcorrelationid{}.",header.correlationId());
- this.metadata.failedUpdate(now);
- }
- }
- /**
- *Createametadatarequestforthegiventopics
- */
- privateClientRequestrequest(longnow,Stringnode,MetadataRequestmetadata){
- RequestSendsend=newRequestSend(node,nextRequestHeader(ApiKeys.METADATA),metadata.toStruct());
- returnnewClientRequest(now,true,send,null,true);
- }
- /**
- *Addametadatarequesttothelistofsendsifwecanmakeone
- */
- privatevoidmaybeUpdate(longnow,Nodenode){
- //检测node是否可用
- if(node==null){
- log.debug("Giveupsendingmetadatarequestsincenonodeisavailable");
- //markthetimestampfornonodeavailabletoconnect
- this.lastNoNodeAvailableMs=now;
- return;
- }
- StringnodeConnectionId=node.idString();
- //判断网络连接是否应建立好,是否可用向该节点发送请求
- if(canSendRequest(nodeConnectionId)){
- this.metadataFetchInProgress=true;
- MetadataRequestmetadataRequest;
- //指定需要更新元数据的topic
- if(metadata.needMetadataForAllTopics())
- //封装请求(获取所有topics)的元数据信息的请求
- metadataRequest=MetadataRequest.allTopics();
- else
- //我们默认走的这儿的这个方法
- //就是拉取我们发送消息的对应的topic的方法
- metadataRequest=newMetadataRequest(newArrayList<>(metadata.topics()));
- //这儿就给我们创建了一个请求(拉取元数据的)
- ClientRequestclientRequest=request(now,nodeConnectionId,metadataRequest);
- log.debug("Sendingmetadatarequest{}tonode{}",metadataRequest,node.id());
- //缓存请求,待下次触发poll()方法执行发送操作
- doSend(clientRequest,now);
- }elseif(connectionStates.canConnect(nodeConnectionId,now)){
- //wedon'thaveaconnectiontothisnoderightnow,makeone
- log.debug("Initializeconnectiontonode{}forsendingmetadatarequest",node.id());
- //初始化连接
- initiateConnect(node,now);
- //IfinitiateConnectfailedimmediately,thisnodewillbeputintoblackoutandwe
- //shouldallowimmediatelyretryingincasethereisanothercandidatenode.Ifit
- //isstillconnecting,theworstcaseisthatweendupsettingalongertimeout
- //onthenextroundandthenwaitfortheresponse.
- }else{//connected,butcan'tsendmoreORconnecting
- //Ineithercase,wejustneedtowaitforanetworkeventtoletusknowtheselected
- //connectionmightbeusableagain.
- this.lastNoNodeAvailableMs=now;
- }
- }
- }
- 将ClientRequest请求缓存到InFlightRequest缓存队列中。
- privatevoiddoSend(ClientRequestrequest,longnow){
- request.setSendTimeMs(now);
- //这儿往inFlightRequests缓存队列里面还没有收到响应的请求,默认最多能存5个请求
- this.inFlightRequests.add(request);
- //然后不断调用Selector的send()方法
- selector.send(request.request());
- }
2.3 InFlightRequests
这个类是一个请求队列,用于缓存已经发送出去但是没有收到响应的ClientRequest,提供了许多管理缓存队列的方法,支持通过配置参数控制ClientRequest的数量,通过源码可以看到其底层数据结构Map。
- /**
- *Thesetofrequestswhichhavebeensentorarebeingsentbuthaven'tyetreceivedaresponse
- */
- finalclassInFlightRequests{
- privatefinalintmaxInFlightRequestsPerConnection;
- privatefinalMap<String,Deque<ClientRequest>>requests=newHashMap<>();
- publicInFlightRequests(intmaxInFlightRequestsPerConnection){
- this.maxInFlightRequestsPerConnection=maxInFlightRequestsPerConnection;
- }
- ……
- }
除了包含了很多关于处理队列的方法之外,有一个比较重要的方法着重看一下canSendMore()。
- /**
- *Canwesendmorerequeststothisnode?
- *
- *@paramnodeNodeinquestion
- *@returntrueiffwehavenorequestsstillbeingsenttothegivennode
- */
- publicbooleancanSendMore(Stringnode){
- //获得要发送到节点的ClientRequest队列
- Deque<ClientRequest>queue=requests.get(node);
- //如果节点出现请求堆积,未及时处理,则有可能出现请求超时的情况
- returnqueue==null||queue.isEmpty()||
- (queue.peekFirst().request().completed()
- &&queue.size()<this.maxInFlightRequestsPerConnection);
- }
了解完上面几个核心类之后,我们开始剖析NetworkClient的流程和实现。
2.4 NetworkClient
Kafka中所有的消息都都需要借助NetworkClient与上下游建立发送通道,其重要性不言而喻。这里我们只考虑消息成功的流程,异常处理不做解析,相对而言没那么重要,消息发送的流程大致如下:
1. 首先调用ready()方法,判断节点是否具备发送消息的条件
2. 通过isReady()方法判断是否可以往节点发送更多请求,用来检查是否有请求堆积
3. 使用initiateConnect初始化连接
4. 然后调用selector的connect()方法建立连接
5. 获取SocketChannel,与服务端建立连接
6. SocketChannel往Selector注册OP_CONNECT事件
7. 调用send()方式发送请求
8. 调用poll()方法处理请求
下面就根据消息发送流程涉及的核心方法进行剖析,了解每个流程中涉及的主要操作。
1. 检查节点是否满足消息发送条件
- /**
- *Beginconnectingtothegivennode,returntrueifwearealreadyconnectedandreadytosendtothatnode.
- *
- *@paramnodeThenodetocheck
- *@paramnowThecurrenttimestamp
- *@returnTrueifwearereadytosendtothegivennode
- */
- @Override
- publicbooleanready(Nodenode,longnow){
- if(node.isEmpty())
- thrownewIllegalArgumentException("Cannotconnecttoemptynode"+node);
- //判断要发送消息的主机,是否具备发送消息的条件
- if(isReady(node,now))
- returntrue;
- //判断是否可以尝试去建立网络
- if(connectionStates.canConnect(node.idString(),now))
- //ifweareinterestedinsendingtoanodeandwedon'thaveaconnectiontoit,initiateone
- //初始化连接
- //绑定了连接到事件而已
- initiateConnect(node,now);
- returnfalse;
- }
2. 初始化连接
- /**
- *Initiateaconnectiontothegivennode
- */
- privatevoidinitiateConnect(Nodenode,longnow){
- StringnodeConnectionId=node.idString();
- try{
- log.debug("Initiatingconnectiontonode{}at{}:{}.",node.id(),node.host(),node.port());
- this.connectionStates.connecting(nodeConnectionId,now);
- //开始建立连接
- selector.connect(nodeConnectionId,
- newInetSocketAddress(node.host(),node.port()),
- this.socketSendBuffer,
- this.socketReceiveBuffer);
- }catch(IOExceptione){
- /*attemptfailed,we'lltryagainafterthebackoff*/
- connectionStates.disconnected(nodeConnectionId,now);
- /*maybetheproblemisourmetadata,updateit*/
- metadataUpdater.requestUpdate();
- log.debug("Errorconnectingtonode{}at{}:{}:",node.id(),node.host(),node.port(),e);
- }
- }
3. initiateConnect()方法中调用的connect()方法就是Selectable实现类Selector的connect()方法,包括获取SocketChannel并注册OP_CONNECT、OP_READ、 OP_WRITE事件上面已经分析了,这里不做赘述,完成以上一系列建立网络连接的动作之后将消息请求发送到下游节点,Sender的send()方法会调用NetworkClient的send()方法进行发送,而NetworkClient的send()方法最终调用了Selector的send()方法。
- /**
- *Queueupthegivenrequestforsending.Requestscanonlybesentouttoreadynodes.
- *
- *@paramrequestTherequest
- *@paramnowThecurrenttimestamp
- */
- @Override
- publicvoidsend(ClientRequestrequest,longnow){
- StringnodeId=request.request().destination();
- //判断已经建立连接状态的节点能否接收更多请求
- if(!canSendRequest(nodeId))
- thrownewIllegalStateException("Attempttosendarequesttonode"+nodeId+"whichisnotready.");
- //发送ClientRequest
- doSend(request,now);
- }
- privatevoiddoSend(ClientRequestrequest,longnow){
- request.setSendTimeMs(now);
- //缓存请求
- this.inFlightRequests.add(request);
- selector.send(request.request());
- }
4. 最后调用poll()方法处理请求
- /**
- *Doactualreadsandwritestosockets.
- *
- *@paramtimeoutThemaximumamountoftimetowait(inms)forresponsesiftherearenoneimmediately,
- *mustbenon-negative.Theactualtimeoutwillbetheminimumoftimeout,requesttimeoutand
- *metadatatimeout
- *@paramnowThecurrenttimeinmilliseconds
- *@returnThelistofresponsesreceived
- */
- @Override
- publicList<ClientResponse>poll(longtimeout,longnow){
- //步骤一:请求更新元数据
- longmetadataTimeout=metadataUpdater.maybeUpdate(now);
- try{
- //步骤二:执行I/O操作,发送请求
- this.selector.poll(Utils.min(timeout,metadataTimeout,requestTimeoutMs));
- }catch(IOExceptione){
- log.error("UnexpectederrorduringI/O",e);
- }
- //processcompletedactions
- longupdatedNow=this.time.milliseconds();
- List<ClientResponse>responses=newArrayList<>();
- //步骤三:处理各种响应
- handleCompletedSends(responses,updatedNow);
- handleCompletedReceives(responses,updatedNow);
- handleDisconnections(responses,updatedNow);
- handleConnections();
- handleTimedOutRequests(responses,updatedNow);
- //invokecallbacks
- //循环调用ClientRequest的callback回调函数
- for(ClientResponseresponse:responses){
- if(response.request().hasCallback()){
- try{
- response.request().callback().onComplete(response);
- }catch(Exceptione){
- log.error("Uncaughterrorinrequestcompletion:",e);
- }
- }
- }
- returnresponses;
- }
有兴趣的童鞋可以继续深究回调函数的逻辑和Selector的操作。
补充说明:
上面有一个点没有涉及到,就是kafka的内存池,可以去看一下BufferPool这个类,这一块知识点应该是要在上一篇文章说的,突然想起来漏掉了,在这里做一下补充,对应就是我们前面说到的RecordAccumulator这个类的数据结构,封装好RecordAccumulator对象是有多个Dqueue组成,每个Dqueue由多个RecordBatch组成,除此之外,RecordAccumulator还包括了BufferPool内存池,这里再稍微回忆一下,RecordAccumulator类初始化了ConcurrentMap。
- publicfinalclassRecordAccumulator{
- ……
- privatefinalBufferPoolfree;
- ……
- privatefinalConcurrentMap<TopicPartition,Deque<RecordBatch>>batches;
- }
如图所示,我们重点关于内存的分配allocate()和释放deallocate()这个两个方法,有兴趣的小伙伴可以私底下看一下,这个类的代码总共也就三百多行,内容不是很多,欢迎一起交流学习,这里就不做展开了,免得影响本文的主题。
3. 小结
本文主要是剖析Kafka生产者发送消息的真正执行者Sender线程以及作为消息上下游的传输通道NetworkClient组件,主要涉及到NIO的应用,同时介绍了发送消息主要涉及的核心依赖类。写这篇文章主要是起到一个承上启下的作用,既是对前面分析Kafka生产者发送消息的补充,同时也为接下来剖析消费者消费上游消息作铺垫,写得有点不成体系,写文章的思路主要考虑用总分的思路作为线索去分析,个人觉得篇幅过长不方便阅读,所以会尽量精简,重点分析核心方法和流程,希望对读者有所帮助。
原文链接:https://mp.weixin.qq.com/s/_xjxjahAp76Oft05LYYYCA