GeoSpark Source Code Analysis (Part 2)
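In this part we again use queries as the example and look at how GeoSpark wraps the spatial indexes provided by JTS. In the previous part we took a brief look at SpatialRDD, including its member JavaRDD<T> rawSpatialRDD. Without an index, a query is done by passing GeoSpark's match logic to an ordinary RDD method (filter, in the range query shown later in this part). Here we look at another member of SpatialRDD, indexedRawRDD: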
public class SpatialRDD<T extends Geometry>
        implements Serializable
{
    /**
     * The raw spatial RDD.
     */
    public JavaRDD<T> rawSpatialRDD;
    /**
     * The indexed raw RDD.
     */
    public JavaRDD<SpatialIndex> indexedRawRDD;
    ...
}
indexedRawRDD is populated by calling buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD). Let's see how this function builds the index:
public void buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD)
        throws Exception
{
    if (buildIndexOnSpatialPartitionedRDD == false) {
        //This index is built on top of unpartitioned SRDD
        this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType));
    }
    else {
        if (this.spatialPartitionedRDD == null) {
            throw new Exception("[AbstractSpatialRDD][buildIndex] spatialPartitionedRDD is null. Please do spatial partitioning before build index.");
        }
        this.indexedRDD = this.spatialPartitionedRDD.mapPartitions(new IndexBuilder(indexType));
    }
}
We start with the case where buildIndexOnSpatialPartitionedRDD is false (the partitioned case is covered next time). The statement this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType)); calls mapPartitions, this time passing an IndexBuilder object. Let's look at that class:
public final class IndexBuilder<T extends Geometry>
        implements FlatMapFunction<Iterator<T>, SpatialIndex>
{
    IndexType indexType;

    public IndexBuilder(IndexType indexType)
    {
        this.indexType = indexType;
    }

    @Override
    public Iterator<SpatialIndex> call(Iterator<T> objectIterator)
            throws Exception
    {
        SpatialIndex spatialIndex;
        if (indexType == IndexType.RTREE) {
            spatialIndex = new STRtree();
        }
        else {
            spatialIndex = new Quadtree();
        }
        while (objectIterator.hasNext()) {
            T spatialObject = objectIterator.next();
            spatialIndex.insert(spatialObject.getEnvelopeInternal(), spatialObject);
        }
        Set<SpatialIndex> result = new HashSet();
        spatialIndex.query(new Envelope(0.0, 0.0, 0.0, 0.0));
        result.add(spatialIndex);
        return result.iterator();
    }
}
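The whole class is only about 30 lines and does one simple thing: it supplies the function that mapPartitions applies to each partition. indexType selects the kind of index to build; JTS offers two, STRtree and Quadtree (any GIS textbook covers how they work). The while loop inserts every Geometry in the partition into the index tree, keyed by its envelope. The throwaway query on an empty-area Envelope that follows appears to be there to force the tree to be built before it is handed back, since JTS's STRtree builds itself lazily on the first query. Because Spark requires call to return an iterator, GeoSpark adds spatialIndex to a Set and returns that set's iterator. One more observation follows from this: each partition ends up with exactly one index, which matches how spatial indexes are used in GIS practice.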
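The one-index-per-partition pattern can be sketched without Spark or JTS. What follows is a minimal illustration under stated assumptions, not GeoSpark's implementation: Box, ListIndex, and the local mapPartitions helper are hypothetical names invented here, and ListIndex scans linearly where a real STRtree or Quadtree would prune.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class PerPartitionIndexSketch {

    /** Hypothetical stand-in for a JTS Envelope: an axis-aligned box. */
    static final class Box {
        final double minX, minY, maxX, maxY;

        Box(double minX, double minY, double maxX, double maxY) {
            this.minX = minX; this.minY = minY; this.maxX = maxX; this.maxY = maxY;
        }

        boolean intersects(Box o) {
            return minX <= o.maxX && o.minX <= maxX && minY <= o.maxY && o.minY <= maxY;
        }
    }

    /** Hypothetical stand-in for SpatialIndex; scans linearly instead of using a tree. */
    static final class ListIndex {
        private final List<Box> items = new ArrayList<>();

        void insert(Box box) { items.add(box); }

        List<Box> query(Box window) {
            List<Box> hits = new ArrayList<>();
            for (Box b : items) {
                if (b.intersects(window)) { hits.add(b); }
            }
            return hits;
        }

        int size() { return items.size(); }
    }

    /** Same shape as IndexBuilder.call: consume one partition, emit exactly one index. */
    static Iterator<ListIndex> buildIndex(Iterator<Box> partition) {
        ListIndex index = new ListIndex();
        while (partition.hasNext()) {
            index.insert(partition.next());
        }
        return Collections.singletonList(index).iterator();
    }

    /** Local imitation of mapPartitions: apply buildIndex once per partition. */
    static List<ListIndex> mapPartitions(List<List<Box>> partitions) {
        List<ListIndex> indexes = new ArrayList<>();
        for (List<Box> partition : partitions) {
            buildIndex(partition.iterator()).forEachRemaining(indexes::add);
        }
        return indexes;
    }

    /** Two partitions (two boxes, then one box); returns the size of each resulting index. */
    static int[] demo() {
        List<Box> first = new ArrayList<>();
        first.add(new Box(0, 0, 1, 1));
        first.add(new Box(2, 2, 3, 3));
        List<Box> second = new ArrayList<>();
        second.add(new Box(5, 5, 6, 6));
        List<List<Box>> partitions = new ArrayList<>();
        partitions.add(first);
        partitions.add(second);

        List<ListIndex> indexes = mapPartitions(partitions);
        int[] sizes = new int[indexes.size()];
        for (int i = 0; i < sizes.length; i++) {
            sizes[i] = indexes.get(i).size();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

The number of indexes equals the number of partitions, regardless of how many objects each partition holds. This sketch returns a singleton list where GeoSpark uses a HashSet; either way the iterator yields a single element.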
索引構(gòu)造完成后,就可以利用索引來進行并行分析了讯私,我們還是以查詢?yōu)槔?/p>
public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
        throws Exception
{
    U queryGeometry = originalQueryGeometry;
    if (spatialRDD.getCRStransformation()) {
        queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
    }
    if (useIndex == true) {
        if (spatialRDD.indexedRawRDD == null) {
            throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
        }
        return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
    }
    else {
        return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
    }
}
GeoSpark first checks useIndex, then checks whether an index has actually been built, throwing an exception if it has not. If the index exists, it executes return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true)); so let's look at the RangeFilterUsingIndex class:
public class RangeFilterUsingIndex<U extends Geometry, T extends Geometry>
        extends JudgementBase
        implements FlatMapFunction<Iterator<SpatialIndex>, T>
{
    public RangeFilterUsingIndex(U queryWindow, boolean considerBoundaryIntersection, boolean leftCoveredByRight)
    {
        super(queryWindow, considerBoundaryIntersection, leftCoveredByRight);
    }

    @Override
    public Iterator<T> call(Iterator<SpatialIndex> treeIndexes)
            throws Exception
    {
        assert treeIndexes.hasNext() == true;
        SpatialIndex treeIndex = treeIndexes.next();
        List<T> results = new ArrayList<T>();
        List<T> tempResults = treeIndex.query(this.queryGeometry.getEnvelopeInternal());
        for (T tempResult : tempResults) {
            if (leftCoveredByRight) {
                if (match(tempResult, queryGeometry)) {
                    results.add(tempResult);
                }
            }
            else {
                if (match(queryGeometry, tempResult)) {
                    results.add(tempResult);
                }
            }
        }
        return results.iterator();
    }
}
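Look at the call method. It first takes the partition's index with SpatialIndex treeIndex = treeIndexes.next(); and then uses the index tree to quickly retrieve the geometries near the query window queryGeometry. Because the index compares only bounding envelopes, this result is a candidate set rather than an exact answer. The for loop then iterates over tempResults and calls match on each candidate. Compared with a direct scan, the advantage is that the exact test no longer runs over the whole data set, which matters a great deal when the data is large. match is defined in the parent class JudgementBase: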
public boolean match(Geometry spatialObject, Geometry queryWindow)
{
    if (considerBoundaryIntersection) {
        if (queryWindow.intersects(spatialObject)) { return true; }
    }
    else {
        if (queryWindow.covers(spatialObject)) { return true; }
    }
    return false;
}
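Both the intersects call and the covers call here are JTS predicates. At this point the picture is clear: GeoSpark supplies the match method, and Spark runs it in parallel across partitions.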
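The filter-then-refine flow of call and match can be sketched with plain rectangles. This is a minimal illustration under stated assumptions, not GeoSpark or JTS code: Box, candidates, and rangeQuery are hypothetical names, candidates stands in for treeIndex.query on an envelope, and match mirrors only the branch structure of JudgementBase.match.

```java
import java.util.ArrayList;
import java.util.List;

final class FilterRefineSketch {

    /** Hypothetical stand-in for a geometry that is its own envelope. */
    static final class Box {
        final double minX, minY, maxX, maxY;

        Box(double minX, double minY, double maxX, double maxY) {
            this.minX = minX; this.minY = minY; this.maxX = maxX; this.maxY = maxY;
        }

        /** True if the two boxes share at least one point. */
        boolean intersects(Box o) {
            return minX <= o.maxX && o.minX <= maxX && minY <= o.maxY && o.minY <= maxY;
        }

        /** True if the other box lies entirely inside this one (boundary included). */
        boolean covers(Box o) {
            return minX <= o.minX && minY <= o.minY && maxX >= o.maxX && maxY >= o.maxY;
        }
    }

    /** Refine step: same branching as JudgementBase.match, on boxes. */
    static boolean match(Box spatialObject, Box queryWindow, boolean considerBoundaryIntersection) {
        return considerBoundaryIntersection
                ? queryWindow.intersects(spatialObject)
                : queryWindow.covers(spatialObject);
    }

    /** Filter step: everything whose envelope overlaps the window (a linear scan here). */
    static List<Box> candidates(List<Box> indexed, Box window) {
        List<Box> hits = new ArrayList<>();
        for (Box b : indexed) {
            if (b.intersects(window)) { hits.add(b); }
        }
        return hits;
    }

    /** Filter, then refine only the candidates. */
    static List<Box> rangeQuery(List<Box> indexed, Box window, boolean considerBoundaryIntersection) {
        List<Box> results = new ArrayList<>();
        for (Box candidate : candidates(indexed, window)) {
            if (match(candidate, window, considerBoundaryIntersection)) {
                results.add(candidate);
            }
        }
        return results;
    }

    /** Returns {candidate count, result count with intersects, result count with covers}. */
    static int[] demo() {
        List<Box> data = new ArrayList<>();
        data.add(new Box(1, 1, 2, 2)); // fully inside the window
        data.add(new Box(4, 4, 6, 6)); // straddles the window boundary
        data.add(new Box(8, 8, 9, 9)); // outside the window
        Box window = new Box(0, 0, 5, 5);
        return new int[] {
            candidates(data, window).size(),
            rangeQuery(data, window, true).size(),
            rangeQuery(data, window, false).size()
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

With considerBoundaryIntersection set to false, the box that straddles the boundary survives the filter step but is rejected by covers, which is why the refine step is needed. With real polygons the envelope test also over-approximates intersects, so refinement can prune candidates in both modes.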
That completes our look at indexed and non-indexed queries. In the next part we turn to another important feature of Spark: partitioning.