博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
sharding-jdbc源码解析之结果集归并
阅读量:5827 次
发布时间:2019-06-18

本文共 8932 字,大约阅读时间需要 29 分钟。

hot3.png

结果集归并源码解析

本文转自“天河聊技术”微信公众号

 

找到这个方法,执行查询的方法

com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement#executeQuery

@Override    public ResultSet executeQuery() throws SQLException {        ResultSet result;        try {//            路由到预编译对象执行单元集合            Collection
preparedStatementUnits = route();//            多线程执行sql查询返回结果集对象集合            List
resultSets = new PreparedStatementExecutor( getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();//走结果计归并引擎执行结果集归并逻辑-》            result = new ShardingResultSet(resultSets, new MergeEngine( getShardingConnection().getShardingContext().getDatabaseType(), resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());        } finally { clearBatch();        } setCurrentResultSet(result);        return result;    }

创建结果集归并引擎对象,进入到构造器

/** * 分片结果集归并引擎. * * @author zhangliang */public final class MergeEngine {        private final DatabaseType databaseType;        private final List
resultSets;        private final SelectStatement selectStatement;        private final Map
columnLabelIndexMap;        public MergeEngine(final DatabaseType databaseType, final List
resultSets, final SelectStatement selectStatement) throws SQLException { this.databaseType = databaseType;        this.resultSets = resultSets;        this.selectStatement = selectStatement;        columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));    }

获取结果集的源数据

columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
private Map
getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {//        获取resultSet的源数据        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();        Map
result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);        } return result;    }

进入到这个方法

com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#merge

/** * 合并结果集. * * @return 归并完毕后的结果集 * @throws SQLException SQL异常 */public ResultSetMerger merge() throws SQLException {//结果集合并业务方法    selectStatement.setIndexForItems(columnLabelIndexMap);    return decorate(build());}

进入到这个方法

com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#build

private ResultSetMerger build() throws SQLException {//        排序项不为空或者聚合选择项不为空        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {//            如果分组项和排序项一致,走流式结果集归并-》            if (selectStatement.isSameGroupByAndOrderByItems()) {                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());            } else {//                否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());            }        }        if (!selectStatement.getOrderByItems().isEmpty()) {            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());        }        return new IteratorStreamResultSetMerger(resultSets);    }
return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());

 

创建分组流式结果集归并,进入到构造器方法

public GroupByStreamResultSetMerger(        final Map
labelAndIndexMap, final List
resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(resultSets, selectStatement.getOrderByItems(), nullOrderType);    this.labelAndIndexMap = labelAndIndexMap;    this.selectStatement = selectStatement;    currentRow = new ArrayList<>(labelAndIndexMap.size());    currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();}

这一行代码

super(resultSets, selectStatement.getOrderByItems(), nullOrderType);

创建排序流式结果集归并对象

public OrderByStreamResultSetMerger(final List
resultSets, final List
orderByItems, final OrderType nullOrderType) throws SQLException { this.orderByItems = orderByItems;//优先级队列实现        this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());        this.nullOrderType = nullOrderType;//        把要排序的结果集往队列里放-》        orderResultSetsToQueue(resultSets);        isFirstNext = true;    }
//        把要排序的结果集往队列里放-》        orderResultSetsToQueue(resultSets);
private void orderResultSetsToQueue(final List
resultSets) throws SQLException { for (ResultSet each : resultSets) { OrderByValue orderByValue = new OrderByValue(each, orderByItems, nullOrderType);            if (orderByValue.next()) { orderByValuesQueue.offer(orderByValue);            } }//        流式结果集归并,设置当前的流式归并结果集,大家看这里存储是当前的结果集所以不会出现内存溢出问题        setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());    }

返回到这个方法

private ResultSetMerger build() throws SQLException {//        排序项不为空或者聚合选择项不为空        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {//            如果分组项和排序项一致,走流式结果集归并-》            if (selectStatement.isSameGroupByAndOrderByItems()) {                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());            } else {//                否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());            }        }        if (!selectStatement.getOrderByItems().isEmpty()) {            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());        }        return new IteratorStreamResultSetMerger(resultSets);    }

这一行

//                否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());

创建内存分组结果集归并对象

public GroupByMemoryResultSetMerger(            final Map
labelAndIndexMap, final List
resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException { super(labelAndIndexMap);        this.selectStatement = selectStatement;        this.nullOrderType = nullOrderType;//        创建内存结果集行对象        memoryResultSetRows = init(resultSets);    }

返回到这个方法

private ResultSetMerger build() throws SQLException {//        排序项不为空或者聚合选择项不为空        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {//            如果分组项和排序项一致,走流式结果集归并-》            if (selectStatement.isSameGroupByAndOrderByItems()) {                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());            } else {//                否则走内存结果集归并,要尽量避免这种情况,会占用大量内存=》                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());            }        }        if (!selectStatement.getOrderByItems().isEmpty()) {            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());        }//        创建迭代归并结果集归并对象        return new IteratorStreamResultSetMerger(resultSets);    }

这一行

//        创建迭代归并结果集归并对象        return new IteratorStreamResultSetMerger(resultSets);
public IteratorStreamResultSetMerger(final List
resultSets) { this.resultSets = resultSets.iterator();    setCurrentResultSet(this.resultSets.next());}

返回到这个方法

/** * 合并结果集. * * @return 归并完毕后的结果集 * @throws SQLException SQL异常 */public ResultSetMerger merge() throws SQLException {//结果集合并业务方法    selectStatement.setIndexForItems(columnLabelIndexMap);    return decorate(build());}

进入这个方法com.dangdang.ddframe.rdb.sharding.merger.MergeEngine#decorate

private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {        ResultSetMerger result = resultSetMerger;        if (null != selectStatement.getLimit()) {//            装饰器模式对分页结果集归并进行了进一步的封装            result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());        }        return result;    }

 

结果集归并源码解析到这里就结束了。

 

 

说到最后

源码解析的部分内容比较多,还是直接看源码比较直观,我这里的源码解析只是给予大家一个阅读源码的思路,我已经尽量把sharding-jdbc实现的关键点都介绍了下,可能还有其他的好的实现没有介绍出来,sharding-jdbc源码解析系列到这里就全部结束了,如果需要进一步深度沟通,请加我微信,我会把你拉到天河聊技术技术讨论群里相互交流,以上源码解析仅供参考。

转载于:https://my.oschina.net/u/3775437/blog/1785209

你可能感兴趣的文章
【CAS单点登录视频教程】 第04集 -- tomcat下配置https环境
查看>>
自适应网页布局经验
查看>>
Ubuntu apache 禁止目录浏览
查看>>
常用脚本--归档ERRORLOG
查看>>
js网页倒计时精确到秒级
查看>>
常用CSS缩写语法总结
查看>>
TDD:什么是桩(stub)和模拟(mock)?
查看>>
C# 模拟POST提交文件
查看>>
PAT 解题报告 1004. Counting Leaves (30)
查看>>
Android开发之蓝牙 --修改本机蓝牙设备的可见性,并扫描周围可用的蓝牙设备
查看>>
[Head First设计模式]生活中学设计模式——外观模式
查看>>
Repository模式中,Update总是失败及其解析
查看>>
.Net 转战 Android 4.4 日常笔记(2)--HelloWorld入门程序
查看>>
[原创]浅谈测试团队转型,思维模式的转变是关键
查看>>
Redis学习-SortedSet
查看>>
android CoordinatorLayout使用
查看>>
机器学习资料大汇总
查看>>
Python selenium 滚动条 详解
查看>>
微信程序开发
查看>>
如何退出minicom【学习笔记】
查看>>