ZooKeeper单机客户端的启动流程源码阅读
- 2019 年 10 月 3 日
- 筆記
客户端的启动流程
看上面的客户端启动的脚本图,可以看到,zookeeper客户端脚本运行的入口ZookeeperMain.java的main()方法, 关于这个类可以理解成它是程序启动的辅助类,由它提供开始的位置,进而加载出zk client的上下文
创建ZooKeeperMain对象
// todo zookeeper的入口方法 public static void main(String args[]) throws KeeperException, IOException, InterruptedException { // todo new ZK客户端 ZooKeeperMain main = new ZooKeeperMain(args); // todo run方法的实现在下面 main.run(); }
跟踪
ZooKeeperMain main = new ZooKeeperMain(args);
能往下追很长的代码,提前说main.run()的作用,就是对用户输入的命令进行下一步处理
如上是入口函数的位置,跟进这两个函数,可以找到我们在client端的命令行中可以输入命令和zookeeper服务端进行通信的原因(开起了新的线程),以及zookeeper的客户端所依赖的其他类
跟进ZooKeeperMain main = new ZooKeeperMain(args);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException { cl.parseOptions(args); System.out.println("Connecting to " + cl.getOption("server")); // todo 连接到客户端 connectToZK(cl.getOption("server")); }
我们在命令行启动客户端时,输入命令zkCli.sh -server localhost:2181
,其中的args数组, 就是我们在启动就是我们输入的参数,
构建zookeeperMain
对象时,上面主要做了两件事
- 解析args参数数组
- 连接客户端
解析参数数组的逻辑就在下面, 很熟悉,就是我们在命令行启动zookeeper时输入的命令可选项
public boolean parseOptions(String[] args) { List<String> argList = Arrays.asList(args); Iterator<String> it = argList.iterator(); while (it.hasNext()) { String opt = it.next(); try { if (opt.equals("-server")) { options.put("server", it.next()); } else if (opt.equals("-timeout")) { options.put("timeout", it.next()); } else if (opt.equals("-r")) { options.put("readonly", "true"); } } catch (NoSuchElementException e) { System.err.println("Error: no argument found for option " + opt); return false; } if (!opt.startsWith("-")) { command = opt; cmdArgs = new ArrayList<String>(); cmdArgs.add(command); while (it.hasNext()) { cmdArgs.add(it.next()); } return true; } } return true; }
创建ZooKeeper客户端的对象
接着看如果连接客户端, connectToZK(String newHost)
同样是本类方法,源码如下:
// todo 来到这里 protected void connectToZK(String newHost) throws InterruptedException, IOException { if (zk != null && zk.getState().isAlive()) { zk.close(); } //todo 命令行中的server 后面跟着 host主机地址 host = newHost; boolean readOnly = cl.getOption("readonly") != null; // todo 创建zookeeper的实例 zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly); }
到这里算是个小高潮吧,毕竟看到了zookeeper client的封装类ZooKeeper
, 这个类上的注解大概是这么介绍这个类的
- 它是个Zookeeper 客户端的封装类, 它的第一个参数是
host:port,host:port,host:port
这种格式的字符串,逗号左右都是一个不同步的服务端的地址 - 会异步的创建session,通常这个session在构造函数执行完之间就已经创建完成了
- watcher 是监听者,它被通知的时刻不确定,可能是构造方法执行完成前,也可能在这之后
- 只要没有连接成功, zookeeper客户端,会一直尝试从提供的服务地址串中选择出一个尝试链接
跟进ZooKeeper
的构造方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{ LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); watchManager.defaultWatcher = watcher; // todo 包装服务端的地址 ConnectStringParser connectStringParser = new ConnectStringParser( connectString); //todo 将服务端的地址封装进 StaticHostProvider -> HostProvider中 HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses()); // todo 创建客户端的上下文, 这个上下文对象的亮点就是它维护了一个客户端的socket cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, // todo 跟进这个方法,getClientCnxnSocket, 获取出客户端上下文中的socket getClientCnxnSocket(), canBeReadOnly); // todo 启动客户端 cnxn.start(); }
主要做了这么几件事
- 将服务端的地址解析封装进了
StaticHostProvider
类中, 可以把这个类理解成专门存放服务端地址的set 集合 - 创建出了客户端的上下文对象: ClientCnxn, 当然在这之前,入参位置还有一个
getClientCnxnSocket()
这个函数可以创建出客户端的NIO Socket -
然后调用
cnxn.start()
其实就是启动了客户端的另外两条线程sendThread
和eventThread
下面会详细说创建客户端的 NioSocket
继续跟进源码getClientCnxnSocket()
通过反射,zk客户端使用的socket对象是ClientCnxnSocketNIO
//todo 通过反射创建出客户端上下文中的 socket , 实际的ClientCnxnSocketNIO 是 ClientCnxnSocket的子类 // todo ---> zookeeper 封装的 NIO的逻辑都在 实际的ClientCnxnSocketNIO private static ClientCnxnSocket getClientCnxnSocket() throws IOException { // todo zookeeper.clientCnxnSocket String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET); if (clientCnxnSocketName == null) { // todo 上面String其实就是这个类的name, 根进去看一下它的属性 // todo 这个类维护了NioSocket使用到的 selector 选择器 , 已经发生的感兴趣的事件SelectionKey clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); } try { // todo 可以看到客户端使用的 NioSocket return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor() .newInstance(); } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName); ioe.initCause(e); throw ioe; } }
创建 ClientCnxn客户端的上下文
创建上下文,构造函数中的诸多属性都是在前面读取配置文件或是新添加进来的,重点是最后两行,它创建了两条线程类,和zk客户端的IO息息相关
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; // todo 刚才传递过来的值为0 this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; connectTimeout = sessionTimeout / hostProvider.size(); // todo 添加read的超时时间 readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; // todo 创建了一个seadThread 线程 sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); }
创建SendThread
sendThred是一个客户端的线程类,什么时候开启? 其实就在上面,当创建了ClientCnxn后,调用的cnxn.start()就是在开启它的run() , 它有什么作用? 它的run()是一个无限循环,除非运到了close的条件,否则他就会一直循环下去, 比如向服务端发送心跳,或者向服务端发送我们在控制台输入的数据以及接受服务端发送过来的响应
这是他的构造方法,可以看到它还是一个守护线程,并拥有客户端socket的应用,有了NIO Socket相关技能
//todo SendThread(ClientCnxnSocket clientCnxnSocket) { super(makeThreadName("-SendThread()")); // todo 设置状态 Connecting state = States.CONNECTING; // todo 就是在 Zookeeper new ClientCnxn 时, 在倒数第二个位置使传递进去一个函数实际的 this.clientCnxnSocket = clientCnxnSocket; // todo 设置成守护线程 setDaemon(true); }
它的Run方法, 真的是好长啊, 比我上面写的部分内容还长(大概两百行了), 大概它的流程 ,每次循环:
- 检查一下客户端的socket有没有和服务端的socket建立连接
- 没有建立连接
- 尝试选出其他的server地址进行连接
- 如果满足close的条件,直接break 跳出整个while循环
- 如果已经建立了连接
- 计算 to = 读取的超时时间 – 服务端的响应时间
- 未连接的状态
- 计算 to = 连接超时时间 – 服务端的响应时间
- 上面的两个to, 如果小于0, 说明客户端和服务端通信出现了异常, 很可能是server的session time out,于是抛出异常
- 如果连接状态是健康的,向服务端发送心跳
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
向服务端发送数据
- 没有建立连接
在这个负责和服务端进行IO操作的线程中,只要不是close等重大的一般可以预知的异常都有try起来,然后记录日志,并没有其他操作,循环还是会进行
// todo introduce 介绍 clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0 clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; // todo 这个while循环中存在建立连接的过程, 已经连接建立失败后不断重试的过程 //todo state.isAlive() 默认是 NOT_CONNECTED while (state.isAlive()) { try { //todo 1111 如果socket还没有连接 ///////////////////////////////////////////////////////////////////////////////////////////////////////// //todo 如果socket还没有连接 if (!clientCnxnSocket.isConnected()) { // todo 判断是不是第一次连接, 如果不是第一次进入下面try代码块, 随机产生一个小于一秒的时间 if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing // todo 如果是closing 或者 已经关闭了, 直接退出这个循环 if (closing || !state.isAlive()) { break; } if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { // todo 连接失败时,来这里重试连接 // todo 从我们传递进来的host地址中选择一个地址 serverAddress = hostProvider.next(1000); } // todo client和server进行socket连接 // todo 跟进去 ,实现逻辑在上面 // todo 这个方法开始建立连接,并将 isFasterConnect改成了 false startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); } //todo 2222 如果socket处于连接状态 ///////////////////////////////////////////////////////////////////////////////////////////////////////// // todo 下面的连接状态 if (state.isConnected()) { // determine whether we need to send an AuthFailed event. if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent == true) { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, authState,null)); } } // todo 连接成功的话执行to 为下面值 // todo to = 读取的超时时间 - 上一次的读取时间 // todo 如果预订的超时时间 - 上次读的时间 <= 0 说明超时了 to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { // todo 如果没有连接成功, 就会来到这里, 给 to 赋值 to = connectTimeout - clientCnxnSocket.getIdleRecv(); } //todo 3333 异常处理 ///////////////////////////////////////////////////////////////////////////////////////////////////////// // todo 下面抛出来了异常 if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId); LOG.warn(warnInfo); // todo 这里抛出来了异常, 下面的try 就会把它抓住 throw new SessionTimeoutException(warnInfo); } //todo 44444 连接成功执行的逻辑 ///////////////////////////////////////////////////////////////////////////////////////////////////////// // todo 下面的是连接成功执行的逻辑 if (state.isConnected()) { // todo 为了防止竞争状态丢失发送第二个ping, 同时也避免出现很多的ping //1000(1 second) is to prevent(阻止) race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { // todo 客户端一直在这里循环, 如果连接成功的话, 每次循环都来到这个逻辑这里发送 ping sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } //todo 55555 ///////////////////////////////////////////////////////////////////////////////////////////////////////// // If we are in read-only mode, seek for read/write server // todo 只读状态 相关逻辑 if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); } //todo 66666 ///////////////////////////////////////////////////////////////////////////////////////////////////////// // todo 消费outgoingqueue, 完成向服务端的发送发送 // todo doTransport 是 ClientCnxnSocket 的抽象方法, 实现类clientCnxnSocketNio clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } catch (Throwable e) { // todo 在这个try中处理里面的抛出来的异常 if (closing) { // todo 如果是请求关闭, 直接退出 break 出while循环 if (LOG.isDebugEnabled()) { // closing so this is expected LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage()); } break; } else { // todo 只要不是退出异常, 下面的异常都是仅仅打印了一下出现了什么异常 // this is ugly, you have a better way speak up if (e instanceof SessionExpiredException) { LOG.info(e.getMessage() + ", closing socket connection"); } else if (e instanceof SessionTimeoutException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof EndOfStreamException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof RWServerFoundException) { LOG.info(e.getMessage()); } else if (e instanceof SocketException) { LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage()); } else { LOG.warn("Session 0x{} for server {}, unexpected error{}", Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e); } // todo 这个方法中, isFirstConnect = true cleanup(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent( Event.EventType.None, Event.KeeperState.Disconnected, null)); } clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } } } // todo while循环的结束符号 , 这是个while循环, 除了上面的close其他异常都会继续循环, 接着上去再看一遍 cleanup(); clientCnxnSocket.close(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); }
在上面这个200行的Run方法中比较值得注意的几个方法如下
- 如果做到下次选出一个非当前server的地址
针对下标运行,对数组的size取模, 再赋值给自己,所以就实现了从0 – array.size()的循环
public InetSocketAddress next(long spinDelay) { currentIndex = ++currentIndex % serverAddresses.size(); if (currentIndex == lastIndex && spinDelay > 0) { try { Thread.sleep(spinDelay); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } else if (lastIndex == -1) { // We don't want to sleep on the first ever connect attempt. lastIndex = 0; }
- 如果检查到了没有连接的话,就是用clientCnxnSocket进行连接
这个函数中,将标记是否是第一次连接的标记置为了flase, 并且拿到了sessionid
// todo 保证连接的逻辑 void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; //todo 创建了一个建立连接的request, 并且在下面将它添加进来 outgoingqueue long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); synchronized (outgoingQueue) { ... 如watcher 相关的逻辑
SendThread 和 服务端的IO沟通
跟进上面Run方法的如下方法,doTranprot
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
他是本类的抽象方法,具体的实现类是clientCnxnSocketNIO
跟进这个方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);
for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); // todo 建立连接的逻辑 if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // todo 往服务端发送数据的逻辑 , 方法在上面的64行 doIO(pendingQueue, outgoingQueue, cnxn); } }
- DoIo的源码如下
它分成了两大模块
- 读就绪, 读取服务端发送过来的数据
- 写就绪, 往客户端发送用户在控制台输入的命令
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { // todo 通过key获取服务端的channel SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } // TODO 读就绪 if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { // todo 返回buffer incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { //todo 连接有没有初始化, 来这之前被改成了 flase ,现在 // todo 读取服务端发给我的连接请求的结果 readConnectResult(); // primeConnection() enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { //todo 如果已经初始化了, 就来这里读取响应, 跟进去 sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } //todo 写就绪 if (sockKey.isWritable()) { synchronized(outgoingQueue) { // todo 查询出可发送的packet Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } // todo 往服务端发送数据 packet.ByteBuf sock.write(p.bb); // 发送服务端 if (!p.bb.hasRemaining()) { //todo !hasRemaining 没有剩余的数据 sentCount++; // todo 将发送过的packet从outgoingqueue移除 outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { // todo 如果刚才的请求头的类型不是null , 不是ping ,不是权限验证 就把packet添加到 pendingQueue /** * These are the packets that have been sent and are waiting for a response. * todo 这个penddingQueue 存放的是已经发送的 和 等待服务器响应的packet */ pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { e.html disableWrite(); } else { // Just in case enableWrite(); } } } }
思考:
虽然找到了客户端往服务端发送数据的代码, 但是问题来了, 它发送的什么数据啊? 在上面可以看到,它每次发送的数据都被包装车成了packet类型,并且,继续往下跟进可以看到这个packet来自于一个叫outgoingqueue的队列中
client想往服务端发送什么?其实发送就是我们手动输入的命令,只不过他把我们的命令解析出来并且进行了封装,进行了哪些封装? String-> request -> packet -> socket ,这个packet就在上面的部分被消费
到目前为止,算上一开始的主线程,其实已经有3条线程了, 分别是主线程,SendThread和eventThread
代码读到这里,sendThread部分其实已经结束了,我们直到了它正在消费outgoingqueue中的内容,接下来的任务返回回去,从新回到 ZooKeeperMain中,看一开始主线程时如何处理用户在命令行的输入的
// todo zookeeper的入口方法 public static void main(String args[]) throws KeeperException, IOException, InterruptedException { // todo new ZK客户端 ZooKeeperMain main = new ZooKeeperMain(args); // todo run方法的实现在下面 main.run(); }
跟进 main.run()
, 主要做了如下几件事
- 通过反射创建出可以获取控制台输入的对象
jline.ConsoleReader
- 通过反射创建出可以解析键盘录入的对象
- 开启while循环,等待用户的输入,处理用户的输入
executeLine(line);
@SuppressWarnings("unchecked") void run() throws KeeperException, IOException, InterruptedException { if (cl.getCommand() == null) { System.out.println("Welcome to ZooKeeper!"); boolean jlinemissing = false; // only use jline if it's in the classpath try { // todo jline.ConsoleReader是java命令行的实现类, 获取可从控制台接受输入的对象 Class<?> consoleC = Class.forName("jline.ConsoleReader"); Class<?> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor"); System.out.println("JLine support is enabled"); // todo 使用反射获取实例 Object console = consoleC.getConstructor().newInstance(); Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk); // todo 通过反射获取某指定类的指定方法 Completor Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor")); addCompletor.invoke(console, completor); String line; Method readLine = consoleC.getMethod("readLine", String.class); // todo 我们在命令行中输入的那些命令最终都会来到这里执行 // todo getPrompt() 方法 就是在控制台上打印出了命令行的前缀--- [zk: " + host + "("+zk.getState()+")" + " " + commandCount + "] " while ((line = (String) readLine.invoke(console, getPrompt())) != null) { // todo 执行命令行的输入 executeLine(line); } } catch (ClassNotFoundException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (NoSuchMethodException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InvocationTargetException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (IllegalAccessException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InstantiationException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } if (jlinemissing) { System.out.println("JLine support is disabled"); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line; while ((line = br.readLine()) != null) { executeLine(line); } } } else { // Command line args non-null. Run what was passed. processCmd(cl); } }
继续跟进 executeLine(line);
,做了如下几件事
- 处理用户输入
- 将命令添加到历史命令
- 处理命令
- 命令数+1
public void executeLine(String line) throws InterruptedException, IOException, KeeperException { if (!line.equals("")) { cl.parseCommand(line); // todo 添加到历史命令 addToHistory(commandCount, line); // todo 具体处理命令 processCmd(cl); // todo 命令次数+1 commandCount++; } }
处理命令的逻辑如下:
将命令解析出来,通过if分支语句,判断用户输入的什么命令, 然后再进一步处理
// todo zookeeper客户端, 处理用户输入命令的具体逻辑 // todo 用大白话讲,下面其实就是把 从控制台获取的用户的输入信息转换成指定的字符, 然后发送到服务端 // todo MyCommandOptions 是处理命令行选项和shell脚本的工具类 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { // todo 在这个方法中可以看到很多的命令行所支持的命令 Stat stat = new Stat(); // todo 获取命令行输入中 0 1 2 3 ... 位置的内容, 比如 0 位置是命令 1 2 3 位置可能就是不同的参数 String[] args = co.getArgArray(); String cmd = co.getCommand(); if (args.length < 1) { usage(); return false; } if (!commandMap.containsKey(cmd)) { usage(); return false; } boolean watch = args.length > 2; String path = null; List<ACL> acl = Ids.OPEN_ACL_UNSAFE; LOG.debug("Processing " + cmd); ... // todo 看看这个create命令的实现, 如果是-e 就是很 CreateMode= ephemeral sequential 时序的 if (cmd.equals("create") && args.length >= 3) { int first = 0; CreateMode flags = CreateMode.PERSISTENT; if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { first += 2; flags = CreateMode.EPHEMERAL_SEQUENTIAL; } else if (args[1].equals("-e")) { first++; flags = CreateMode.EPHEMERAL; } else if (args[1].equals("-s")) { first++; flags = CreateMode.PERSISTENT_SEQUENTIAL; } ...
比如,用户输入的是创建新节点的命令create /path
, 就会有下面的函数处理
// todo 调用Zookeeper的 create方法, String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags);
跟进这个方法 , 主要做了下面几件事
- 校验合法性
- 封装进 request
- 添加acl
- 提交submitRequest(),他是个重要的阻塞方法,每次执行都会阻塞等待服务端的响应
- 等待响应结果
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; // todo 验证,path string 的合法性, 根据去查看 PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); // todo 创建请求头, 不同的操作有不同的头标记 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); // todo 将命令行里面的内容嵌入到request request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } // todo 添加权限 request.setAcl(acl); // todo 通过上下文, 将包装后的用户的request 提交到socket 传递到server , 跟进去看看 ReplyHeader r =submitRequest // todo 判断是否出错了 if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
客户端的阻塞式等待
跟进submitRequest()
// todo 这是ClientCnxn的类, 提交请求, 最终将我们的请求传递到socket // todo 返回一个header, 因为根据它判断是否是否出错了 public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // todo 来到这个 queuePacket() 方法在下面, 这个方法就是将 用户输入-> string ->>> request ->>> packet 的过程 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); // todo 使用同步代码块,在下面的进行 同步阻塞等待, 直到有了Response响应才会跳出这个循环, 这个finished状态就是在客户端接受到服务端的 // todo 的响应后, 将服务端的响应解析出来,然后放置到 pendingqueue里时,设置上去的 synchronized (packet) { while (!packet.finished) { // todo 这个等待是需要唤醒的 packet.wait(); } } // todo 直到上面的代码块被唤醒,才会这个方法才会返回 return r; }
在上面的代码中,可以看到可以他是使用一个while(!packet,finishes){} 来阻塞程序的, 刚看看到用户的命令被封装进了request, 接下来, 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
中,可以看到他被封装进packet,然后添加到outgoingqueue队列中,源码如下
// todo Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; // Note that we do not generate the Xid for the packet yet. // todo 它会为我们的没有 Xid 的packet生成 Xid // It is generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), // todo 她会在ClientCnxnSocket::doIO()之后生成 // where the packet is actually sent. // todo packet实际生成的位置 synchronized (outgoingQueue) { // todo 将用户传递过来的信息包装成 Packet packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing // todo 如果客户端正在发送关闭session的请求, 就标记成 closing = true if (h.getType() == OpCode.closeSession) { closing = true; } // todo 将packet 添加到队列里面 // todo 这个什么时候会被消费呢? 是在sendthread的无限循环中被消费的, 因为那是第二条线程 outgoingQueue.add(packet); } } // todo getClientCnxnSocket() 获取ClientCnxnSocket对象 // todo wakeupCnxn() 是 ClientCnxnSocket对象 中的抽象方法, 实现类是 ClientCnxnSocket的实现类ClientCnxnSocketNio // 唤醒阻塞在selector.select上的线程,让该线程及时去处理其他事情,比如这里的让sendThread 干净去消费packet sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
在这个方法的最后一行,点睛,selector.wakeup(); 就是通知选择器,别再阻塞select了,赶紧去做其他工作
因为选择器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代码贴出来如下
服务端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封装类的,SendThread同样也是,它可以使用
现在再看,唤醒selector 让他去做其他事 ,其实即使doIO(),这个方法代码其实我在上面贴出来过,就是分成两大部分,读就绪与写就绪
// todo 往服务端发送 packet //todo 下面就是NIO 网络编程的逻辑了 @Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { // todo 选择器在waitTimeOut时间内阻塞轮询== 上一次计算的 to时间 selector.select(waitTimeOut); Set<SelectionKey> selected; // todo 获取channel注册进selector时返回的key synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is non blocking, // so time is effectively a constant. That is Why we just have to do this once, here // todo 直到我们重新回到select之前, 下面的全部操作都是非阻塞的 // todo 因此时间只是一个常数, 那就是为什么我们在这里用下面的函数 updateNow(); // for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); // todo 建立连接的逻辑 if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // todo 往服务端发送数据的逻辑 , 方法在上面的64行 doIO(pendingQueue, outgoingQueue, cnxn); } }
写到这里其实已经把整个过程顺下来了,下面再重新看看,sendThread是如果消费packet并且修改然后得到服务端的响应,修改pakcet.finished属性的, 因为现在主线的submitRequest还在阻塞呢
往服务端写
客户端的socket的实现类是ClientCnxnSocketNio
, 它往服务端写的逻辑如下, 不难看出使用的java原生的sock.write(p.bb); // 发送服务端
, 亮点是后面的操作pendingQueue.add(p);
被写过的packet被添加到了pengingqueue中
if (sockKey.isWritable()) { synchronized(outgoingQueue) { // todo 查询出可发送的packet Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } // todo 往服务端发送数据 packet.ByteBuf sock.write(p.bb); // 发送服务端 if (!p.bb.hasRemaining()) { //todo !hasRemaining 没有剩余的数据 sentCount++; // todo 将发送过的packet从outgoingqueue移除 outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { // todo 如果刚才的请求头的类型不是null , 不是ping ,不是权限验证 就把packet添加到 pendingQueue /** * These are the packets that have been sent and are waiting for a response. * todo 这个penddingQueue 存放的是已经发送的 和 等待服务器响应的packet */ pendingQueue.add(p); } } }
上面说了, 为啥被使用过的pakcet还要保留一份呢? 还是那个原因,主线程还因为pakcet的finish状态未被该变而阻塞呢, 那什么时候改变呢? 答案是受到服务端的响应之后改变,在哪里收到呢? 就是DoIo()的读就绪模块,下面附上源码,它的解析我写在这段代码下面
从服务端读
if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { // todo 返回buffer incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { //todo 连接有没有初始化, 来这之前被改成了 flase ,现在 // todo 读取服务端发给我的连接请求的结果 readConnectResult(); // primeConnection() enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { //todo 如果已经初始化了, 就来这里读取响应, 跟进去 sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); }
如上代码的最后部分,sendThread.readResponse(incomingBuffer);
下面是它的源码,它首先是从buffer中读取出服务端的发送的数据,然后一通解析,封装进pendingqueue的packet中,并且在方法的最后部分终于完成了状态的修改
// todo 同样是 sendThread的方法, 读取响应 // todo 是经过flip 反转后的 可读的buffer void readResponse(ByteBuffer incomingBuffer) throws IOException { // todo --------------------- 从服务端写回来的buffer中解析封装成 ReplyHeader ---------------------------- ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); // todo --------------------------------------------------------------------- // todo 下面根据 ReplyHeader 的 xid 判断响应的结果类型 if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; } if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } eventThread.queueEvent( we ); return; } // If SASL authentication is currently in progress, construct and // send a response packet immediately, rather than queuing a // response as with other packets. if (clientTunneledAuthenticationInProgress()) { GetSASLRequest request = new GetSASLRequest(); request.deserialize(bbia,"token"); zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this); return; } Packet packet; synchronized (pendingQueue) { if (pendingQueue.size() == 0) { throw new IOException("Nothing in the queue, but got " + replyHdr.getXid()); } // todo 从pendingQueue 中取出第一个packet packet = pendingQueue.remove(); } /* * Since requests are processed in order, we better get a response to the first request! * // todo 因为请求存在队列中,是有顺序的, 因此我们最好对第一个做出相应 */ try { if (packet.requestHeader.getXid() != replyHdr.getXid()) { packet.replyHeader.setErr( KeeperException.Code.CONNECTIONLOSS.intValue()); throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet ); } // todo 把todo 从服务端解析出来的结果赋值给 pendingQueue 中的packet packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } if (packet.response != null && replyHdr.getErr() == 0) { packet.response.deserialize(bbia, "response"); } if (LOG.isDebugEnabled()) { LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet); } } finally { // todo 跟进这个方法 finishPacket(packet); } }
解开客户端的阻塞状态
进入finishPacket(packet)
// todo ClientCnxn 也就是本类中, 在根据用户的输入向服务端提交命令后的那个 wait唤醒了, finished=true,使得原来的while循环退出了 private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } // todo 唤醒 Zookeeper中 submitRequest() 提交请求后的阻塞操作, 现在拿到请求后进行唤醒 if (p.cb == null) { synchronized (p) { //todo 将这个finish 改成true, 在 p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }