Skip to content

Commit

Permalink
Add invalidation for the schema-list cache (close #215)
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp committed Nov 23, 2022
1 parent f667a7b commit ddf2016
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,36 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
* Get list of available schemas for particular vendor and name part
* Server supposed to return them in proper order
*/
def listSchemasResult(vendor: Vendor, name: Name, model: Model)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, SchemaListLookupResult]] =
listSchemasResult(vendor, name, model, None)

/**
* Vendor, name, model are extracted from supplied schema key to call on the `listSchemas`. The important difference
* from `listSchemas` is that it would invalidate cache, if returned list did not contain SchemaKey supplied in
* argument. Making it a safer option is latest schema bound is known.
*/
def listSchemasLikeResult(schemaKey: SchemaKey)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, SchemaListLookupResult]] =
listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some(schemaKey))

/**
* Get list of available schemas for particular vendor and name part
* Has an extra argument `mustIncludeKey` which is used to invalidate cache if SchemaKey supplied in it is not in the
* list.
* Server supposed to return them in proper order
*/
def listSchemasResult(
vendor: Vendor,
name: Name,
model: Model
model: Model,
mustIncludeKey: Option[SchemaKey] = None
)(implicit
F: Monad[F],
L: RegistryLookup[F],
Expand Down Expand Up @@ -140,7 +166,11 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac

getSchemaListFromCache(vendor, name, model).flatMap {
case Some(TimestampedItem(Right(schemaList), timestamp)) =>
Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp)))
if (mustIncludeKey.forall(schemaList.schemas.contains))
Monad[F].pure(Right(ResolverResult.Cached((vendor, name, model), schemaList, timestamp)))
else
traverseRepos[F, SchemaList](get, prioritize(vendor, allRepos.toList), Map.empty)
.flatMap(handleAfterFetch)
case Some(TimestampedItem(Left(failures), _)) =>
retryCached[F, SchemaList](get, vendor)(failures)
.flatMap(handleAfterFetch)
Expand All @@ -165,6 +195,19 @@ final case class Resolver[F[_]](repos: List[Registry], cache: Option[ResolverCac
): F[Either[ResolutionError, SchemaList]] =
listSchemasResult(vendor, name, model).map(_.map(_.value))

/**
* Vendor, name, model are extracted from supplied schema key to call on the `listSchemas`. The important difference
* from `listSchemas` is that it would invalidate cache, if returned list did not contain SchemaKey supplied in
* argument. Making it a safer option is latest schema bound is known.
*/
def listSchemasLike(schemaKey: SchemaKey)(implicit
F: Monad[F],
L: RegistryLookup[F],
C: Clock[F]
): F[Either[ResolutionError, SchemaList]] =
listSchemasResult(schemaKey.vendor, schemaKey.name, schemaKey.version.model, Some(schemaKey))
.map(_.map(_.value))

/** Get list of full self-describing schemas available on Iglu Server for particular vendor/name pair */
def fetchSchemas(
vendor: Vendor,
Expand Down Expand Up @@ -365,6 +408,7 @@ object Resolver {

result.value
}

def parseConfig(
config: Json
): Either[DecodingFailure, ResolverConfig] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
*/
package com.snowplowanalytics.iglu.client.resolver

import java.net.URI
import com.snowplowanalytics.iglu.core.SchemaList

import java.time.Instant
import java.net.URI
import scala.collection.immutable.SortedMap
import scala.concurrent.duration.DurationInt

// Cats
import cats.Id
import cats.effect.IO
import cats.implicits._
import cats.syntax.all._

// circe
import io.circe.Json
Expand Down Expand Up @@ -69,6 +71,7 @@ class ResolverSpec extends Specification with CatsEffect {
a Resolver should accumulate errors from all repositories $e8
we can construct a Resolver from a valid resolver 1-0-2 configuration JSON $e10
a Resolver should cache SchemaLists with different models separately $e11
a Resolver should use schemaKey provided in SchemaListLike for result validation $e12
"""

import ResolverSpec._
Expand Down Expand Up @@ -404,4 +407,49 @@ class ResolverSpec extends Specification with CatsEffect {
case _ => ko("Unexpected result for two consequent listSchemas")
}
}

def e12 = {
val IgluCentralServer = Registry.Http(
Registry.Config("Iglu Central EU1", 0, List("com.snowplowanalytics")),
Registry
.HttpConnection(URI.create("https://com-iglucentral-eu1-prod.iglu.snplow.net/api"), None)
)

val schema100 = SchemaKey(
"com.snowplowanalytics.snowplow",
"link_click",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
val schema101 = SchemaKey(
"com.snowplowanalytics.snowplow",
"link_click",
"jsonschema",
SchemaVer.Full(1, 0, 1)
)

val resolverRef = Resolver.init[Id](10, None, IgluCentralServer)
val resolver = resolverRef.map(res =>
new Resolver(
res.repos,
res.cache.flatMap { c =>
c.putSchemaList(
"com.snowplowanalytics.snowplow",
"link_click",
1,
SchemaList(List(schema100)).asRight
)
c.some
}
)
)

val resultOne = resolver.listSchemasLike(schema100)
val resultTwo = resolver.listSchemasLike(schema101)

resultOne must beRight(SchemaList(List(schema100)))
resultTwo.map(s => s.copy(schemas = s.schemas.take(2))) must beRight(
SchemaList(List(schema100, schema101))
)
}
}

0 comments on commit ddf2016

Please sign in to comment.