03月15, 2017

线程实践

该部分包含若干多线程编程的实践案例。

模拟数据库线程池简单实现

  • 连接池模拟类:该类包含连接池最基本的连接数初始化、拿取与释放连接方法。主要结构如下:
/**
 * @auther:fyang
 * @date: 2017/3/14
 * @description: 利用线程模仿数据库连接池实现
 */
public class ConectionsPoolSimulator {
     // 连接池
    private LinkedList<Connection> pool = new LinkedList<>();

    // 初始化特定连接数连接池
    public ConectionsPoolSimulator(int initialConSize) {
          ...
     }

    // 连接池获取连接
    public Connection fetchConnection(long mills) throws InterruptedException {
       ...
    }

    // 释放连接回连接池
    public void releaseConnection(Connection connection){
        ...
    }
}

连接池构造: 根据传入参数循环调用ConnectinDriver.createConnection()初始化相应数量连接。

public ConectionsPoolSimulator(int initialConSize) {
    if(initialConSize > 0){
        for (int i = 0; i < initialConSize; i++) {
            this.pool.addLast(ConnectinDriver.createConnection());
        }
    }
}

获取连接: 在给定超时时间内返回连接,失败或超时否则返回null。

   public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool){
            // 获取动作超时
            if(mills <= 0){
                while(pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }else{
                // 超时时间点
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while(pool.isEmpty() && remaining > 0){
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if(!pool.isEmpty()){
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }

利用循环判断连接池状态及剩余时间控制对象锁pool状态。

释放连接: 将使用完毕的连接归还给连接池集合,并通知唤醒所有等待线程获取连接。

public void releaseConnection(Connection connection){
    if(null != connection){
        synchronized (pool){
            pool.addLast(connection);
            // 当连接归还连接池后需要通知所有fetchConnection上等待队列的线程
            pool.notifyAll();
        }
    }
}

  • 连接驱动类:该类是java.sql.Connection的模拟实现,因为JDK将数据库连接设置成为接口,具体实现交由数据库各自实现。
public class ConnectinDriver {
    static class ConnectionHandler implements InvocationHandler {
        @Override
        public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
            if ("commit".equals(method.getName())) {
                // 模拟提交时睡眠1s
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
}
    }

    public static Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectinDriver.class.getClassLoader(), new Class<?>[] { Connection.class}, new ConnectionHandler());
    }
}

用动态代理模拟实现提交方法线程睡眠100ms。


  • 测试类
public class ConnectionsPoolTest {

    // 初始化连接池
    static ConectionsPoolSimulator pool = new ConectionsPoolSimulator(10);
    // CountDownLatch控制线程开始时间
    static CountDownLatch startLatch = new CountDownLatch(1);
    static CountDownLatch endLatch;

    // 模拟获取线程的客户端
    static class ConnectionRunner implements Runnable {
      ...
    }
        public static void main(String[] args) throws InterruptedException {
         // 开启线程数
        int threadCount = 50;
        endLatch = new CountDownLatch(threadCount);
        // 每个线程请求连接数
        int count = 20;
        AtomicInteger gotCount = new AtomicInteger();
        AtomicInteger missCount = new AtomicInteger();

        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, gotCount, missCount), "ConnectionRunnerThread");
            thread.start();
        }

        startLatch.countDown();
        endLatch.await();
        System.out.println(" total invoke: " + (threadCount * count));
        System.out.println(" got connection: " + gotCount);
        System.out.println(" miss connection " + missCount);

    }

}
  • 利用线程模仿客户端获取连接,并记录获取成功连接数与失败连接数: // 模拟获取线程的客户端
static class ConnectionRunner implements Runnable {
    // 从连接池获取连接目标数量
    private int count;
    // 获取连接成功数量
    AtomicInteger gotCount;
    // 获取连接失败数量
    AtomicInteger missCount;

    public ConnectionRunner(int count, AtomicInteger gotCount, AtomicInteger missCount) {
        this.count = count;
        this.gotCount = gotCount;
        this.missCount = missCount;
    }

    @Override
    public void run() {

        try {
            startLatch.await();
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }

        while (count > 0) {
            try {
                Connection connection = pool.fetchConnection(1000);
                if (null != connection) {
                    try {
                        connection.createStatement();
                        connection.commit();
                    } finally {
                        pool.releaseConnection(connection);
                        gotCount.incrementAndGet();
                    }
                } else {
                    missCount.incrementAndGet();
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                count--;
            }
        }
        endLatch.countDown();
    }
}

启动线程:50
单线程连接数:20 结果: total invoke: 1000 got connection: 817 miss connection 183


连接池实现中利用等待/通知机制完成对连接获取/释放的合理分配,并配合超时检验完成超时自动返回而不会让程序因阻塞卡死在某一操作上,也是程序自我保护的机制之一。

本文链接:https://check321.net/post/thread_in_action.html

-- EOF --

Comments

请在后台配置评论类型和相关的值。