
Reorganize to include examples
jpolchlo committed Jun 7, 2019
1 parent 559d1f6 commit b7230f4
Showing 108 changed files with 651 additions and 53 deletions.
15 changes: 13 additions & 2 deletions build.sbt
@@ -147,11 +147,22 @@ val vpExtraSettings = Seq(
// micrositeBaseUrl := "/vectorpipe"
// micrositeDocumentationUrl := "/vectorpipe/latest/api/#vectorpipe.package" /* Location of Scaladocs */

lazy val root = project
.in(file("."))
.aggregate(vectorpipe, examples)
.settings(commonSettings, vpExtraSettings)

/* Main project */
lazy val vectorpipe = project
.in(file("."))
.in(file("core"))
.settings(commonSettings, vpExtraSettings, release)

/* Example projects */
lazy val examples = project
.in(file("examples"))
.settings(commonSettings, vpExtraSettings)
.dependsOn(vectorpipe)

/* Benchmarking suite.
* Benchmarks can be executed by first switching to the `bench` project and then by running:
jmh:run -t 1 -f 1 -wi 5 -i 5 .*Bench.*
@@ -162,7 +173,7 @@ lazy val bench = project
.dependsOn(vectorpipe)
.enablePlugins(JmhPlugin)


onLoad in Global ~= (_ andThen ("project vectorpipe" :: _))


// assemblyShadeRules in assembly := {
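With the build split into a root aggregate, the core library (still published as vectorpipe), and an examples module that depends on it, the example code can be built straight from the sbt shell. A minimal sketch of that workflow (the onLoad setting above drops the shell into the vectorpipe project by default):

    project examples
    compile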
File renamed without changes.
File renamed without changes.
@@ -15,6 +15,8 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.locationtech.jts.{geom => jts}

import scala.reflect.ClassTag

object VectorPipe {

/** Vectortile conversion options.
@@ -46,7 +48,10 @@ object VectorPipe {
def forAllZoomsWithSrcProjection(zoom: Int, crs: CRS) = Options(zoom, Some(0), crs, None)
}

def apply(input: DataFrame, pipeline: vectortile.Pipeline, options: Options): Unit = {
def apply[T: ClassTag](input: DataFrame, pipeline: vectortile.Pipeline, options: Options): Unit = {
import input.sparkSession.implicits._
import vectorpipe.encoders._

val geomColumn = pipeline.geometryColumn
assert(input.columns.contains(geomColumn) &&
input.schema(geomColumn).dataType.isInstanceOf[org.apache.spark.sql.jts.AbstractGeometryUDT[jts.Geometry]],
@@ -74,46 +79,49 @@ object VectorPipe {
SpatialKey(k.col / 2, k.row / 2) }.toSeq
}

def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): RDD[(SpatialKey, VectorTile)] = {
def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): Dataset[(SpatialKey, Array[Byte])] = {
val zoom = level.zoom
val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) =>
val k = getSpatialKey(key)
pipeline.clip(g, k, level)
}

val selectedGeometry = pipeline
.select(df, zoom, keyColumn)
val selectedGeometry = pipeline.select match {
case None => df
case Some(select) => select(df, zoom, keyColumn)
}

val clipped = selectedGeometry
val keyed = selectedGeometry
.withColumn(keyColumn, explode(col(keyColumn)))
.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))

val clipped = pipeline.clip match {
case None => keyed
case Some(clipper) =>
val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) =>
val k = getSpatialKey(key)
clipper(g, k, level)
}
val toClip = keyed.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
toClip.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
}

pipeline.layerMultiplicity match {
case SingleLayer(layerName) =>
clipped
.rdd
.map { r => (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
.groupByKey
.map { case (key, feats) =>
.map { r => SingleLayerEntry(getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
.groupByKey(_.key)
.mapGroups { (key: SpatialKey, sleIter: Iterator[SingleLayerEntry]) =>
val ex = level.layout.mapTransform.keyToExtent(key)
key -> buildVectorTile(feats, layerName, ex, options.tileResolution, options.orderAreas)
key -> buildVectorTile(sleIter.map(_.feature).toIterable, layerName, ex, options.tileResolution, options.orderAreas).toBytes
}
case LayerNamesInColumn(layerNameCol) =>
assert(selectedGeometry.schema(layerNameCol).dataType == StringType,
s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}")

clipped
.rdd
.map { r => (getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol) -> pipeline.pack(r, zoom)) }
.groupByKey
.mapPartitions{ iter: Iterator[(SpatialKey, Iterable[(String, VectorTileFeature[Geometry])])] =>
iter.map{ case (key, groupedFeatures) => {
val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] =
groupedFeatures.groupBy(_._1).mapValues(_.map(_._2))
val ex = level.layout.mapTransform.keyToExtent(key)
key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas)
}}
}
.map { r => MultipleLayerEntry(getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol), pipeline.pack(r, zoom)) }
.groupByKey(_.key)
.mapGroups{ (key: SpatialKey, iter: Iterator[MultipleLayerEntry]) =>
val ex = level.layout.mapTransform.keyToExtent(key)
val layerFeatures = iter.toSeq.groupBy(_.layer).mapValues(_.map(_.feature))
key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas).toBytes
}
}
}

@@ -134,16 +142,30 @@ object VectorPipe {
} else {
df
}
val simplify = udf { g: jts.Geometry => pipeline.simplify(g, level.layout) }
val reduced = pipeline
.reduce(working, level, keyColumn)
val prepared = reduced
.withColumn(geomColumn, simplify(col(geomColumn)))
val vts = generateVectorTiles(prepared, level)

val reduced = pipeline.reduce match {
case None => working
case Some(reduce) => reduce(working, level, keyColumn)
}

val simplified = pipeline.simplify match {
case None => reduced
case Some(simplifier) =>
val simplify = udf { g: jts.Geometry => simplifier(g, level.layout) }
reduced.withColumn(geomColumn, simplify(col(geomColumn)))
}

val vts = generateVectorTiles(simplified, level)
saveVectorTiles(vts, zoom, pipeline.baseOutputURI)
prepared.withColumn(keyColumn, reduceKeys(col(keyColumn)))

simplified.withColumn(keyColumn, reduceKeys(col(keyColumn)))
}

}

private case class SingleLayerEntry(key: SpatialKey, feature: VectorTileFeature[Geometry])
private case class MultipleLayerEntry(key: SpatialKey, layer: String, feature: VectorTileFeature[Geometry])

private implicit def sleEncoder: Encoder[SingleLayerEntry] = Encoders.kryo[SingleLayerEntry]
private implicit def mleEncoder: Encoder[MultipleLayerEntry] = Encoders.kryo[MultipleLayerEntry]
}
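The tiling path above now stays in the Dataset API end to end: rows are wrapped in kryo-encoded SingleLayerEntry/MultipleLayerEntry records, grouped with groupByKey/mapGroups rather than dropping to RDDs, and generateVectorTiles hands back tiles as Array[Byte]. A standalone sketch of that grouping pattern (the Entry type and the string key are stand-ins, not repository types):

    import org.apache.spark.sql.{Dataset, SparkSession}

    // Hypothetical stand-ins for SingleLayerEntry and a packed tile payload.
    case class Entry(zxy: String, payload: Array[Byte])

    def combineByTile(spark: SparkSession, entries: Dataset[Entry]): Dataset[(String, Array[Byte])] = {
      import spark.implicits._                        // encoders for String and (String, Array[Byte])
      entries
        .groupByKey(_.zxy)                            // Dataset-native shuffle, no .rdd round-trip
        .mapGroups { (key, group) =>                  // one output record per tile key
          key -> group.map(_.payload).reduce(_ ++ _)  // concatenate payloads; the real code builds a vector tile
        }
    }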
22 changes: 22 additions & 0 deletions core/src/main/scala/vectorpipe/encoders/GTEncoders.scala
@@ -0,0 +1,22 @@
package vectorpipe.encoders

import geotrellis.vector._
import geotrellis.vectortile._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object GTEncoders {
implicit def gtGeometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
implicit def gtPointEncoder: Encoder[Point] = ExpressionEncoder()
implicit def gtMultiPointEncoder: Encoder[MultiPoint] = ExpressionEncoder()
implicit def gtLineEncoder: Encoder[Line] = ExpressionEncoder()
implicit def gtMultiLineEncoder: Encoder[MultiLine] = ExpressionEncoder()
implicit def gtPolygonEncoder: Encoder[Polygon] = ExpressionEncoder()
implicit def gtMultiPolygonEncoder: Encoder[MultiPolygon] = ExpressionEncoder()

implicit def gtFeatureEncoder[G <: Geometry, D](implicit ev1: Encoder[G], ev2: Encoder[D]): Encoder[Feature[G, D]] = Encoders.kryo[Feature[G, D]]

implicit def gtVectorTileEncoder: Encoder[VectorTile] = Encoders.kryo[VectorTile]
//implicit def gtLayerEncoder: Encoder[Layer] = Encoders.javaSerialization[Layer]
//implicit def gtStrictLayerEncoder: Encoder[StrictLayer] = Encoders.kryo[StrictLayer]
}
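These encoders let GeoTrellis geometry and vector-tile values ride inside Datasets; the kryo-backed ones serialize each value into a single binary column. A small usage sketch (assumed usage, not repository code):

    import geotrellis.vector._
    import org.apache.spark.sql.{Dataset, SparkSession}
    import vectorpipe.encoders.GTEncoders._

    def geometryDataset(spark: SparkSession): Dataset[Geometry] =
      // gtGeometryEncoder serializes each geometry with kryo into one binary column.
      spark.createDataset(Seq[Geometry](Point(0.0, 0.0), Point(1.0, 1.0)))(gtGeometryEncoder)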
@@ -182,7 +182,7 @@ package object internal {

// when an element has been deleted, it doesn't include any tags; use a window function to retrieve the last tags
// present and use those
history
frame
.where('type === "relation")
.repartition('id)
.select(
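A minimal sketch of the window-function pattern that comment describes ('id', 'version', and 'tags' are assumed OSM-history column names):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    def carryForwardTags(frame: DataFrame): DataFrame = {
      val byElement = Window.partitionBy(col("id")).orderBy(col("version"))
      // Deleted versions carry no tags, so pull the most recent non-null tags forward.
      frame.withColumn("tags", last(col("tags"), ignoreNulls = true).over(byElement))
    }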
File renamed without changes.
File renamed without changes.
@@ -97,7 +97,7 @@ trait Pipeline {
* Array[SpatialKey] giving the list of keys that the
* geometry interacts with
*/
def reduce(input: DataFrame, layoutLevel: LayoutLevel, keyColumn: String): DataFrame = input
val reduce: Option[(DataFrame, LayoutLevel, String) => DataFrame] = None

/*
* Lower complexity of geometry while moving to less resolute zoom levels.
@@ -111,7 +111,7 @@
* is a simplifier using JTS's topology-preserving simplifier available in
* [[vectorpipe.vectortile.Simplify]].
*/
def simplify(g: jts.Geometry, layout: LayoutDefinition): jts.Geometry = g
val simplify: Option[(jts.Geometry, LayoutDefinition) => jts.Geometry] = None

/**
* Select geometries for display at a given zoom level.
@@ -129,7 +129,7 @@
* String-typed column of the name contained in [[layerName]] to indicate
* which layer a geometry belongs in.
*/
def select(input: DataFrame, targetZoom: Int, keyColumn: String): DataFrame = input
val select: Option[(DataFrame, Int, String) => DataFrame] = None

/**
* Clip geometries prior to writing to vector tiles.
@@ -140,7 +140,7 @@
*
* Basic (non-no-op) clipping functions can be found in [[Clipping]].
*/
def clip(geom: jts.Geometry, key: SpatialKey, layoutLevel: LayoutLevel): jts.Geometry = geom
val clip: Option[(jts.Geometry, SpatialKey, LayoutLevel) => jts.Geometry] = None

/**
* Convert table rows to output features.
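With the hooks now Option-valued, a pipeline implementation supplies only the stages it needs, and a None hook is skipped entirely by VectorPipe.apply. A rough sketch of a pipeline written against the new hooks (not repository code: the layer name, the pack body, and the Clipping/Simplify helper names are assumptions):

    import java.net.URI
    import geotrellis.spark.SpatialKey
    import geotrellis.spark.tiling.{LayoutDefinition, LayoutLevel}
    import geotrellis.vector.Geometry
    import org.apache.spark.sql.Row
    import org.locationtech.jts.{geom => jts}
    import vectorpipe.vectortile._

    case class RoadPipeline(geometryColumn: String, baseOutputURI: URI) extends Pipeline {
      // Hypothetical single output layer.
      val layerMultiplicity = SingleLayer("roads")

      // Only the stages this pipeline needs are defined; reduce and select stay None and are skipped.
      override val clip: Option[(jts.Geometry, SpatialKey, LayoutLevel) => jts.Geometry] =
        Some((g, key, level) => Clipping.byLayoutCell(g, key, level))   // assumed helper name

      override val simplify: Option[(jts.Geometry, LayoutDefinition) => jts.Geometry] =
        Some((g, layout) => Simplify.withJTS(g, layout))                // assumed helper name

      // Project-specific conversion of a row into a vector tile feature.
      def pack(row: Row, zoom: Int): VectorTileFeature[Geometry] = ???
    }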
119 changes: 119 additions & 0 deletions core/src/main/scala/vectorpipe/vectortile/export/SaveToHadoop.scala
@@ -0,0 +1,119 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.spark.render._
import geotrellis.spark.SpatialKey

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql._

import scala.collection.concurrent.TrieMap

object SaveToHadoop {
/** Saves records from an iterator and returns them unchanged.
*
* @param recs Key, Value records to be saved
* @param keyToUri A function from K (a key) to Hadoop URI
* @param toBytes A function from record to array of bytes
* @param conf Hadoop Configuration used to get the FileSystem
*/
def saveIterator[K, V](
recs: Iterator[(K, V)],
keyToUri: K => String,
conf: Configuration
)(toBytes: (K, V) => Array[Byte]): Iterator[(K, V)] = {
val fsCache = TrieMap.empty[String, FileSystem]

for ( row @ (key, data) <- recs ) yield {
val path = keyToUri(key)
val uri = new URI(path)
val fs = fsCache.getOrElseUpdate(
uri.getScheme,
FileSystem.get(uri, conf))
val out = fs.create(new Path(path))
try { out.write(toBytes(key, data)) }
finally { out.close() }
row
}
}

/**
* Sets up saving to Hadoop, but returns a Dataset so that writes can
* be chained.
*
* @param keyToUri A function from K (a key) to a Hadoop URI
*/
def setup[K](
dataset: Dataset[(K, Array[Byte])],
keyToUri: K => String
)(implicit ev: Encoder[(K, Array[Byte])]): Dataset[(K, Array[Byte])] = {
import dataset.sparkSession.implicits._
dataset.mapPartitions { partition =>
saveIterator(partition, keyToUri, new Configuration){ (k, v) => v }
}
}

/**
* Sets up saving to Hadoop, but returns a Dataset so that writes can
* be chained.
*
* @param keyToUri A function from K (a key) to a Hadoop URI
* @param toBytes A function from record to array of bytes
*/
def setup[K, V](
dataset: Dataset[(K, V)],
keyToUri: K => String,
toBytes: (K, V) => Array[Byte]
)(implicit ev: Encoder[(K, V)]): Dataset[(K, V)] = {
import dataset.sparkSession.implicits._
val conf = dataset.sparkSession.sparkContext.hadoopConfiguration
dataset.mapPartitions { partition =>
saveIterator(partition, keyToUri, new Configuration)(toBytes)
}
}

/**
* Saves to the Hadoop FileSystem and returns a count of records saved.
*
* @param keyToUri A function from K (a key) to a Hadoop URI
*/
def apply[K](
dataset: Dataset[(K, Array[Byte])],
keyToUri: K => String
)(implicit ev: Encoder[(K, Array[Byte])]): Long = {
import dataset.sparkSession.implicits._
setup(dataset, keyToUri).count
}

/**
* Saves to the Hadoop FileSystem and returns a count of records saved.
*
* @param keyToUri A function from K (a key) to a Hadoop URI
* @param toBytes A function from record to array of bytes
*/
def apply[K, V](
dataset: Dataset[(K, V)],
keyToUri: K => String,
toBytes: (K, V) => Array[Byte]
)(implicit ev: Encoder[(K, V)]): Long = {
import dataset.sparkSession.implicits._
setup(dataset, keyToUri, toBytes).count
}
}
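A hypothetical usage sketch for the byte-payload case (the z/x/y path layout and the .mvt suffix are assumptions):

    import geotrellis.spark.SpatialKey
    import geotrellis.spark.io.hadoop.SaveToHadoop
    import org.apache.spark.sql.{Dataset, Encoder, Encoders}

    def writeTiles(tiles: Dataset[(SpatialKey, Array[Byte])], zoom: Int, base: String): Long = {
      // Kryo covers the SpatialKey half of the pair, which has no built-in encoder here.
      implicit val enc: Encoder[(SpatialKey, Array[Byte])] = Encoders.kryo[(SpatialKey, Array[Byte])]
      SaveToHadoop(tiles, (k: SpatialKey) => s"$base/$zoom/${k.col}/${k.row}.mvt")
    }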
