ZooKeeper Lesson 4: Principles and Source Code Analysis Materials

Tuling Technical Exchange Website

This is our last lesson on ZooKeeper (zk). In the earlier lessons we covered zk's features and what can be built with them. This lesson looks underneath: how does zk actually achieve those things? Topics covered: server-side source code and principles, client-side source code and principles, the election algorithm, serialization, operations, and disaster recovery.

ZK principles:

Source code analysis:

Server:

Server startup:

    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args); // start reading here
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }

org.apache.zookeeper.server.quorum.QuorumPeerMain#initializeAndRun

    protected void initializeAndRun(String[] args)
            throws ConfigException, IOException {
        // Read the zoo.cfg configuration parameters
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the purge task (cleans up old logs)
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config); // act on the configuration that was read
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum - run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig

    public void runFromConfig(QuorumPeerConfig config) throws IOException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        LOG.info("Starting quorum peer");
        try {
            ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns()); // create the server-side socket instance

            quorumPeer = new QuorumPeer(); // populated from the values read from zoo.cfg
            quorumPeer.setClientPortAddress(config.getClientPortAddress());
            quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    new File(config.getDataLogDir()),
                    new File(config.getDataDir())));
            quorumPeer.setQuorumPeers(config.getServers());
            quorumPeer.setElectionType(config.getElectionAlg());
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

            quorumPeer.start(); // calls QuorumPeer's own start method; note this is not yet the thread's start method
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        }
    }

org.apache.zookeeper.server.quorum.QuorumPeer#start

    @Override
    public synchronized void start() {
        loadDataBase();        // first restore the in-memory database from the files on disk
        cnxnFactory.start();   // start the server-side socket implementation
        startLeaderElection(); // begin the election
        super.start();         // this is the real Thread start, which in turn executes run()
    }

org.apache.zookeeper.server.NIOServerCnxnFactory#run (the server accepts connections)

    public void run() {
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList); // randomize the order
                for (SelectionKey k : selectedList) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                                .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        // check whether this client exceeds the connection limit configured in zoo.cfg
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 &&
                                cnxncount >= maxClientCnxns) {
                            LOG.warn("Too many connections from " + ia
                                    + " - max is " + maxClientCnxns);
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                    + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            // listen for read events
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            // create the internal connection object
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        // handle read and write events
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k); // not worth tracing any further
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                    + k.readyOps());
                        }
                    }
                }
                // clear the set so it is ready for the next round
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }

org.apache.zookeeper.server.quorum.QuorumPeer#startLeaderElection (the election begins)

    synchronized public void startLeaderElection() {
        try {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); // vote for itself
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        // look up this server's own election address in the configuration
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType); // this is where the election starts
    }

org.apache.zookeeper.server.quorum.FastLeaderElection#starter (election initialization)

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new
                Messenger(manager);
    }

org.apache.zookeeper.server.quorum.QuorumPeer#run (the election runs)

The code is not pasted here. It is also worth reading the lookForLeader method in FastLeaderElection: this run method calls it, and the outcome decides which server becomes the leader and which become followers.

Client:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses()); // get the server IPs and ports
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly); // create the ClientCnxn object
        cnxn.start(); // not a Thread start; see ClientCnxn#start below
    }

org.apache.zookeeper.ClientCnxn#ClientCnxn (initialization; creates two threads, send and event)

    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
    }

org.apache.zookeeper.ClientCnxn#start (starts both threads, so their run methods execute)

    public void start() {
        sendThread.start();
        eventThread.start();
    }

org.apache.zookeeper.ClientCnxn.SendThread#run

    @Override
    public void run() {
        clientCnxnSocket.introduce(this, sessionId);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard(); // refresh the timestamps of the client-server socket
        int to;
        long lastPingRwServer = System.currentTimeMillis();
        final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
        while (state.isAlive()) {
            try {
                if (!clientCnxnSocket.isConnected()) {
                    if (!isFirstConnect) {
                        try {
                            Thread.sleep(r.nextInt(1000));
                        } catch (InterruptedException e) {
                            LOG.warn("Unexpected exception", e);
                        }
                    }
                    // don't re-establish connection if we are closing
                    if (closing || !state.isAlive()) {
                        break;
                    }
                    startConnect();
                    clientCnxnSocket.updateLastSendAndHeard();
                }

                if (state.isConnected()) {
                    // determine whether we need to send an AuthFailed event.
                    if (zooKeeperSaslClient != null) {
                        boolean sendAuthEvent = false;
                        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                            try {
                                zooKeeperSaslClient.initialize(ClientCnxn.this);
                            } catch (SaslException e) {
                                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            }
                        }
                        KeeperState authState = zooKeeperSaslClient.getKeeperState();
                        if (authState != null) {
                            if (authState == KeeperState.AuthFailed) {
                                // An authentication error occurred during authentication with the Zookeeper Server.
                                state = States.AUTH_FAILED;
                                sendAuthEvent = true;
                            } else if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }

                        if (sendAuthEvent == true) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Watcher.Event.EventType.None,
                                    authState, null));
                        }
                    }
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else {
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }

                if (to <= 0) {
                    throw new SessionTimeoutException(
                            "Client session timed out, have not heard from server in "
                                    + clientCnxnSocket.getIdleRecv() + "ms"
                                    + " for sessionid 0x"
                                    + Long.toHexString(sessionId));
                }
                if (state.isConnected()) {
                    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                            ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                    // send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                        sendPing(); // send the heartbeat
                        clientCnxnSocket.updateLastSend();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }

                // If we are in read-only mode, seek for read/write server
                if (state == States.CONNECTEDREADONLY) {
                    long now = System.currentTimeMillis();
                    int idlePingRwServer = (int) (now - lastPingRwServer);
                    if (idlePingRwServer >= pingRwTimeout) {
                        lastPingRwServer = now;
                        idlePingRwServer = 0;
                        pingRwTimeout =
                                Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                        pingRwServer();
                    }
                    to = Math.min(to, pingRwTimeout - idlePingRwServer);
                }

                // this method is fairly long; it is the part to focus on
                clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) {
                        // closing so this is expected
                        LOG.debug("An exception was thrown while closing send thread for session 0x"
                                + Long.toHexString(getSessionId())
                                + " : " + e.getMessage());
                    }
                    break;
                } else {
                    // this is ugly, you have a better way speak up
                    if (e instanceof SessionExpiredException) {
                        LOG.info(e.getMessage() + ", closing socket connection");
                    } else if (e instanceof SessionTimeoutException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof EndOfStreamException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof RWServerFoundException) {
                        LOG.info(e.getMessage());
                    } else {
                        LOG.warn(
                                "Session 0x"
                                        + Long.toHexString(getSessionId())
                                        + " for server "
                                        + clientCnxnSocket.getRemoteSocketAddress()
                                        + ", unexpected error"
                                        + RETRY_CONN_MSG, e);
                    }
                    cleanup();
                    if (state.isAlive()) {
                        eventThread.queueEvent(new WatchedEvent(
                                Event.EventType.None,
                                Event.KeeperState.Disconnected,
                                null));
                    }
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
            }
        }
        cleanup();
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                    Event.KeeperState.Disconnected, null));
        }
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x"
                        + Long.toHexString(getSessionId()));
    }

org.apache.zookeeper.ClientCnxnSocketNIO#doTransport (where the real work happens)

    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
            ClientCnxn cnxn)
            throws IOException,
            InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // handles the data exchanged between client and server: once the connection
                // is established, reads and writes are processed here (not traced further)
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().is
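
The startLeaderElection listing earlier has each server build an initial Vote(myid, zxid, epoch) for itself and then leaves the rest to lookForLeader. As a rough illustration of how two such votes can be compared, here is a minimal sketch that assumes the ordering FastLeaderElection applies: the higher epoch wins, then the higher zxid, then the higher server id. The names VoteOrderSketch, SimpleVote and supersedes are hypothetical and are not part of ZooKeeper's API; quorum weights and the message exchange are left out.

```java
public class VoteOrderSketch {

    /** Hypothetical stand-in for a vote: proposed leader id, its last zxid, its epoch. */
    static final class SimpleVote {
        final long id;
        final long zxid;
        final long epoch;

        SimpleVote(long id, long zxid, long epoch) {
            this.id = id;
            this.zxid = zxid;
            this.epoch = epoch;
        }
    }

    /**
     * Returns true when the candidate vote should replace the current one.
     * Assumed ordering: epoch first, then zxid, then server id.
     */
    static boolean supersedes(SimpleVote candidate, SimpleVote current) {
        if (candidate.epoch != current.epoch) {
            return candidate.epoch > current.epoch;
        }
        if (candidate.zxid != current.zxid) {
            return candidate.zxid > current.zxid;
        }
        return candidate.id > current.id;
    }

    public static void main(String[] args) {
        // Two servers with identical history: the tie is broken by server id.
        SimpleVote mine = new SimpleVote(1, 5L, 1);
        SimpleVote peer = new SimpleVote(2, 5L, 1);
        System.out.println(supersedes(peer, mine)); // prints "true"
        System.out.println(supersedes(mine, peer)); // prints "false"
    }
}
```

Under this ordering a server with more recent data (higher epoch or zxid) always beats one with a higher id, so the id only matters when the histories are equal.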
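
The ClientCnxn constructor shown earlier derives two client-side timeouts from the session timeout: connectTimeout = sessionTimeout / hostProvider.size() and readTimeout = sessionTimeout * 2 / 3. The sketch below just works that arithmetic through for one set of inputs. The class name ClientTimeoutSketch, the helper methods, and the sample values (a 30000 ms session timeout and three servers) are assumptions chosen for illustration.

```java
public class ClientTimeoutSketch {

    /** Per-server connect budget: the session timeout split across the server list. */
    static int connectTimeout(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount;
    }

    /** Read budget: two thirds of the session timeout (integer arithmetic). */
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30000; // assumed sample value
        int serverCount = 3;          // assumed size of the connect string's server list

        System.out.println("connectTimeout=" + connectTimeout(sessionTimeoutMs, serverCount)); // prints "connectTimeout=10000"
        System.out.println("readTimeout=" + readTimeout(sessionTimeoutMs));                     // prints "readTimeout=20000"
    }
}
```

With these values the client gives each server up to 10 seconds to connect, and treats 20 seconds without hearing from the server as a timeout, which leaves a margin before the 30 second session itself would expire.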
