征文|「ShardingJDBC执行过程剖析」

ShardingJDBC执行过程剖析

ShardingSphere-ShardingJDBC实现思路解析

这里使用的版本为ShardingJDBC 4.1.1版本。

传统JDBC的使用

方式1-通过DriverManager

Class.forName("com.mysql.cj.jdbc.Driver");
Connection connection = DriverManager.getConnection(url, user, password);
PreparedStatement preparedStatement = connection.prepareStatement("insert into t_user (id, name) values (?, ?)");
preparedStatement.setInt(1, 12);
preparedStatement.setString(2, "this is paramter");
preparedStatement.executeUpdate();
// 如果执行的是查询,还要关闭ResultSet 
// resultSet.close();
preparedStatement.close();
connection.close();

方式2-通过DataSource

DruidDataSource datasource = new DruidDataSource();
dataSource.setDriverClassName(driver);
dataSource.setUrl(url);
dataSource.setUsername(user);
dataSource.setPassword(password);
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement("select * from t_user");
ResultSet resultSet = preparedStatement.executeQuery();
while(resultSet.next()) {
   int id = resultSet.getInt(1);
   int gener = resultSet.getInt(2)
}
resultSet.close();
preparedStatement.close();
connection.close();

以上两种方式,现在基本上会采用第二种方式了。通过连接池管理数据源,做到连接的复用,节省资源。但是上面两种方式的核心步骤是一样的。即

  1. 加载驱动
  2. 获取连接-Connection
  3. 获取StateMent-PreparedStatement
  4. 为SQL赋值-PreparedStatement.setXXX(paramterIndex, value)
  5. 执行SQL-PreparedStatement.executeXXX();
  6. 获得结果集-ResultSet
  7. 依次关闭各种连接-XXX.close();

SharingJdbc流程解析

  1. 启动时,加载所有数据源(DBCP数据源、Druid数据源、C3P0数据源)配置,将其作为Map,设置为具体数据源(分片数据源、主从数据源、影子数据源)的属性。此数据源为==逻辑上的数据源==。为什么说是逻辑上的数据源呢?后面会解释。

    流程详解:

    SpringBootConfiguration这个类继承了EnvironmentAware接口(一般实现这个接口都是为了读取具体配置文件里的属性),实现了setEnvironment(Environment environment)方法。

    完成的操作是将所有在配置文件中配置的数据源放到一个Map<数据源名字, 数据源实例>中。主要看下这里实例化数据源的步骤,拿到配置文件下每一个数据源的type,假定这里type是 com.alibaba.druid.pool.DruidDataSource


    可以看到直接通过Class.forName().newInstance();方法具体实例化每一个数据源。实例化之后,要将其他属性设置进数据源中,这里首先通过字符串拼接某些属性的Set方法,如,数据源里配置的url,这里拼接完成为setUrl.下一步通过callSetterMethod方法,进行反射调用set方法,将配置文件的具体数据源的其他属性赋值。完成这些之后,会放到上面的Map中。

    执行完setEnvironment方法之后,执行其他代码。

    当==@Conditional==条件满足时,会认为当前配置的为分片操作。于是new一个ShardingDataSource,将刚才设置好的map当做一个属性设置进去。

    为什么要自定义数据源?既然拿到了具体的数据源为什么还要自定义一个数据源?

    原因1-如果在使用Mybatis,Mybatis的SqlSessionFactoryBean要进行数据源的设置(setDataSource方法),如果此时没有数据源,在afterPropertiesSet()方法里会进行数据源的检查,如果为null,则会报错。(因为SqlSessionFactoryBean实现了==InitializingBean==接口,所以要实现afterPropertiesSet()方法)

  2. 开始进行分片查询、新增、删除、更新。执行SQL前的准备

    以使用Mybatis的插入操作为例。

    Mybatis插入的核心操作在SimpleExector.doUpdate方法中,这个方法里要获取Connection、Statement、设置参数。prepareStatement方法中this.getConnectoin().里面会调用openConnection()方法,最终会调用当前DataSource.getConnection方法。也就是调用ShardingDataSource.getConnection.返回一个ShardingConnection.

    为什么要返回一个自定义的Connection呢?后面会说。

    拿到connection之后,还要获得Statement,调用handler.prepare方法,最终会调用instantiateStatement方法中通过connection.prepareStatement方法获得。这里调用ShardingConnection.prepareStatement方法返回一个ShardingPreparedStatement.

    为什么要返回一个自定义的PreparedStatement呢?

    拿到Statement之后,自然就是为SQL的参数赋值。调用handler.parameterize方法,接着会调用DefaultParameterHandler.setParameters方法,为不同的参数类型,选择不同的TypeHandler,最后会调用BaseTypeHandler.setParameter().setNonNullParameter方法,根据不同的TypeHandler执行PreparedStatement.setInt或者setString或者其他setXXX方法.

    调用前面返回的SharingPreparedStatement.setInt()方法

    正常的PreparedStatement接口的实现类,例如MySQL的ClientPreparedStatement的setInt会进行设置参数和值的绑定,但是这里ShardingPreparedStatement却没有

    这里仅仅是将其放到了一个List中。其他setString、setXXX也一样。就返回了。

    值得注意的是,到完成这一步,在Mybatis的角度来看,已经拿到了connection、statement、为SQL设置好了值。接下来就是最后一步,执行SQL就行了。但是,这里拿到的只不过是ShardingJDBC自定义的几个类的对象。连真正拿到connection的地方都还看到。

  3. 开始进行执行SQL

    Mybatis认为一切准备就绪,最终会执行ps.execute()方法。这行代码的最终执行依旧还是ShardingPreparedStatement的execute方法。因为前面拿到的就是它。

    prepare方法进行的是根据分片、主从、影子的规则进行SQL改写。也就是核心的功能。这里不做讨论。

    进入144行方法

    有三个子方法

    preparedStatementExecutor.init(executionContext);
    setParametersForStatements();
    replayMethodForStatements();
    

    第一个方法:

    此时才是真正去拿connection的地方

    可以看到,这里根据具体的配置文件里的datasource名字,去刚才启动时候的map里去找对应的已经实例好的DataSource.最后会根据dataSource.getConnection方法获得真正的connection.获得connection之后,还会执行上面截图的createdPreparedStatement方法去获取真正的Statement。==也就是调用真正connection的connection.prepareStatement==

    第二个子方法:获得connection、statement之后。进入setParametersForStatements方法


    通过反射调用PreparedStatement.setObject方法,对参数设置,这里的循环的是刚才Mybatis设置参数时所放进去的list。

  4. 最终执行SQL

    准备完毕之后,即路由、改写SQL之后,获取连接、statement、参数赋值之后,终于执行了preparedStatementExecutor.execute()方法,最终会执行org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteCallback#executeSQL方法

    这里拿到的不再是ShardingConnection或者ShardingPreparedStatement而是真正数据源的connection、statement。进行执行

  5. 结果集的处理

    Mybatis的核心ps.execute执行完毕之后,会对结果集进行处理。

    ==注意== :此时的ps是ShardingPreparedStatement,因为具体的PreparedStatement是在上一步ps.execute里面才拿到的。

    最终会在DefaultResultSetHandler里进行获取resultSet

    调用ShardingPreparedStatement的具体getResultSet方法

    这里的each是真正的PreparedStatement。直接调用getResultSet即可获得结果集。获得结果集之后,ShardingJdb会对结果集进行合并,也是核心功能。这里不做讨论

到此为止,ShardingJDBC的工作流程解析完毕,再看前面几个问题,为什么要自定义datasource(最上面说的逻辑数据源)、connection、statement、result?

为了和各种ORM框架的进行==兼容==(正因为有了这些自定义的类,Mybatis的正常流程才不受阻碍),同时不开发重复功能,只专注于核心功能的实现。ShardingJDBC将JDBC的核心步骤全部下沉到最底层的execute方法中。在底层execute方法执行之前,借助于自定义的类,完成自身核心功能(分片、主从、加解密、影子)的实现。对于其他框架来说,ShardingJDBC的这些类就是”可用的“,可以说,这些自定义的类在充当傀儡的同时(让其他框架以为自己拿到的就是真的),还实现了自身核心功能。

模拟ShardingJdbc执行过程

走完上面的流程,简单自定义模拟一下上述过程

自定义DataSource-MyDataSource

自定义Connection-Myconnection

自定义PreparedStatement-MyPreparedStatement

容器启动时,创建自定义MyDataSource

public class DataSourceConfiguration implements EnvironmentAware {
    
    private Environment environment;
    
    @Override
    public void setEnvironment(final Environment environment) {
        this.environment = environment;
    }
    
    @Bean
    @SneakyThrows
    public DataSource createDataSource() {
        Map<String, Object> property = getDataSourcePropertyByPrefix("spring.datasource");
        Preconditions.checkState(CollectionUtil.isNotEmpty(property));
        Map<String, DataSource> dataSourceMap = new HashMap<>(1);
        DataSource dataSource = (DataSource) Class.forName(property.get("type").toString()).newInstance();
        property.remove("type");
        Iterator<Entry<String, Object>> iterator = property.entrySet().iterator();
        while (iterator.hasNext()) {
            Entry<String, Object> entry = iterator.next();
            // 通过反射调用各个属性的set方法,设置必要属性
            callSetterMethod(dataSource, getSetterMethodName(entry.getKey()), entry.getValue().toString());
        }
        // 将dataSource设置进去
        dataSourceMap.put("master", dataSource);
        return new MyDataSource(dataSourceMap);
    }
    
    @SneakyThrows
    private void callSetterMethod(final DataSource dataSource, final String setterMethodName, final String value) {
        Method method = dataSource.getClass().getMethod(setterMethodName, String.class);
        method.invoke(dataSource, value);
    }
    
    private String getSetterMethodName(final String key) {
        return key.contains("-") ? CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, "set-" + key) : "set" + String.valueOf(key.charAt(0)).toUpperCase() + key.substring(1);
    }
    
    private Map getDataSourcePropertyByPrefix(String prefix) {
        // 利用Binder获取配置文件的信息
        Binder binder = Binder.get(environment);
        BindResult<Map> bind = binder.bind(prefix, Bindable.of(Map.class));
        return bind.get();
    }
}

DataSource.getConnection()方法,返回自定义MyConnection

public class MyDataSource implements DataSource {
    
    /**
     * 自定义dataSourceMap,里面放置了各种DataSource,如Druid,C3P0.
     */
    private final Map<String, DataSource> dataSourceMap;
    
    public MyDataSource(final Map<String, DataSource> dataSourceMap) {
        this.dataSourceMap = dataSourceMap;
    }
    
    /**
     * 获取连接,仅仅返回一个MyConnection对象.
     */
    @Override
    public Connection getConnection() throws SQLException {
        return new MyConnection(dataSourceMap);
    }
}

MyConnection.prepareStatement返回自定义MyPreparedStatement

@Getter
public class MyConnection implements Connection {
    
    private final Map<String, DataSource> dataSourceMap;
    
    public MyConnection(final Map<String, DataSource> dataSourceMap) {
        this.dataSourceMap = dataSourceMap;
    }
    
    @Override
    public Statement createStatement() throws SQLException {
        return null;
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new MyPreparedStatement(this, sql);
    }
}

在自定义的MyPreparedStatement方法中真正执行JDBC核心流程

@Getter
public class MyPreparedStatement implements PreparedStatement {
    
    private final MyConnection myConnection;
    private final String sql;
    private ResultSet resultSet;
    private final List<Object> parameters = new ArrayList<>();
    
    public MyPreparedStatement(final MyConnection myConnection, final String sql) {
        this.myConnection = myConnection;
        this.sql = sql;
    }
   // ....省略部分方法
   @Override
    public void setInt(final int parameterIndex, final int x) throws SQLException {
        // Mybatis TypeHandler处理的时候,现将参数存放在list中
        
        // list必须有值,才可以调用set方法
        parameters.add(null);
        parameters.set(parameterIndex - 1, x);
    }  
  /**
     * 真正执行获取Connection-连接、获取PreparedStatement并且执行的地方.
     */
    @Override
    public boolean execute() throws SQLException {
        // 获取具体数据源
        DataSource dataSource = this.myConnection.getDataSourceMap().get("master");
        // 获取具体数据源的connection
        Connection connection = dataSource.getConnection();
        // 真正获取preparedStatement的地方
        PreparedStatement preparedStatement = connection.prepareStatement(this.sql);
        // 通过反射 调用 设置属性值
        final AtomicInteger loop = new AtomicInteger(0);
        parameters.forEach(item -> {
            Method method = null;
            try {
                // 第一种方式: 直接通过preparedStatement.setObject 设置参数值
                preparedStatement.setObject(loop.get()+1, item);
                
                // 第二种方式: 通过反射调用,也是调用preparedStatement.setObject
               // method = PreparedStatement.class.getMethod("setObject", int.class, Object.class);
                //method.invoke(preparedStatement, loop.get()+1, item);
                // NoSuchMethodException | IllegalAccessException | InvocationTargetException |
            } catch ( SQLException e) {
                e.printStackTrace();
            }
            loop.getAndIncrement();
        });
        preparedStatement.execute();
        ResultSet statementResultSet = preparedStatement.getResultSet();
        this.resultSet = statementResultSet;
        // 不关闭资源,还有handler要拿下面的资源进行结果集的处理
        //statementResultSet.close();
        // preparedStatement.close();
        //connection.close();
        return true;
    }
    @Override
    public ResultSet getResultSet() throws SQLException {
        return this.resultSet;
    }
}

以上流程基本模拟了ShardingJDBC所完成的整体的流程。全部实现JDBC的DataSource、Connection、Statement接口的方法。还少一个资源的关闭,没有添加例子

备注

  1. 以上所有示例代码,均已放到GitHub上,如有需要自取
  2. 以上涉及到Mybatis的执行流程的细节,没有特别细致的分析,可参考相关Mybatis分析相关博客

状态:原创(CSDN博主
JackSparrow414

4 个赞
京ICP备2021015875号