台湾黑帽seo 蜘蛛池:深入理解 ZooKeeper单机客户端的启动流程_黑帽SEO优化

频道:SEO技术 日期: 浏览:636
:前端初探 Gitlab CI/CD

客户端的启动流程


看上面的客户端启动的脚本图,可以看到,zookeeper客户端脚本运行的入口ZookeeperMain.java的main()方法, 关于这个类可以理解成它是程序启动的辅助类,由它提供开始的位置,进而加载出zk client的上下文

创建ZooKeeperMain对象

// todo zookeeper的入口方法 public static void main(String args[]) throws KeeperException, IOException, InterruptedException {     // todo new ZK客户端     ZooKeeperMain main = new ZooKeeperMain(args);      // todo run方法的实现在下面     main.run(); }

跟踪ZooKeeperMain main = new ZooKeeperMain(args); 能往下追很长的代码,提前说main.run()的作用,就是对用户输入的命令进行下一步处理

如上是入口函数的位置,跟进这两个函数,可以找到我们在client端的命令行中可以输入命令和zookeeper服务端进行通信的原因(开起了新的线程),以及zookeeper的客户端所依赖的其他类

跟进ZooKeeperMain main = new ZooKeeperMain(args);

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }

我们在命令行启动客户端时,输入命令zkCli.sh -server localhost:2181,其中的args数组, 就是我们在启动就是我们输入的参数,

构建zookeeperMain对象时,上面主要做了两件事

  • 解析args参数数组
  • 连接客户端

解析参数数组的逻辑就在下面, 很熟悉,就是我们在命令行启动zookeeper时输入的命令可选项

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }

创建ZooKeeper客户端的对象

接着看如果连接客户端, connectToZK(String newHost) 同样是本类方法,源码如下:

// todo 来到这里 protected void connectToZK(String newHost) throws InterruptedException, IOException {     if (zk != null && zk.getState().isAlive()) {         zk.close();     }     //todo  命令行中的server 后面跟着 host主机地址     host = newHost;     boolean readOnly = cl.getOption("readonly") != null;     // todo 创建zookeeper的实例     zk = new ZooKeeper(host,                         Integer.parseInt(cl.getOption("timeout")),                         new MyWatcher(), readOnly); }

到这里算是个小高潮吧,毕竟看到了zookeeper client的封装类ZooKeeper, 这个类上的注解大概是这么介绍这个类的

  • 它是个Zookeeper 客户端的封装类, 它的第一个参数是 host:port,host:port,host:port这种格式的字符串,逗号左右是不同的服务端的地址
  • 会异步的创建session,通常这个session在构造函数执行完之间就已经创建完成了
  • watcher 是监听者,它被通知的时刻不确定,可能是构造方法执行完成前,也可能在这之后
  • 只要没有连接成功, zookeeper客户端,会一直尝试从提供的服务地址串中选择出一个尝试链接

跟进ZooKeeper的构造方法

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{     LOG.info("Initiating client connection, connectString=" + connectString             + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);      watchManager.defaultWatcher = watcher;      // todo 包装服务端的地址     ConnectStringParser connectStringParser = new ConnectStringParser(             connectString);     //todo 将服务端的地址封装进 StaticHostProvider -> HostProvider中     HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());      // todo 创建客户端的上下文, 这个上下文对象的亮点就是它维护了一个客户端的socket     cnxn = new ClientCnxn(connectStringParser.getChrootPath(),             hostProvider, sessionTimeout, this, watchManager,             // todo 跟进这个方法,getClientCnxnSocket, 获取出客户端上下文中的socket             getClientCnxnSocket(), canBeReadOnly);     // todo 启动客户端     cnxn.start(); }

主要做了这么几件事

  • 将服务端的地址解析封装进了StaticHostProvider类中, 可以把这个类理解成专门存放服务端地址的set 集合
  • 创建出了客户端的上下文对象: ClientCnxn, 当然在这之前,入参位置还有一个getClientCnxnSocket()这个函数可以创建出客户端的NIO Socket
  • 然后调用cnxn.start() 其实就是启动了客户端的另外两条线程sendThreadeventThread 下面会详细说

    创建客户端的 NioSocket

继续跟进源码getClientCnxnSocket()通过反射,zk客户端使用的socket对象是ClientCnxnSocketNIO

 //todo 通过反射创建出客户端上下文中的 socket , 实际的ClientCnxnSocketNIO 是 ClientCnxnSocket的子类     // todo --->  zookeeper 封装的 NIO的逻辑都在   实际的ClientCnxnSocketNIO     private static ClientCnxnSocket getClientCnxnSocket() throws IOException {         // todo zookeeper.clientCnxnSocket         String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);          if (clientCnxnSocketName == null) {             // todo 上面String其实就是这个类的name, 根进去看一下它的属性             // todo 这个类维护了NioSocket使用到的 selector 选择器 , 已经发生的感兴趣的事件SelectionKey             clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();         }          try {             // todo 可以看到客户端使用的 NioSocket             return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()                     .newInstance();         } catch (Exception e) {             IOException ioe = new IOException("Couldn't instantiate "                     + clientCnxnSocketName);             ioe.initCause(e);             throw ioe;         }     }

创建 ClientCnxn客户端的上下文

创建上下文,构造函数中的诸多属性都是在前面读取配置文件或是新添加进来的,重点是最后两行,它创建了两条线程类,和zk客户端的IO息息相关

 public   ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,             ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,             long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {         this.zooKeeper = zooKeeper;         this.watcher = watcher;         this.sessionId = sessionId; // todo 刚才传递过来的值为0         this.sessionPasswd = sessionPasswd;         this.sessionTimeout = sessionTimeout;         this.hostProvider = hostProvider;         this.chrootPath = chrootPath;          connectTimeout = sessionTimeout / hostProvider.size();         // todo 添加read的超时时间         readTimeout = sessionTimeout * 2 / 3;         readOnly = canBeReadOnly;                  // todo  创建了一个seadThread 线程         sendThread = new SendThread(clientCnxnSocket);         eventThread = new EventThread();     }

创建SendThread

sendThred是一个客户端的线程类,什么时候开启? 其实就在上面,当创建了ClientCnxn后,调用的cnxn.start()就是在开启它的run() , 它有什么作用? 它的run()是一个无限循环,除非运到了close的条件,否则他就会一直循环下去, 比如向服务端发送心跳,或者向服务端发送我们在控制台输入的数据以及接受服务端发送过来的响应

这是他的构造方法,可以看到它还是一个守护线程,并拥有客户端socket的引用,有了NIO Socket相关技能

//todo SendThread(ClientCnxnSocket clientCnxnSocket) {     super(makeThreadName("-SendThread()"));     // todo 设置状态 Connecting     state = States.CONNECTING;     // todo 就是在 Zookeeper new ClientCnxn 时, 在倒数第二个位置使传递进去一个函数实际的     this.clientCnxnSocket = clientCnxnSocket;     // todo 设置成守护线程     setDaemon(true); }

它的Run方法, 真的是好长啊, 比我上面写的部分内容还长(大概两百行了), 大概它的流程 ,每次循环:

  • 检查一下客户端的socket有没有和服务端的socket建立连接
    • 没有建立连接
      • 尝试选出其他的server地址进行连接
      • 如果满足close的条件,直接break 跳出整个while循环
    • 如果已经建立了连接
      • 计算 to = 读取的超时时间 - 服务端的响应时间
    • 未连接的状态
      • 计算 to = 连接超时时间 - 服务端的响应时间
    • 上面的两个to, 如果小于0, 说明客户端和服务端通信出现了异常, 很可能是server的session time out,于是抛出异常
    • 如果连接状态是健康的,向服务端发送心跳
    • clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服务端发送数据

在这个负责和服务端进行IO操作的线程中,只要不是close或其他重大错误,一般可以预知的异常都有try起来,然后记录日志,并没有其他操作,循环还是会进行

// todo introduce 介绍     clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0     clientCnxnSocket.updateNow();     clientCnxnSocket.updateLastSendAndHeard();     int to;     long lastPingRwServer = Time.currentElapsedTime();     final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds     InetSocketAddress serverAddress = null;     // todo 这个while循环中存在建立连接的过程, 已经连接建立失败后不断重试的过程     //todo  state.isAlive() 默认是 NOT_CONNECTED     while (state.isAlive()) {         try {   //todo 1111  如果socket还没有连接 /////////////////////////////////////////////////////////////////////////////////////////////////////////              //todo  如果socket还没有连接             if (!clientCnxnSocket.isConnected()) {                 // todo 判断是不是第一次连接, 如果不是第一次进入下面try代码块, 随机产生一个小于一秒的时间                 if(!isFirstConnect){                     try {                         Thread.sleep(r.nextInt(1000));                     } catch (InterruptedException e) {                         LOG.warn("Unexpected exception", e);                     }                 }                 // don't re-establish connection if we are closing                 // todo 如果是closing 或者 已经关闭了, 直接退出这个循环                 if (closing || !state.isAlive()) {                     break;                 }                 if (rwServerAddress != null) {                     serverAddress = rwServerAddress;                     rwServerAddress = null;                 } else {                     // todo 连接失败时,来这里重试连接                     // todo 从我们传递进来的host地址中选择一个地址                     serverAddress = hostProvider.next(1000);                 }                  // todo client和server进行socket连接                 // todo  跟进去 ,实现逻辑在上面                 // todo  这个方法开始建立连接,并将 isFasterConnect改成了 false                 startConnect(serverAddress);                 clientCnxnSocket.updateLastSendAndHeard();             }   //todo  2222 如果socket处于连接状态 /////////////////////////////////////////////////////////////////////////////////////////////////////////              // todo 下面的连接状态             if (state.isConnected()) {                 // determine whether we need to send an AuthFailed event.                 if (zooKeeperSaslClient != null) {                     boolean sendAuthEvent = false;                     if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {                         try {                             zooKeeperSaslClient.initialize(ClientCnxn.this);                         } catch (SaslException e) {                            LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);                             state = States.AUTH_FAILED;                             sendAuthEvent = true;                         }                     }                     KeeperState authState = zooKeeperSaslClient.getKeeperState();                     if (authState != null) {                         if (authState == KeeperState.AuthFailed) {                             // An authentication error occurred during authentication with the Zookeeper Server.                             state = States.AUTH_FAILED;                             sendAuthEvent = true;                         } else {                             if (authState == KeeperState.SaslAuthenticated) {                                 sendAuthEvent = true;                             }                         }                     }                      if (sendAuthEvent == true) {                         eventThread.queueEvent(new WatchedEvent(                               Watcher.Event.EventType.None,                               authState,null));                     }                 }                 // todo  连接成功的话执行to 为下面值                 // todo  to = 读取的超时时间 -  上一次的读取时间                 // todo 如果预订的超时时间 - 上次读的时间 <= 0 说明超时了                 to = readTimeout - clientCnxnSocket.getIdleRecv();             } else {                 // todo 如果没有连接成功, 就会来到这里, 给 to 赋值                 to = connectTimeout - clientCnxnSocket.getIdleRecv();             }   //todo  3333 异常处理 /////////////////////////////////////////////////////////////////////////////////////////////////////////               // todo 下面抛出来了异常             if (to <= 0) {                 String warnInfo;                 warnInfo = "Client session timed out, have not heard from server in "                     + clientCnxnSocket.getIdleRecv()                     + "ms"                     + " for sessionid 0x"                     + Long.toHexString(sessionId);                 LOG.warn(warnInfo);                 // todo 这里抛出来了异常, 下面的try 就会把它抓住                 throw new SessionTimeoutException(warnInfo);             }  //todo  44444 连接成功执行的逻辑 /////////////////////////////////////////////////////////////////////////////////////////////////////////               // todo 下面的是连接成功执行的逻辑             if (state.isConnected()) {                 // todo  为了防止竞争状态丢失发送第二个ping, 同时也避免出现很多的ping                 //1000(1 second) is to prevent(阻止) race condition missing to send the second ping                 //also make sure not to send too many pings when readTimeout is small                  int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -                          ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);                 //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL                 if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {                     // todo 客户端一直在这里循环, 如果连接成功的话, 每次循环都来到这个逻辑这里发送 ping                     sendPing();                     clientCnxnSocket.updateLastSend();                 } else {                     if (timeToNextPing < to) {                         to = timeToNextPing;                     }                 }             }  //todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////              // If we are in read-only mode, seek for read/write server             // todo 只读状态 相关逻辑             if (state == States.CONNECTEDREADONLY) {                 long now = Time.currentElapsedTime();                 int idlePingRwServer = (int) (now - lastPingRwServer);                 if (idlePingRwServer >= pingRwTimeout) {                     lastPingRwServer = now;                     idlePingRwServer = 0;                     pingRwTimeout =                         Math.min(2*pingRwTimeout, maxPingRwTimeout);                     pingRwServer();                 }                 to = Math.min(to, pingRwTimeout - idlePingRwServer);             }  //todo  66666 /////////////////////////////////////////////////////////////////////////////////////////////////////////               // todo 消费outgoingqueue, 完成向服务端的发送发送             // todo doTransport 是 ClientCnxnSocket 的抽象方法, 实现类clientCnxnSocketNio             clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);         } catch (Throwable e) {             // todo 在这个try中处理里面的抛出来的异常             if (closing) {                 // todo 如果是请求关闭, 直接退出 break 出while循环                 if (LOG.isDebugEnabled()) {                     // closing so this is expected                     LOG.debug("An exception was thrown while closing send thread for session 0x"                             + Long.toHexString(getSessionId())                             + " : " + e.getMessage());                 }                 break;             } else {                 // todo 只要不是退出异常, 下面的异常都是仅仅打印了一下出现了什么异常                 // this is ugly, you have a better way speak up                 if (e instanceof SessionExpiredException) {                     LOG.info(e.getMessage() + ", closing socket connection");                 } else if (e instanceof SessionTimeoutException) {                     LOG.info(e.getMessage() + RETRY_CONN_MSG);                 } else if (e instanceof EndOfStreamException) {                     LOG.info(e.getMessage() + RETRY_CONN_MSG);                 } else if (e instanceof RWServerFoundException) {                     LOG.info(e.getMessage());                 } else if (e instanceof SocketException) {                     LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());                 } else {                     LOG.warn("Session 0x{} for server {}, unexpected error{}",                                     Long.toHexString(getSessionId()),                                     serverAddress,                                     RETRY_CONN_MSG,                                     e);                 }                 // todo 这个方法中, isFirstConnect = true                 cleanup();                 if (state.isAlive()) {                     eventThread.queueEvent(new WatchedEvent(                             Event.EventType.None,                             Event.KeeperState.Disconnected,                             null));                 }                 clientCnxnSocket.updateNow();                 clientCnxnSocket.updateLastSendAndHeard();             }         }     } // todo while循环的结束符号 , 这是个while循环, 除了上面的close其他异常都会继续循环, 接着上去再看一遍      cleanup();     clientCnxnSocket.close();     if (state.isAlive()) {         eventThread.queueEvent(new WatchedEvent(Event.EventType.None,                 Event.KeeperState.Disconnected, null));     }     ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),             "SendThread exited loop for session: 0x"                    + Long.toHexString(getSessionId())); } 

在上面这个200行的Run方法中比较值得注意的几个方法如下

  • 如果做到下次选出一个非当前server的地址

针对下标运行,对数组的size取模, 再赋值给自己,所以就实现了从0 - array.size()的循环

,【巨型】【十万】【更加】【说不】,【剔除】【塔狂】【有一】.【毒药】【劈去】【就完】【桥右】,【点像】【水声】【险鲲】黑帽seo研究【十几】,【狐那】【都掩】【用到】【思想】.【来短】!【若无】【是一】【君之】【全部】【升起】【就会】【姐听】【嗯我】【必然】【身金】【得更】【声惊】【佛土】【应的】【一会】【响之】【而说】【量波】【得泰】【死有】【原了】【口中】【不高】【没有】【不是】【如出】【衣袍】【巨大】【那火】【停顿】【虽然】【难度】【通天】【后多】【敏锐】【出现】,
  public InetSocketAddress next(long spinDelay) {         currentIndex = ++currentIndex % serverAddresses.size();         if (currentIndex == lastIndex && spinDelay > 0) {             try {                 Thread.sleep(spinDelay);             } catch (InterruptedException e) {                 LOG.warn("Unexpected exception", e);             }         } else if (lastIndex == -1) {             // We don't want to sleep on the first ever connect attempt.             lastIndex = 0;         }
  • 如果检查到了没有连接的话,就是用clientCnxnSocket进行连接

这个函数中,将标记是否是第一次连接的标记置为了flase, 并且拿到了sessionid

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }0

SendThread 和 服务端的IO沟通

跟进上面Run方法的如下方法,doTranprot

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }1

他是本类的抽象方法,具体的实现类是clientCnxnSocketNIO

跟进这个方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }2
  • DoIo的源码如下

它分成了两大模块

  • 读就绪, 读取服务端发送过来的数据
  • 写就绪, 往客户端发送用户在控制台输入的命令
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }3

思考:

虽然找到了客户端往服务端发送数据的代码, 但是问题来了, 它发送的什么数据啊? 在上面可以看到,它每次发送的数据都被包装车成了packet类型,并且,继续往下跟进可以看到这个packet来自于一个叫outgoingqueue的队列中

client想往服务端发送什么?其实发送就是我们手动输入的命令,只不过他把我们的命令解析出来并且进行了封装,进行了哪些封装? String-> request -> packet -> socket ,这个packet就在上面的部分被消费

到目前为止,算上一开始的主线程,其实已经有3条线程了, 分别是主线程,SendThread和eventThread

代码读到这里,sendThread部分其实已经结束了,我们直到了它正在消费outgoingqueue中的内容,接下来的任务返回回去,从新回到 ZooKeeperMain中,看一开始主线程时如何处理用户在命令行的输入的

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }4

跟进 main.run(), 主要做了如下几件事

  • 通过反射创建出可以获取控制台输入的对象jline.ConsoleReader
  • 通过反射创建出可以解析键盘录入的对象
  • 开启while循环,等待用户的输入,处理用户的输入executeLine(line);
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }5

继续跟进 executeLine(line);,做了如下几件事

  • 处理用户输入
  • 将命令添加到历史命令
  • 处理命令
  • 命令数+1
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }6

处理命令的逻辑如下:

将命令解析出来,通过if分支语句,判断用户输入的什么命令, 然后再进一步处理

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }7

比如,用户输入的是创建新节点的命令create /path, 就会有下面的函数处理

 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }8

跟进这个方法 , 主要做了下面几件事

  • 校验合法性
  • 封装进 request
  • 添加acl
  • 提交submitRequest(),他是个重要的阻塞方法,每次执行都会阻塞等待服务端的响应
  • 等待响应结果
 public ZooKeeperMain(String args[]) throws IOException, InterruptedException {     cl.parseOptions(args);     // todo 连接到客户端     connectToZK(cl.getOption("server"));     }9

客户端的阻塞式等待 -- 自旋锁

跟进submitRequest()

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }0

在上面的代码中,可以看到可以他是使用一个while(!packet,finishes){} 来阻塞程序的, 刚看看到用户的命令被封装进了request, 接下来, 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);中,可以看到他被封装进packet,然后添加到outgoingqueue队列中,源码如下

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }1

在这个方法的最后一行,点睛,selector.wakeup(); 就是通知选择器,别再阻塞select了,赶紧去做其他工作

因为选择器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代码贴出来如下

服务端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封装类的,SendThread同样也是,它可以使用

现在再看,唤醒selector 让他去做其他事 ,其实即使doIO(),这个方法代码其实我在上面贴出来过,就是分成两大部分,读就绪与写就绪

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }2

