在数据持久层框架中,【数据源】是一个非常重要的组件,它的性能直接关系到整个数据持久层的性能。
在Java中,数据源被定义为接口DataSource
,源码如下:
public interface DataSource extends CommonDataSource, Wrapper {// 获取数据库连接Connection getConnection() throws SQLException;// 给定用户名,密码获取连接Connection getConnection(String username, String password) throws SQLException;
}
DataSource职责非常单一,就是从数据源获取数据库连接Connection。有了Connection我们才能执行SQL,获得结果集。
MyBatis提供了两类数据源:UnpooledDataSource和PooledDataSource,同时它还支持与第三方数据源做集成,常用的有:阿里的Druid、C3P0、SpringBoot默认的Hikari等等。本篇文章不讨论第三方数据源,仅分析MyBatis内置的数据源。
顾名思义,UnpooledDataSource是未池化的数据源,当你调用getConnection()
时,它每次都会打开一个新的连接,使用完毕后再调用close()
方法销毁连接。数据库连接是非常宝贵的资源,首先,新连接的创建和销毁是非常耗时的,这在一定程度上会降低服务性能。其次,数据库支持的连接数有限,当突发流量较大时,创建大量的连接会导致数据库僵死。因此,线上环境几乎不会使用未池化的数据源,而是数据库连接池。
PooledDataSource是MyBatis提供的使用池化技术的数据源,它事先会创建一批连接静静的躺在池子里等待被调用,SQL执行完毕后调用close()
不会关闭连接,而是归还到连接池,等待下次被调用。这样就避免了连接的频繁创建和关闭,连接数也变得可控,不会导致数据库僵死。
UnpooledDataSource源码非常简单,如果你还记得原生JDBC操作数据库,那你一眼就能看懂它的源码。
先看属性:
public class UnpooledDataSource implements DataSource {// 驱动类加载器private ClassLoader driverClassLoader;// 驱动属性private Properties driverProperties;// 注册的驱动private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();// 数据库驱动private String driver;// 数据库连接private String url;// 用户名private String username;// 密码private String password;// 是否自动提交private Boolean autoCommit;// 默认的事务隔离级别private Integer defaultTransactionIsolationLevel;// 默认超时时间private Integer defaultNetworkTimeout;
}
使用JDBC获取数据库连接,首先就是加载数据库驱动Driver,根据url、用户名、密码获取连接,因此这些属性都很好理解。调用getConnection()
方法它每次都会打开新的连接,因此重点看doGetConnection()
。
// 根据用户名密码获取数据库连接
private Connection doGetConnection(String username, String password) throws SQLException {Properties props = new Properties();if (driverProperties != null) {// 设置驱动属性props.putAll(driverProperties);}// 设置用户名,密码if (username != null) {props.setProperty("user", username);}if (password != null) {props.setProperty("password", password);}// 获取连接return doGetConnection(props);
}
知道数据库驱动、数据库链接、用户名、密码,接下来就是JDBC原生API获取数据库连接了。
private Connection doGetConnection(Properties properties) throws SQLException {// 加载驱动initializeDriver();// 根据url和用户名密码获取连接Connection connection = Connection(url, properties);// 设置:超时、自动提交、事务隔离级别configureConnection(connection);return connection;
}
这就是UnpooledDataSource的源码,只要你还记得JDBC,就很容易理解。
一般都会使用数据库连接池,PooledDataSource也是MyBatis默认的数据源,因此我们重点分析。
PooledDataSource使用了【装饰者模式】,它本身不会去创建新连接,只负责维护连接池。一旦需要创建新的连接,它会委托给UnpooledDataSource执行。
先看属性:
public class PooledDataSource implements DataSource {// 连接池状态private final PoolState state = new PoolState(this);// 创建新连接的任务委托给UnpooledDataSourceprivate final UnpooledDataSource dataSource;// 最大活跃连接数protected int poolMaximumActiveConnections = 10;// 最大空闲连接数protected int poolMaximumIdleConnections = 5;// 最大可回收时间,超过该时间会强制回收连接。protected int poolMaximumCheckoutTime = 20000;// 无连接可用时的等待时间protected int poolTimeToWait = 20000;// 获取到失效连接的重试次数protected int poolMaximumLocalBadConnectionTolerance = 3;// 检测连接是否有效时执行的SQLprotected String poolPingQuery = "NO PING QUERY SET";// 是否开启连接检测,如果开启必须配置poolPingQuery。默认值:falseprotected boolean poolPingEnabled;// 连接检测的频率protected int poolPingConnectionsNotUsedFor;/*** 缓存的连接标识,为(url+username+password)的哈希码。* 避免将其他Connection Push到连接池。*/private int expectedConnectionTypeCode;
}
wait()
方法等待。获取连接,它的代码和UnpooledDataSource差异很大,如下:
@Override
public Connection getConnection(String username, String password) throws SQLException {// 从连接池弹出一个连接,并返回其代理对象return popConnection(username, password).getProxyConnection();
}
不再是简单的创建新连接,而是尝试从【连接池】中pop出一个连接。这个连接也许是新创建的,也可能是复用的旧连接。
还有一点需要注意,popConnection()
已经获取到连接了,为什么不直接返回,而是调用getProxyConnection()
返回代理连接对象呢?这里先卖个关子,后面会说到。
从连接池中获取连接的源码如下:
// 从连接池中弹出一个可用的连接
private PooledConnection popConnection(String username, String password) throws SQLException {// 是否发生了等待boolean countedWait = false;PooledConnection conn = null;long t = System.currentTimeMillis();int localBadConnectionCount = 0;// 循环重试,直到获取连接while (conn == null) {synchronized (state) {if (!state.idleConnections.isEmpty()) {// 存在空闲连接// 取出列表中表头连接conn = ve(0);if (log.isDebugEnabled()) {log.debug("Checked out connection " + RealHashCode() + " from pool.");}} else {// 当前没有空闲的可用连接if (state.activeConnections.size() < poolMaximumActiveConnections) {// 当前活跃连接数量未达到最大值,创建新的连接conn = new Connection(), this);if (log.isDebugEnabled()) {log.debug("Created connection " + RealHashCode() + ".");}} else { // 已达到最大活跃连接数,不能创建新的连接了。// 获取最老的一个连接,判断是否可回收。PooledConnection oldestActiveConnection = (0);long longestCheckoutTime = CheckoutTime();if (longestCheckoutTime > poolMaximumCheckoutTime) {// 连接可回收的情况// 统计过期连接数据state.claimedOverdueConnectionCount++;state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;state.accumulatedCheckoutTime += longestCheckoutTime;// 回收的连接从活跃列表中移除,如果不是自动提交事务,则帮其回滚事务。ve(oldestActiveConnection);if (!RealConnection().getAutoCommit()) {try {RealConnection().rollback();} catch (SQLException e) {log.debug("Bad connection. Could not roll back");}}// 创建一个新的PooledConnectionconn = new RealConnection(), this);conn.CreatedTimestamp());conn.LastUsedTimestamp());// 原连接置为失效oldestActiveConnection.invalidate();if (log.isDebugEnabled()) {log.debug("Claimed overdue connection " + RealHashCode() + ".");}} else {// 达到最大活跃连接数,且没有连接可以回收,则必须等待。try {if (!countedWait) {// 统计等待次数state.hadToWaitCount++;countedWait = true;}if (log.isDebugEnabled()) {log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");}long wt = System.currentTimeMillis();// 调用wait()方法等待其他连接的释放state.wait(poolTimeToWait);state.accumulatedWaitTime += System.currentTimeMillis() - wt;} catch (InterruptedException e) {break;}}}}if (conn != null) {// 回收的连接可能已失效,这里做有效性检测// ping to server and check the connection is valid or notif (conn.isValid()) {if (!RealConnection().getAutoCommit()) {RealConnection().rollback();}conn.setConnectionTypeCode(Url(), username, password));conn.setCheckoutTimestamp(System.currentTimeMillis());conn.setLastUsedTimestamp(System.currentTimeMillis());// 连接有效,添加到活跃连接中state.activeConnections.add(conn);questCount++;state.accumulatedRequestTime += System.currentTimeMillis() - t;} else {if (log.isDebugEnabled()) {log.debug("A bad connection (" + RealHashCode() + ") was returned from the pool, getting another connection.");}// 统计无效连接数据state.badConnectionCount++;localBadConnectionCount++;conn = null;if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {// 达到最大重试次数,抛异常if (log.isDebugEnabled()) {log.debug("PooledDataSource: Could not get a good connection to the database.");}throw new SQLException("PooledDataSource: Could not get a good connection to the database.");}}}}}if (conn == null) {// 没有获取到数据库连接,抛异常。if (log.isDebugEnabled()) {log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");}throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");}// 获取到连接,则返回return conn;
}
wait
等待其他线程释放连接。对于PooledDataSource来说,连接使用完毕是要归还给连接池的,对应的方法是pushConnection()
:
// 归还连接
protected void pushConnection(PooledConnection conn) throws SQLException {synchronized (state) {// 从活跃连接中删除ve(conn);if (conn.isValid()) {// 回收前必须保证连接有效// 判断空闲连接数是否已达到最大值 & 校验连接标识if (state.idleConnections.size() < poolMaximumIdleConnections && ConnectionTypeCode() == expectedConnectionTypeCode) {// 统计连接使用时间state.accumulatedCheckoutTime += CheckoutTime();if (!RealConnection().getAutoCommit()) {RealConnection().rollback();}// 基于原生Connection创建新的PooledConnection,并添加到空闲连接列表PooledConnection newConn = new RealConnection(), this);state.idleConnections.add(newConn);newConn.CreatedTimestamp());newConn.LastUsedTimestamp());// 旧连接置为失效conn.invalidate();if (log.isDebugEnabled()) {log.debug("Returned connection " + RealHashCode() + " to pool.");}// 有连接可用了,唤醒等待线程。ifyAll();} else {// 空闲连接数已达到最大值,新回收的连接会被直接关闭state.accumulatedCheckoutTime += CheckoutTime();if (!RealConnection().getAutoCommit()) {RealConnection().rollback();}RealConnection().close();if (log.isDebugEnabled()) {log.debug("Closed connection " + RealHashCode() + ".");}// 被关闭的连接置为失效conn.invalidate();}} else {// 连接已失效,统计失效连接数if (log.isDebugEnabled()) {log.debug("A bad connection (" + RealHashCode() + ") attempted to return to the pool, discarding connection.");}state.badConnectionCount++;}}
}
notifyAll
通知其它等待线程。和PooledDataSource息息相关的还有一个类PoolState,它代表的是连接池的状态。例如:当前有多少空闲/活跃连接?系统获取连接的次数是多少?连接使用的总时长是多少?多少连接失效了?获取连接有没有发生等待?等待的总时长是多少等等统计信息。
先看属性:
public class PoolState {// 数据源protected PooledDataSource dataSource;// 空闲连接protected final List<PooledConnection> idleConnections = new ArrayList<>();// 活跃连接protected final List<PooledConnection> activeConnections = new ArrayList<>();// 获取连接的次数protected long requestCount = 0;// 累计获取连接消耗的时间protected long accumulatedRequestTime = 0;// 累计连接被使用的时间protected long accumulatedCheckoutTime = 0;// 累计过期被回收的连接数protected long claimedOverdueConnectionCount = 0;// 累计过期连接的使用时间protected long accumulatedCheckoutTimeOfOverdueConnections = 0;// 无可用连接时的累计等待时间protected long accumulatedWaitTime = 0;// 等待的次数protected long hadToWaitCount = 0;// 无效的连接数protected long badConnectionCount = 0;
}
PoolState代码很简单,主要就是负责统计数据源连接池的状态数据,这里就不贴代码了。
细心的同学会发现,PooledDataSource获取的Connection并不是JDBC原生的,而是PooledConnection对象。MyBatis为何还要再封装一层呢?先说结论:主要原因是为了调用close方法回收连接。
从DataSource获取到Connection,执行指定的SQL,然后将Connection关闭。这是正常流程,但是PooledDataSource的需求是调用Connection.close()
方法是将连接回收,而非关闭。如何实现呢?
一种是实现Connection接口,重写close()
方法。另一种是为Connection对象生成一个代理对象,拦截close()
方法,实现自定义的逻辑。
MyBatis选择了后者,因为Connection本身就是接口,既然是接口就可以很方便的使用JDK动态代理生成代理对象。
话不多说,直接看源码,先看属性:
class PooledConnection implements InvocationHandler {// 要拦截的方法名,调用close()归还连接,而非关闭连接private static final String CLOSE = "close";private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };// 哈希码private final int hashCode;// 数据源,用于归还连接private final PooledDataSource dataSource;// 原生Connectionprivate final Connection realConnection;// JDK动态代理生成的代理连接private final Connection proxyConnection;// 取出时间戳private long checkoutTimestamp;// 创建时间戳private long createdTimestamp;// 最后一次使用时间戳private long lastUsedTimestamp;// 连接标识:url+username+password 哈希码private int connectionTypeCode;// 是否有效private boolean valid;
}
close
方法外,其他方法委托它去执行。
它的构造函数需要原生Connection对象和数据源DataSource,在构造函数中,同时会生成代理对象。
/*** 将原生Connection包装为池化的PooledConnection* @param connection 原生连接* @param dataSource 数据源连接池,close时用于归还连接*/
public PooledConnection(Connection connection, PooledDataSource dataSource) {// 代理对象保存原生哈希码this.hashCode = connection.hashCode();// 原始连接alConnection = connection;this.dataSource = atedTimestamp = System.currentTimeMillis();this.lastUsedTimestamp = System.currentTimeMillis();this.valid = true;// 生成代理连接对象this.proxyConnection = (Connection) wProxyInstance(ClassLoader(), IFACES, this);
}
JDK动态代理生成的对象,其实主要就是看invoke
方法。对于close
方法,MyBatis会调用dataSource.pushConnection()
回收连接,而非原生的关闭连接。对于其他方法,则委托给原生Connection执行,PooledConnection只负责拦截close
方法。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = Name();if (CLOSE.equals(methodName)) {// 如果调用的是close(),不去关闭连接,而是归还到连接池。dataSource.pushConnection(this);return null;}try {if (!Object.class.DeclaringClass())) {checkConnection();}// 其他方法,正常委派给原生Connection执行return method.invoke(realConnection, args);} catch (Throwable t) {throw ExceptionUtil.unwrapThrowable(t);}
}
MyBatis提供了两种数据源:UnpooledDataSource和PooledDataSource,前者每次获取连接都会创建新的连接,这会带来服务性能差、连接数不可控、面对突发流量数据库僵死等诸多缺点。后者是数据库连接池,针对连接进行池化管理,使得连接可以被复用、不用频繁创建和关闭、连接数也变得可控,线上首选。
与PooledDataSource息息相关的类PoolState记录了连接池的状态信息,这些统计信息很有用,可基于此来判断连接池的效率。
另一个重要的类就是PooledConnection,它可以为原生Connection生成代理对象,使得外界在调用其close
方法时不是直接关闭连接,而是回收连接。
本文发布于:2024-02-01 06:08:16,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170673889834444.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |