Functional Scala Caching
雎鳩
Part of the "do one thing and do it well" micro birds library series.
libraryDependencies += "us.oyanglul" %% "jujiu" % version
Quick Start in Scala 3 (new)
package us.oyanglul.jujiu
import us.oyanglul.jujiu.syntax.caffeine._
import us.oyanglul.jujiu.syntax.cache._
import scala.concurrent.ExecutionContext
import org.specs2.mutable.Specification
import cats.effect._
import com.github.benmanes.caffeine.cache
There are only two simple steps to use the cache:
- Instantiate a protocol-agnostic Cache DSL, which means the DSL is only aware of what to operate on, not how.
- Use the DSL syntax fetchF, parFetchAllF, etc. to describe how the program uses the cache, given an instance of Cache[Key, Val]. In the example we create a Caffeine instance, which tells the DSL exactly how to do the caching.
class JujiuScala3Spec extends Specification:
  given ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  "works with IO" >> {
    "normal cache" >> {
      val dsl: Cache[IO, cache.Cache, String, String] = new CaffeineCache[IO, String, String]{}
      def program(using cache.Cache[String, String]) = for
        _ <- IO(println("something"))
        _ <- dsl.putF("key1", "value1")
        r1 <- dsl.fetchF("key1")
        r2 <- dsl.fetchF("key2", _ => IO("value2"))
        r3 <- dsl.fetchAllF(List("key1", "key2"))
        r4 <- dsl.parFetchAllF[List, IO.Par](List("key1", "key2"))
        _ <- dsl.clearF("key1")
      yield (r1, r2, r3, r4)
      given cache.Cache[String, String] = Caffeine().sync[String, String]
      program.unsafeRunSync() must_== (
        (
          Some("value1"),
          "value2",
          List(Some("value1"), Some("value2")),
          List(Some("value1"), Some("value2"))
        )
      )
    }
  }
end JujiuScala3Spec
Caffeine
package us.oyanglul.jujiu
import us.oyanglul.jujiu.syntax.caffeine._
import us.oyanglul.jujiu.syntax.cache._
import cats.{Applicative}
import cats.data.Kleisli
import java.util.concurrent.CompletableFuture
import scala.concurrent.ExecutionContext
import org.specs2.mutable.Specification
import cats.instances.list._
import cats.syntax.all._
import cats.effect._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import com.github.benmanes.caffeine.cache
import us.oyanglul.jujiu.syntax.CaffeineSyntax
class JujiuSpec extends Specification with org.specs2.mock.Mockito {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
<<get_set_cache>>
<<async_load_failure>>
<<catsio_cache>>
<<catsio_loading_cache>>
<<caffeine_builder>>
<<tagless_final>>
<<readerT>>
<<redis>>
}
"it should able to get and set cache" >> {
  object cacheDsl extends CaffeineCache[IO, String, String] // <- (dsl)
  val program = for {
    r1 <- cacheDsl.fetch("not exist yet") // <- (fetch)
    r2 <- cacheDsl.fetch("not exist yet", _ => IO("default")) // <- (fetchOr)
    _ <- cacheDsl.put("not exist yet", "now exist") // <- (put)
    r3 <- cacheDsl.fetch("not exist yet")
    _ <- cacheDsl.clear("not exist yet")
    r4 <- cacheDsl.fetch("not exist yet")
  } yield (r1, r2, r3, r4)
  program(Caffeine().sync) // <- (run)
    .unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
}
"it should IO error when async load failure" >> {
object dsl extends CaffeineAsyncCache[IO, String, String] {
implicit val executionContext = global
}
val program = for {
r1 <- dsl.fetch("not exist yet")
r2 <- dsl.fetch("not exist yet", _ => IO("default"))
} yield (r1, r2)
val failCache = mock[cache.AsyncCache[String, String]]
failCache.getIfPresent("not exist yet") returns CompletableFuture.supplyAsync(() => IO.raiseError[String](new Exception("cache load error")).unsafeRunSync())
program(
failCache
).unsafeRunSync() must throwA[Exception](message = "cache load error")
}
This README is a literate programming file: all the code here is used to generate the test file.
Let me walk you through it line by line:
- line-dsl creates an instance of CaffeineCache whose side effect is IO, whose key is String, and whose value is String as well.
- line-fetch won't actually trigger any effect. It just returns a DSL, represented as the type Kleisli[IO, Cache, Option[String]], which in English means "give me a Cache and I can provide you an IO[Option[String]]".
- line-fetchOr is another fetch DSL. Its second parameter is a function K => IO[V]: if the key is not in the cache, it runs the function, puts the result into the cache, and returns the value.
- line-put updates the value of the key "not exist yet" to "now exist".
- line-run uses the Scala-idiomatic syntax to build a synchronous Caffeine cache. Recall that program is actually a Kleisli from Cache to IO, so when I provide it a Cache via program(Caffeine().sync), it returns an IO. Calling .unsafeRunSync() on that IO triggers all the effects described in program, and you get the actual result.
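To make the "describe now, run later" idea concrete, here is a minimal sketch that uses only the Scala standard library. It is not Jujiu's implementation: CacheProgram, ToyDsl, and the mutable.Map standing in for the cache are hypothetical names chosen for illustration, and the real library wraps results in IO via Kleisli rather than returning plain values.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kleisli[IO, Cache, A]:
// a description of work that needs a cache before it can run.
final case class CacheProgram[A](run: mutable.Map[String, String] => A) {
  def map[B](f: A => B): CacheProgram[B] =
    CacheProgram(cache => f(run(cache)))
  def flatMap[B](f: A => CacheProgram[B]): CacheProgram[B] =
    CacheProgram(cache => f(run(cache)).run(cache))
}

// Hypothetical DSL: each operation only builds a description.
object ToyDsl {
  def fetch(k: String): CacheProgram[Option[String]] =
    CacheProgram(cache => cache.get(k))
  def put(k: String, v: String): CacheProgram[Unit] =
    CacheProgram(cache => cache.update(k, v))
  def clear(k: String): CacheProgram[Unit] =
    CacheProgram { cache => cache.remove(k); () }
}

object KleisliSketch {
  import ToyDsl._

  // Nothing touches a cache here; this is just a value.
  val program: CacheProgram[(Option[String], Option[String], Option[String])] =
    for {
      r1 <- fetch("k")
      _  <- put("k", "v")
      r2 <- fetch("k")
      _  <- clear("k")
      r3 <- fetch("k")
    } yield (r1, r2, r3)

  def main(args: Array[String]): Unit = {
    // Only when a concrete cache is supplied does anything happen.
    val result = program.run(mutable.Map.empty[String, String])
    println(result) // (None,Some(v),None)
  }
}
```

Because program is an ordinary value, the same description can be run against different cache instances, which is the property the Kleisli-based DSL relies on.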
Works with Cats IO
Jujiu has a very flexible DSL. If you don't like Kleisli, it works with IO directly as well (technically, your effect type just needs to be an Async).
All you need is to import some syntax:
import us.oyanglul.jujiu.syntax.cache._
"works with IO" >> {
"normal cache" >> {
val c: Cache[IO, cache.Cache, String, String] = new CaffeineCache[IO, String, String] {}
implicit val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]
def program =
for {
_ <- IO(println("something"))
_ <- c.putF("key1", "value1")
r1 <- c.fetchF("key1")
r2 <- c.fetchF("key2", _ => IO("value2"))
r3 <- c.fetchAllF(List("key1", "key2"))
r4 <- c.parFetchAllF[List, IO.Par](List("key1", "key2"))
_ <- c.clearF("key1")
} yield (r1, r2, r3, r4)
program.unsafeRunSync() must_== (
(
Some("value1"),
"value2",
List(Some("value1"), Some("value2")),
List(Some("value1"), Some("value2"))
)
)
}
Then provide cacheProvider implicitly: since you are not using Kleisli, you need to tell these DSLs which cache they will run on.
"loading cache" >> {
val c: LoadingCache[IO, cache.LoadingCache, String, String] = new CaffeineLoadingCache[IO, String, String] {}
implicit val cacheProvider: cache.LoadingCache[String, String] = Caffeine().sync(identity)
def program =
for {
_ <- IO(println("something"))
r1 <- c.fetchF("1")
r2 <- c.fetchAllF(List("2", "3"))
r3 <- c.parFetchAllF[List, IO.Par](List("4", "5"))
} yield (r1, r2, r3)
program.unsafeRunSync() must_== (("1", List("2", "3"), List("4", "5")))
}
}
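The implicit cache provider works much like an ExecutionContext: just as you provide the context that threads run on, you provide the cache that the DSL runs on.
All DSL methods in this style are suffixed with F.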
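The implicit-provider style can be illustrated with a small standard-library sketch. The names below (Store, putF, fetchF, fetchOrLoadF) are hypothetical and only mimic the shape of the F-suffixed methods; Jujiu's real methods return values in an effect type such as IO and take a Caffeine cache as the implicit.

```scala
import scala.collection.mutable

object ImplicitCacheSketch {
  // Hypothetical cache type; Jujiu would use a Caffeine cache here.
  type Store = mutable.Map[String, String]

  // The store arrives implicitly instead of being passed explicitly,
  // much like an ExecutionContext is passed to Future operations.
  def putF(k: String, v: String)(implicit store: Store): Unit =
    store.update(k, v)

  def fetchF(k: String)(implicit store: Store): Option[String] =
    store.get(k)

  // On a miss, run the loader, remember its result, and return it.
  def fetchOrLoadF(k: String)(load: String => String)(implicit store: Store): String =
    store.getOrElseUpdate(k, load(k))

  def demo(): (Option[String], String, Option[String]) = {
    // Declared once; every call below picks it up automatically.
    implicit val cacheProvider: Store = mutable.Map.empty[String, String]
    putF("key1", "value1")
    val r1 = fetchF("key1")
    val r2 = fetchOrLoadF("key2")(_ => "value2")
    val r3 = fetchF("key2")
    (r1, r2, r3)
  }
}
```

Swapping the implicit value for another store changes where the operations run without touching the program itself.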
Idiomatic Syntax for Caffeine Builder
Dealing with a Java DSL and Java Futures is too verbose and painful in a Scala project.
Let's see how Jujiu makes Caffeine friendly to Cats IO as well.
A good example is the async loading cache.
First you will need the Caffeine builder syntax:
import us.oyanglul.jujiu.syntax.caffeine._
"it should able to get and set async loading cache" >> {
object cache extends CaffeineAsyncLoadingCache[IO, Integer, String] {
implicit val executionContext = global // <-- (executionContext)
}
val program = for {
r1 <- cache.fetch(1)
r2 <- cache.fetch(2)
r3 <- cache.fetchAll(List[Integer](1, 2, 3))
} yield (r1, r2, r3)
val caffeineA: com.github.benmanes.caffeine.cache.AsyncLoadingCache[Integer, String] = Caffeine()
.executionContext(global) // <-- (global)
.withExpire( // <-- (expire)
(_: Integer, _: String) => 1.second,
(_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration,
(_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration
)
.async((key: Integer) => IO("async string" + key)) // <-- (async)
val caffeineB = Caffeine()
.withExpireAfterAccess(1.second)
.withExpireAfterWrite(2.seconds)
.withRefreshAfterWrite(3.seconds)
.async((key: Integer) => IO("async string" + key))
val expected = (
"async string1",
"async string2",
List("async string1", "async string2", "async string3")
)
program(caffeineA).unsafeRunSync() must_== expected
program(caffeineB).unsafeRunSync() must_== expected
program(Caffeine().async(_ => IO.raiseError(new Exception("something wrong"))))
.unsafeRunSync() must throwA[Exception]
}
- line-executionContext: the async loading cache needs an ExecutionContext to execute the Java Future.
- line-global: .executionContext(global) makes sure the cache uses the Scala execution context to execute the Java Future; otherwise it defaults to the Java fork-join pool. Alternatively, you can use Akka's execution context.
- line-expire defines the expiry policy, here with more Scala-idiomatic lambdas and Duration.
- line-async creates an async loading cache. The async loading function it uses is K => IO[V], so you don't need to deal with awkward Java Futures.
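For a sense of the plumbing this hides, the sketch below bridges a Java CompletableFuture into a Scala Future by hand. It assumes Scala 2.13 or later (for scala.jdk.FutureConverters) and is not how Jujiu itself is implemented; javaLoad and load are hypothetical names.

```scala
import java.util.concurrent.CompletableFuture
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.jdk.FutureConverters._

object JavaFutureBridgeSketch {
  // Hypothetical Java-style async loader, similar in shape to what
  // a Java caching API hands back.
  def javaLoad(key: Int): CompletableFuture[String] =
    CompletableFuture.supplyAsync(() => "async string" + key)

  // Convert once at the boundary so callers only see a Scala Future.
  def load(key: Int): Future[String] =
    javaLoad(key).asScala

  def main(args: Array[String]): Unit =
    println(Await.result(load(1), 5.seconds)) // async string1
}
```

With the builder syntax shown earlier, this conversion does not have to be written in user code, since the loader is expressed directly as K => IO[V].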
Works with Tagless Final
No matter what style of effect abstraction your project uses, Jujiu can easily fit in,
e.g. Tagless Final:
"works with tagless final" >> {
trait LogDsl[F[_]] {
def log(msg: String): F[Unit]
}
type ProgramDsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]
def program[F[_]: Async](dsl: ProgramDsl[F])
(implicit ev: cache.Cache[String, String]): F[Option[String]] =
for {
value <- dsl.fetchF("key")
_ <- dsl.log("something")
} yield value
{
object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO] {
def log(msg: String) = IO(println(msg))
}
implicit val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]
program[IO](dsl).unsafeRunSync() must_== None
}
}
Just extend CaffeineCache[F, K, V]
and provide a cacheProvider.
ReaderT Pattern
If your code follows the ReaderT pattern, good: it will fit in even more naturally.
"works with tagless final style readerT" >> {
// Layer 1: Environment
trait HasLogger {
def logger: String => Unit
}
trait HasCacheProvider {
def cacheProvider: cache.Cache[String, String]
}
type Env = HasLogger with HasCacheProvider
// Layer 2: DSL
trait LogDsl[F[_]] {
def log(msg: String)(implicit M: Applicative[F]): Kleisli[F, Env, Unit] = Kleisli(a => M.pure(a.logger(msg)))
}
type Dsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]
// Layer 3: Business
def program[F[_]](dsl: Dsl[F])(
implicit ev: Async[F]
) =
for {
_ <- dsl.log("something")
value <- dsl.fetch("key").local[Env](_.cacheProvider)
} yield value
object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO]
program[IO](dsl)
.run(new HasLogger with HasCacheProvider {
def logger = println
def cacheProvider = Caffeine().sync
})
.unsafeRunSync() must_== None
}
Note that .local[Env](_.cacheProvider) is needed to adapt the Kleisli contravariantly, from one that reads the cache to one that reads the whole Env.
Extensible
It's extensible by design thanks to Kleisli: if you supply another cache provider, the same DSL will work.
"run on redis" >> {
import redis.clients.jedis._
def program[F[_]: Async, S[_, _]](dsl: Cache[F, S, String, String]) = for {
r1 <- dsl.fetch("not exist yet")
r2 <- dsl.fetch("not exist yet", _ => Async[F].delay("default"))
_ <- dsl.put("not exist yet", "now exist")
r3 <- dsl.fetch("not exist yet")
_ <- dsl.clear("not exist yet")
r4 <- dsl.fetch("not exist yet")
} yield (r1, r2, r3, r4)
type J[A, B] = Jedis
object dsl extends Cache[IO, J, String, String] {
def put(k: String, v: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
Kleisli { redis =>
M.delay{
redis.set(k, v)
()
}
}
def fetch(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Option[String]] =
Kleisli(redis => M.delay(Option(redis.get(k))))
def clear(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
Kleisli(redis => M.delay{
redis.del(k)
()
})
}
program(dsl).run(
new Jedis("localhost")
).unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
}.pendingUntilFixed("Redis")