写到这里其实已经把整个过程顺下来了,下面再重新看看,sendThread是如果消费packet并且修改然后得到服务端的响应,修改pakcet.finished属性的, 因为现在主线的submitRequest还在阻塞呢

往服务端写

客户端的socket的实现类是ClientCnxnSocketNio, 它往服务端写的逻辑如下, 不难看出使用的java原生的sock.write(p.bb); // 发送服务端 , 亮点是后面的操作pendingQueue.add(p);被写过的packet被添加到了pengingqueue中

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }3

上面说了, 为啥被使用过的pakcet还要保留一份呢? 还是那个原因,主线程还因为pakcet的finish状态未被该变而阻塞呢, 那什么时候改变呢? 答案是受到服务端的响应之后改变,在哪里收到呢? 就是DoIo()的读就绪模块,下面附上源码,它的解析我写在这段代码下面

从服务端读

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }4

如上代码的最后部分,sendThread.readResponse(incomingBuffer); 下面是它的源码,它首先是从buffer中读取出服务端的发送的数据,然后一通解析,封装进pendingqueue的packet中,并且在方法的最后部分终于完成了状态的修改

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }5

解开客户端的阻塞状态

进入finishPacket(packet)

  public boolean parseOptions(String[] args) {     List<String> argList = Arrays.asList(args);     Iterator<String> it = argList.iterator();      while (it.hasNext()) {         String opt = it.next();         try {             if (opt.equals("-server")) {                 options.put("server", it.next());             } else if (opt.equals("-timeout")) {                 options.put("timeout", it.next());             } else if (opt.equals("-r")) {                 options.put("readonly", "true");             }         } catch (NoSuchElementException e) {             System.err.println("Error: no argument found for option "                     + opt);             return false;         }          if (!opt.startsWith("-")) {             command = opt;             cmdArgs = new ArrayList<String>();             cmdArgs.add(command);             while (it.hasNext()) {                 cmdArgs.add(it.next());             }             return true;         }     }     return true; }6
。转载请注明来源地址:黑帽SEO http://www.heimao.wiki 专注于SEO培训,快速排名

黑帽WiKi_黑帽百科(www.heimao.wiki),8年黑帽SEO优化技术,黑帽seo快速排名,黑帽SEO技术培训学习,黑帽SEO快速排名程序、泛目录寄生虫技术,赠送免费黑帽SEO视频教程

(黑帽seo技术,网站快速排名,蜘蛛池加速收录,目录程序定制)

扫一下添加微信: