结果集归并源码解析
本文转自“天河聊技术”微信公众号
找到这个方法,执行查询的方法
com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#executeQuery
@Override public ResultSet executeQuery() throws SQLException { ResultSet result; try {// 路由到预编译对象执行单元集合 CollectionpreparedStatementUnits = route();// 多线程执行sql查询返回结果集对象集合 List resultSets = new PreparedStatementExecutor( getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();//走结果计归并引擎执行结果集归并逻辑-》 result = new ShardingResultSet(resultSets, new MergeEngine( getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge()); } finally { clearBatch(); } setCurrentResultSet(result); return result; }
创建结果集归并引擎对象,进入到构造器
/** * 分片结果集归并引擎. * * @author zhangliang */public final class MergeEngine { private final DatabaseType databaseType; private final ListresultSets; private final SelectStatement selectStatement; private final Map columnLabelIndexMap; public MergeEngine(final DatabaseType databaseType, final List resultSets, final SelectStatement selectStatement) throws SQLException { this.databaseType = databaseType; this.resultSets = resultSets; this.selectStatement = selectStatement; columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0)); }
获取结果集的源数据
columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
private MapgetColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {// 获取resultSet的源数据 ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); Map result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i); } return result; }
进入到这个方法
com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#merge
/** * 合并结果集. * * @return 归并完毕后的结果集 * @throws SQLException SQL异常 */public ResultSetMerger merge() throws SQLException {//结果集合并业务方法 selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build());}
进入到这个方法
com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#build
private ResultSetMerger build() throws SQLException {// 排序项不为空或者聚合选择项不为空 if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {// 如果分组项和排序项一致,走流式结果集归并-》 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else {// 否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } return new IteratorStreamResultSetMerger(resultSets); }
return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
创建分组流式结果集归并,进入到构造器方法
public GroupByStreamResultSetMerger( final MaplabelAndIndexMap, final List resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(resultSets, selectStatement.getOrderByItems(), nullOrderType); this.labelAndIndexMap = labelAndIndexMap; this.selectStatement = selectStatement; currentRow = new ArrayList<>(labelAndIndexMap.size()); currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();}
这一行代码
super(resultSets, selectStatement.getOrderByItems(), nullOrderType);
创建排序流式结果集归并对象
public OrderByStreamResultSetMerger(final ListresultSets, final List orderByItems, final OrderType nullOrderType) throws SQLException { this.orderByItems = orderByItems;//优先级队列实现 this.orderByValuesQueue = new PriorityQueue<>(resultSets.size()); this.nullOrderType = nullOrderType;// 把要排序的结果集往队列里放-》 orderResultSetsToQueue(resultSets); isFirstNext = true; }
// 把要排序的结果集往队列里放-》 orderResultSetsToQueue(resultSets);
private void orderResultSetsToQueue(final ListresultSets) throws SQLException { for (ResultSet each : resultSets) { OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType); if (orderByValue.next()) { orderByValuesQueue.offer(orderByValue); } }// 流式结果集归并,设置当前的流式归并结果集,大家看这里存储是当前的结果集所以不会出现内存溢出问题 setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet()); }
返回到这个方法
private ResultSetMerger build() throws SQLException {// 排序项不为空或者聚合选择项不为空 if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {// 如果分组项和排序项一致,走流式结果集归并-》 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else {// 否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); } return new IteratorStreamResultSetMerger(resultSets); }
这一行
// 否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
创建内存分组结果集归并对象
public GroupByMemoryResultSetMerger( final MaplabelAndIndexMap, final List resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(labelAndIndexMap); this.selectStatement = selectStatement; this.nullOrderType = nullOrderType;// 创建内存结果集行对象 memoryResultSetRows = init(resultSets); }
返回到这个方法
private ResultSetMerger build() throws SQLException {// 排序项不为空或者聚合选择项不为空 if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {// 如果分组项和排序项一致,走流式结果集归并-》 if (selectStatement.isSameGroupByAndOrderByItems()) { return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } else {// 否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》 return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType()); } } if (!selectStatement.getOrderByItems().isEmpty()) { return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType()); }// 创建迭代归并结果集归并对象 return new IteratorStreamResultSetMerger(resultSets); }
这一行
// 创建迭代归并结果集归并对象 return new IteratorStreamResultSetMerger(resultSets);
public IteratorStreamResultSetMerger(final ListresultSets) { this.resultSets = resultSets.iterator(); setCurrentResultSet(this.resultSets.next());}
返回到这个方法
/** * 合并结果集. * * @return 归并完毕后的结果集 * @throws SQLException SQL异常 */public ResultSetMerger merge() throws SQLException {//结果集合并业务方法 selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build());}
进入这个方法com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#decorate
private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException { ResultSetMerger result = resultSetMerger; if (null != selectStatement.getLimit()) {// 装饰器模式对分页结果集归并进行了进一步的封装 result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit()); } return result; }
结果集归并源码解析到这里就结束了。
说到最后
源码解析的部分内容比较多,还是直接看源码比较直观,我这里的源码解析只是给予大家一个阅读源码的思路,我已经尽量把sharding-jdbc实现的关键点都介绍了下,可能还有其他的好的实现没有介绍出来,sharding-jdbc源码解析系列到这里就全部结束了,如果需要进一步深度沟通,请加我微信,我会把你拉到天河聊技术技术讨论群里相互交流,以上源码解析仅供参考。