Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break up into subprojects, including examples #93

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Better handling of falsy boolean values in tag UDFs
- Adds `riverbank`, `stream_end`, `dam`, `weir`, `waterfall`, and `pressurised`
to the list of waterway features
- Reorganized repo to put project code into `core` subproject; introduced `examples` subproject

### Fixed

Expand All @@ -35,4 +36,3 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Mark all logger vals and some UDF vals as @transient lazy to avoid Spark serialization issues
- Properly strip leading and trailing slashes from S3 URIs when exporting vector tiles

15 changes: 13 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,22 @@ val vpExtraSettings = Seq(
// micrositeBaseUrl := "/vectorpipe"
// micrositeDocumentationUrl := "/vectorpipe/latest/api/#vectorpipe.package" /* Location of Scaladocs */

// Aggregate root at the repo top level: running tasks here fans out to the
// `core` (vectorpipe) and `examples` subprojects.
lazy val root = project
.in(file("."))
.aggregate(vectorpipe, examples)
.settings(commonSettings, vpExtraSettings)

/* Main project */
lazy val vectorpipe = project
.in(file("."))
.in(file("core"))
.settings(commonSettings, vpExtraSettings, release)

/* Example projects */
// Example code lives in its own subproject so it is excluded from the
// published `vectorpipe` artifact; it compiles against `core` via dependsOn.
lazy val examples = project
.in(file("examples"))
.settings(commonSettings, vpExtraSettings)
.dependsOn(vectorpipe)

/* Benchmarking suite.
* Benchmarks can be executed by first switching to the `bench` project and then by running:
jmh:run -t 1 -f 1 -wi 5 -i 5 .*Bench.*
Expand All @@ -162,7 +173,7 @@ lazy val bench = project
.dependsOn(vectorpipe)
.enablePlugins(JmhPlugin)


onLoad in Global ~= (_ andThen ("project vectorpipe" :: _))


// assemblyShadeRules in assembly := {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.locationtech.jts.{geom => jts}

import scala.reflect.ClassTag

object VectorPipe {

/** Vectortile conversion options.
Expand Down Expand Up @@ -46,7 +48,10 @@ object VectorPipe {
def forAllZoomsWithSrcProjection(zoom: Int, crs: CRS) = Options(zoom, Some(0), crs, None)
}

def apply(input: DataFrame, pipeline: vectortile.Pipeline, options: Options): Unit = {
def apply[T: ClassTag](input: DataFrame, pipeline: vectortile.Pipeline, options: Options): Unit = {
import input.sparkSession.implicits._
import vectorpipe.encoders._

val geomColumn = pipeline.geometryColumn
assert(input.columns.contains(geomColumn) &&
input.schema(geomColumn).dataType.isInstanceOf[org.apache.spark.sql.jts.AbstractGeometryUDT[jts.Geometry]],
Expand Down Expand Up @@ -74,46 +79,49 @@ object VectorPipe {
SpatialKey(k.col / 2, k.row / 2) }.toSeq
}

def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): RDD[(SpatialKey, VectorTile)] = {
def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): Dataset[(SpatialKey, Array[Byte])] = {
val zoom = level.zoom
val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) =>
val k = getSpatialKey(key)
pipeline.clip(g, k, level)
}

val selectedGeometry = pipeline
.select(df, zoom, keyColumn)
val selectedGeometry = pipeline.select match {
case None => df
case Some(select) => select(df, zoom, keyColumn)
}

val clipped = selectedGeometry
val keyed = selectedGeometry
.withColumn(keyColumn, explode(col(keyColumn)))
.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))

val clipped = pipeline.clip match {
case None => keyed
case Some(clipper) =>
val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) =>
val k = getSpatialKey(key)
clipper(g, k, level)
}
val toClip = keyed.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
toClip.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
}

pipeline.layerMultiplicity match {
case SingleLayer(layerName) =>
clipped
.rdd
.map { r => (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
.groupByKey
.map { case (key, feats) =>
.map { r => SingleLayerEntry(getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
.groupByKey(_.key)
.mapGroups { (key: SpatialKey, sleIter: Iterator[SingleLayerEntry]) =>
val ex = level.layout.mapTransform.keyToExtent(key)
key -> buildVectorTile(feats, layerName, ex, options.tileResolution, options.orderAreas)
key -> buildVectorTile(sleIter.map(_.feature).toIterable, layerName, ex, options.tileResolution, options.orderAreas).toBytes
}
case LayerNamesInColumn(layerNameCol) =>
assert(selectedGeometry.schema(layerNameCol).dataType == StringType,
s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}")

clipped
.rdd
.map { r => (getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol) -> pipeline.pack(r, zoom)) }
.groupByKey
.mapPartitions{ iter: Iterator[(SpatialKey, Iterable[(String, VectorTileFeature[Geometry])])] =>
iter.map{ case (key, groupedFeatures) => {
val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] =
groupedFeatures.groupBy(_._1).mapValues(_.map(_._2))
val ex = level.layout.mapTransform.keyToExtent(key)
key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas)
}}
}
.map { r => MultipleLayerEntry(getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol), pipeline.pack(r, zoom)) }
.groupByKey(_.key)
.mapGroups{ (key: SpatialKey, iter: Iterator[MultipleLayerEntry]) =>
val ex = level.layout.mapTransform.keyToExtent(key)
val layerFeatures = iter.toSeq.groupBy(_.layer).mapValues(_.map(_.feature))
key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas).toBytes
}
}
}

Expand All @@ -134,16 +142,30 @@ object VectorPipe {
} else {
df
}
val simplify = udf { g: jts.Geometry => pipeline.simplify(g, level.layout) }
val reduced = pipeline
.reduce(working, level, keyColumn)
val prepared = reduced
.withColumn(geomColumn, simplify(col(geomColumn)))
val vts = generateVectorTiles(prepared, level)

val reduced = pipeline.reduce match {
case None => working
case Some(reduce) => reduce(working, level, keyColumn)
}

val simplified = pipeline.simplify match {
case None => reduced
case Some(simplifier) =>
val simplify = udf { g: jts.Geometry => simplifier(g, level.layout) }
reduced.withColumn(geomColumn, simplify(col(geomColumn)))
}

val vts = generateVectorTiles(simplified, level)
saveVectorTiles(vts, zoom, pipeline.baseOutputURI)
prepared.withColumn(keyColumn, reduceKeys(col(keyColumn)))

simplified.withColumn(keyColumn, reduceKeys(col(keyColumn)))
}

}

// Keyed intermediate records used when grouping packed features by SpatialKey
// prior to building vector tiles (single-layer vs. per-row layer-name cases).
private case class SingleLayerEntry(key: SpatialKey, feature: VectorTileFeature[Geometry])
private case class MultipleLayerEntry(key: SpatialKey, layer: String, feature: VectorTileFeature[Geometry])

// VectorTileFeature is not a Spark product type, so structural (Expression)
// encoding is unavailable; fall back to kryo binary encoders for Dataset ops.
private implicit def sleEncoder: Encoder[SingleLayerEntry] = Encoders.kryo[SingleLayerEntry]
private implicit def mleEncoder: Encoder[MultipleLayerEntry] = Encoders.kryo[MultipleLayerEntry]
}
22 changes: 22 additions & 0 deletions core/src/main/scala/vectorpipe/encoders/GTEncoders.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package vectorpipe.encoders

import geotrellis.vector._
import geotrellis.vectortile._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

/** Spark SQL [[Encoder]] instances for GeoTrellis vector and vectortile types.
 *
 *  Concrete geometry types (Point, Line, Polygon, ...) get
 *  [[ExpressionEncoder]]s; abstract or non-product types (Geometry, Feature,
 *  VectorTile) fall back to opaque kryo binary encoding.
 */
object GTEncoders {
// Geometry is abstract, so a structural encoder cannot be derived — use kryo.
implicit def gtGeometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
implicit def gtPointEncoder: Encoder[Point] = ExpressionEncoder()
implicit def gtMultiPointEncoder: Encoder[MultiPoint] = ExpressionEncoder()
implicit def gtLineEncoder: Encoder[Line] = ExpressionEncoder()
implicit def gtMultiLineEncoder: Encoder[MultiLine] = ExpressionEncoder()
implicit def gtPolygonEncoder: Encoder[Polygon] = ExpressionEncoder()
implicit def gtMultiPolygonEncoder: Encoder[MultiPolygon] = ExpressionEncoder()

// Feature pairs a geometry with arbitrary data D; kryo avoids needing a
// structural encoder for every (G, D) combination.
implicit def gtFeatureEncoder[G <: Geometry, D](implicit ev1: Encoder[G], ev2: Encoder[D]): Encoder[Feature[G, D]] = Encoders.kryo[Feature[G, D]]

implicit def gtVectorTileEncoder: Encoder[VectorTile] = Encoders.kryo[VectorTile]
//implicit def gtLayerEncoder: Encoder[Layer] = Encoders.javaSerialization[Layer]
//implicit def gtStrictLayerEncoder: Encoder[StrictLayer] = Encoders.kryo[StrictLayer]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, Row}
import vectorpipe.internal.{NodeType, WayType, RelationType}
import vectorpipe.model.Member
import vectorpipe.util._

import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -217,6 +219,19 @@ package object osm {

@transient lazy val compressMemberTypes: UserDefinedFunction = udf(_compressMemberTypes, MemberSchema)

// Inverse of `_compressMemberTypes`: expands byte-encoded relation-member
// "type" values back into their string names ("node"/"way"/"relation"),
// preserving the `ref` and `role` fields.
// NOTE(review): the match has no default case, so a byte outside
// NodeType/WayType/RelationType throws a MatchError — assumes input rows were
// produced by `_compressMemberTypes`; confirm at call sites.
private val _uncompressMemberTypes = (members: Seq[Row]) =>
members.map { row =>
val t = row.getAs[Byte]("type") match {
case NodeType => "node"
case WayType => "way"
case RelationType => "relation"
}
val ref = row.getAs[Long]("ref")
val role = row.getAs[String]("role")

Row(t, ref, role)
}

/**
* Checks if members have byte-encoded types
*/
Expand Down Expand Up @@ -278,6 +293,7 @@ package object osm {

def isBuilding(tags: Column): Column =
!lower(coalesce(tags.getItem("building"), lit("no"))).isin(FalsyValues: _*) as 'isBuilding
//!array_contains(splitDelimitedValues(tags.getItem("building")), "no") as 'isBuilding

@transient lazy val isPOI: UserDefinedFunction = udf {
tags: Map[String, String] => POITags.intersect(tags.keySet).nonEmpty
Expand All @@ -292,8 +308,12 @@ package object osm {
def isWaterway(tags: Column): Column =
array_intersects(splitDelimitedValues(tags.getItem("waterway")), lit(WaterwayValues.toArray)) as 'isWaterway

def mergeTags: UserDefinedFunction = udf {
(_: Map[String, String]) ++ (_: Map[String, String])
def mergeTags: UserDefinedFunction = udf { (a: Map[String, String], b: Map[String, String]) =>
mergeMaps(a.mapValues(Set(_)), b.mapValues(Set(_)))(_ ++ _).mapValues(_.mkString(";"))
}

val reduceTags: UserDefinedFunction = udf { tags: Iterable[Map[String, String]] =>
tags.map(x => x.mapValues(Set(_))).reduce((a, b) => mergeMaps(a, b)(_ ++ _)).mapValues(_.mkString(";"))
}

val array_intersects: UserDefinedFunction = udf { (a: Seq[_], b: Seq[_]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ package object functions {
private val _mergeCounts = (a: Map[String, Int], b: Map[String, Int]) =>
mergeMaps(Option(a).getOrElse(Map.empty[String, Int]),
Option(b).getOrElse(Map.empty[String, Int]))(_ + _)

// UDF: true when the two input sequences share at least one element.
val array_intersects: UserDefinedFunction = udf { (a: Seq[_], b: Seq[_]) =>
a.intersect(b).nonEmpty}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ package object internal {
} else {
@transient val idByVersion = Window.partitionBy('id).orderBy('version)

// when a node has been deleted, it doesn't include any tags; use a window function to retrieve the last tags
// present and use those
// when a node has been deleted, it doesn't include any tags or nds; use a window function to retrieve the last
// tags and nds present and use those
history
.where('type === "way")
.repartition('id)
Expand All @@ -142,7 +142,8 @@ package object internal {
when(!'visible and (lag('tags, 1) over idByVersion).isNotNull,
lag('tags, 1) over idByVersion)
.otherwise('tags) as 'tags,
$"nds.ref" as 'nds,
when(!'visible, lag($"nds.ref", 1) over idByVersion)
.otherwise($"nds.ref") as 'nds,
'changeset,
'timestamp,
(lead('timestamp, 1) over idByVersion) as 'validUntil,
Expand Down Expand Up @@ -180,17 +181,18 @@ package object internal {
history.withColumn("members", compressMemberTypes('members))
}

// when an element has been deleted, it doesn't include any tags; use a window function to retrieve the last tags
// present and use those
history
// when an element has been deleted, it doesn't include any tags or members; use a window function to retrieve
// the last tags and members present and use those
frame
.where('type === "relation")
.repartition('id)
.select(
'id,
when(!'visible and (lag('tags, 1) over idByUpdated).isNotNull,
lag('tags, 1) over idByUpdated)
.otherwise('tags) as 'tags,
'members,
when(!'visible, lag('members, 1) over idByUpdated)
.otherwise('members) as 'members,
'changeset,
'timestamp,
(lead('timestamp, 1) over idByUpdated) as 'validUntil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ trait Pipeline {
* Array[SpatialKey] giving the list of keys that the
* geometry interacts with
*/
def reduce(input: DataFrame, layoutLevel: LayoutLevel, keyColumn: String): DataFrame = input
val reduce: Option[(DataFrame, LayoutLevel, String) => DataFrame] = None

/*
* Lower complexity of geometry while moving to less resolute zoom levels.
Expand All @@ -111,7 +111,7 @@ trait Pipeline {
* is a simplifier using JTS's topology-preserving simplifier available in
* [[vectorpipe.vectortile.Simplify]].
*/
def simplify(g: jts.Geometry, layout: LayoutDefinition): jts.Geometry = g
val simplify: Option[(jts.Geometry, LayoutDefinition) => jts.Geometry] = None

/**
* Select geometries for display at a given zoom level.
Expand All @@ -129,7 +129,7 @@ trait Pipeline {
* String-typed column of the name contained in [[layerName]] to indicate
* which layer a geometry belongs in.
*/
def select(input: DataFrame, targetZoom: Int, keyColumn: String): DataFrame = input
val select: Option[(DataFrame, Int, String) => DataFrame] = None

/**
* Clip geometries prior to writing to vector tiles.
Expand All @@ -140,7 +140,7 @@ trait Pipeline {
*
* Basic (non-no-op) clipping functions can be found in [[Clipping]].
*/
def clip(geom: jts.Geometry, key: SpatialKey, layoutLevel: LayoutLevel): jts.Geometry = geom
val clip: Option[(jts.Geometry, SpatialKey, LayoutLevel) => jts.Geometry] = None

/**
* Convert table rows to output features.
Expand Down
Loading