此文已由作者张镐薪授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

3. 连接模块

如之前所述,MyCat的连接分为前端和后端,下面是连接基本相关类图:

3.1 ClosableConnection:

public interface ClosableConnection {    String getCharset();    //关闭连接
    void close(String reason);    boolean isClosed();    public void idleCheck();    long getStartupTime();    String getHost();    int getPort();    int getLocalPort();    long getNetInBytes();    long getNetOutBytes();
}

根据字面意思,一个可以关闭的连接需要实现关闭方法-_-,并且需要原因判断是否是正常关闭。MySQL的通信都需要指定字符集。MyCat服务器建立ServerSocket时输入的端口为服务器在其上面监听客户的连接,当有客户连接时,在随机选择一个没用的端口与客户端通信;建立客户socket时输入的为服务端的监听端口,在本地选择一个未用端口与服务器通信,至于服务器怎么知道和客户端的哪个端口通信,和客户端怎么知道和服务端的哪个端口通信(因为这两个端口都是随机生成的),tcp是采用"三次握手"建立连接,而udp则是每次发送信息时将端口号放在ip报文的数据段里面。所以,连接里面需要提供获得监听端口和服务端口的方法。此外,还需要检查连接是否为空闲状态(idle)。最后,需要一些统计数据。

3.2 NIOConnection:

public interface NIOConnection extends ClosableConnection {    //connected
    void register() throws IOException;    //处理数据
    void handle(byte[] data);    // 写出一块缓冲数据
    void write(ByteBuffer buffer); }

所有NIO的通信需要在多路复用选择器上注册channel,这里有个对应的register()方法需要实现。然后,读取和写入数据都需要通过缓冲。缓冲区(Buffer)就是在内存中预留指定大小的存储空间用来对输入/输出(I/O)的数据作临时存储,这部分预留的内存空间就叫做缓冲区,使用缓冲区有这么两个好处:

  1. 减少实际的物理读写次数

  2. 缓冲区在创建时就被分配内存,这块内存区域一直被重用,可以减少动态分配和回收内存的次数 读取到的数据需要经过处理,这里对应的就是handle(byte[])方法。

    3.3 AbstractConnection:

从上面的实体图,我们发现,AbstractConnection其实就是把Java的NetworkChannel进行封装,同时需要依赖其他几个类来完成他所需要的操作,如下:  其中,NIOProcessor是对AbstractConnection实现NIO读写的方法类,NIOHandler是处理AbstractConnection读取的数据的处理方法类,NIOSocketWR是执行以上方法的线程类。

  1. 3.3.1 NIOProcessor:

NIOProcessor的构建方法:

public NIOProcessor(String name, BufferPool bufferPool,
            NameableExecutor executor) throws IOException {        this.name = name;        this.bufferPool = bufferPool;        this.executor = executor;        this.frontends = new ConcurrentHashMap<Long, FrontendConnection>();        this.backends = new ConcurrentHashMap<Long, BackendConnection>();        this.commands = new CommandCount();
    }

调用位置:  MyCatServer.java

...
bufferPool = new BufferPool(processBuferPool, processBufferChunk,
                socketBufferLocalPercent / processorCount);
        businessExecutor = ExecutorUtil.create("BusinessExecutor",
                threadPoolSize);
...for (int i = 0; i < processors.length; i++) {
            processors[i] = new NIOProcessor("Processor" + i, bufferPool,
                    businessExecutor);
        }
...

每个MyCat实例会初始化processors个NIOProcessor,每个NIOProcessor公用同一个bufferPool和businessExecutor。 bufferPool是缓冲池,BufferPool这个类负责缓冲统一管理 businessExecutor如之前所述,是业务线程池。 NIOProcessor被池化,很简单,就是保存到数组中,通过MyCatServer的nextProcessor()方法轮询获取一个NIOProcessor,之后每个AbstractConnection通过setNIOProcessor方法,设置NIOProcessor。

public NIOProcessor nextProcessor() {        int i = ++nextProcessor;        if (i >= processors.length) {
            i = nextProcessor = 0;
        }        return processors[i];
    }

可以看出,每个AbstractConnection依赖于一个NIOProcessor,每个NIOProcessor保存着多个AbstractConnection。AbstractConnection分为FrontendConnection和BackendConnection被分别保存在NIOProcessor的frontends和backends这两个ConcurrentHashMap中。 用ConcurrentHashMap是因为NIOAcceptor和NIOConnector线程以及RW线程池都会访问这两个变量。 NIOProcessor其实主要负责连接资源的管理:  MyCat会定时检查前端和后端空闲连接,并清理和回收资源: MyCatServer.java:

// 处理器定时检查任务
    private TimerTask processorCheck() {        return new TimerTask() {            @Override
            public void run() {
                timerExecutor.execute(new Runnable() {                    @Override
                    public void run() {                        try {                            for (NIOProcessor p : processors) {
                                p.checkBackendCons();
                            }
                        } catch (Exception e) {
                            LOGGER.warn("checkBackendCons caught err:" + e);
                        }                     }
                });
                timerExecutor.execute(new Runnable() {                    @Override
                    public void run() {                        try {                            for (NIOProcessor p : processors) {
                                p.checkFrontCons();
                            }
                        } catch (Exception e) {
                            LOGGER.warn("checkFrontCons caught err:" + e);
                        }
                    }
                });
            }
        };
    }

检查前端连接,回收空闲资源:

    /**
     * 定时执行该方法,回收部分资源。
     */
    public void checkFrontCons() {
        frontendCheck();
    }    private void frontendCheck() {
        Iterator<Entry<Long, FrontendConnection>> it = frontends.entrySet()
                .iterator();        while (it.hasNext()) {
            FrontendConnection c = it.next().getValue();            // 删除空连接
            if (c == null) {
                it.remove();                this.frontendsLength.decrementAndGet();                continue;
            }            // 清理已关闭连接,否则空闲检查。
            if (c.isClosed()) {
                c.cleanup();
                it.remove();                this.frontendsLength.decrementAndGet();
            } else {                // very important ,for some data maybe not sent
                checkConSendQueue(c);
                c.idleCheck();
            }
        }
    }

在关闭前端连接时,会清理连接占用的缓存资源: FrontendConnection.java:

protected void cleanup() {        //回收读缓冲
        if (readBuffer != null) {
            recycle(readBuffer);            this.readBuffer = null;            this.readBufferOffset = 0;
        }        //回收写缓冲
        if (writeBuffer != null) {
            recycle(writeBuffer);            this.writeBuffer = null;
        }        //回收压缩协议栈编码解码队列
        if(!decompressUnfinishedDataQueue.isEmpty())
        {
            decompressUnfinishedDataQueue.clear();
        }        if(!compressUnfinishedDataQueue.isEmpty())
        {
            compressUnfinishedDataQueue.clear();
        }        //回收写队列
        ByteBuffer buffer = null;        while ((buffer = writeQueue.poll()) != null) {
            recycle(buffer);
        }
    }

后端连接检查,除了要清理已关闭的连接,还有要检查SQL执行时间是否超时:

    /**
     * 定时执行该方法,回收部分资源。
     */
    public void checkBackendCons() {
        backendCheck();
    }    // 后端连接检查
    private void backendCheck() {        long sqlTimeout = MycatServer.getInstance().getConfig().getSystem().getSqlExecuteTimeout() * 1000L;
        Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();        while (it.hasNext()) {
            BackendConnection c = it.next().getValue();            // 删除空连接
            if (c == null) {
                it.remove();                continue;
            }            // SQL执行超时的连接关闭
            if (c.isBorrowed()
                    && c.getLastTime() < TimeUtil.currentTimeMillis()
                            - sqlTimeout) {
                LOGGER.warn("found backend connection SQL timeout ,close it "
                        + c);
                c.close("sql timeout");
            }            // 清理已关闭连接,否则空闲检查。
            if (c.isClosed()) {
                it.remove();             } else {                // very important ,for some data maybe not sent
                if (c instanceof AbstractConnection) {
                    checkConSendQueue((AbstractConnection) c);
                }
                c.idleCheck();
            }
        }
    }

同时,在检查连接是否关闭时,需要检查写队列是否为空。写队列不为空,证明还有请求没有响应。需要将写队列的剩余请求异步写出,通过NIOSocketWR。

    private void checkConSendQueue(AbstractConnection c) {        // very important ,for some data maybe not sent
        if (!c.writeQueue.isEmpty()) {
            c.getSocketWR().doNextWriteCheck();
        }
    }

免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击

相关文章:
【推荐】 类似gitlab代码提交的热力图怎么做?
【推荐】 JAVA虚拟机的类加载机制

最新文章

  1. [Ajax系列]Ajax介绍
  2. iOS黑客技术相关
  3. Redis的复制(Master/Slave)
  4. android中View的GONE和INVISIBLE的原理
  5. 【leetcode❤python】 203. Remove Linked List Elements
  6. linux 命令之系统活动报告sar
  7. 配置 Cocoapods的简单配置及胡思乱想
  8. jQuery之$(document).ready()使用介绍
  9. spring与MyBatis结合
  10. hdoj 4324 Triangle LOVE 【拓扑】
  11. NYoj1058
  12. 关于bootstrap在IE8下不能支持自适应的问题
  13. 迭代(遍历)时候不可以使用集合的remove和add方法,但可使用Java迭代器的remove和add方法
  14. 【底层原理】深入理解Cache (上)
  15. GCC编译器原理(一)------GCC 工具:addr2line、ar、as、c++filt和elfedit
  16. 开源一个CSV解析器(附设计过程 )
  17. C# WinForm窗体控件Panel修改边框颜色以及边框宽度方法
  18. Socket通信的Demo
  19. URL加载jar
  20. MySQL(2)数据库 表的查询操作

热门文章

  1. css多余字符显示省略号
  2. python列表(list)常用方法
  3. 我的Java开发学习之旅------>Java String对象作为参数传递的问题解惑
  4. SAP采购寄售业务操作步骤
  5. Java for LeetCode 081 Search in Rotated Sorted Array II
  6. 小程序开发之xxx is not defined
  7. java和js互调 webview
  8. iOS 开发实践之 Auto Layout
  9. Linux中常用文件传输命令及使用方法
  10. iOS实时监控网络状态的改变