HOME | EDIT | RSS | INDEX | ABOUT | GITHUB

Functional Scala Caching

雎鳩ju jiu

Functional Scala Caching

badge.svg latest.svg?v=1 jujiu_2.13.svg?label=document badge.svg

Do one thing and do it well — part of the micro birds library series

// Replace "x.y.z" with the latest version shown on the badge above.
// Named jujiuVersion (not `version`) so it does not shadow sbt's own `version` key.
val jujiuVersion = "x.y.z"

libraryDependencies += "us.oyanglul" %% "jujiu" % jujiuVersion

Quick Start in Scala 3 (new)

package us.oyanglul.jujiu

import us.oyanglul.jujiu.syntax.caffeine._
import us.oyanglul.jujiu.syntax.cache._
import scala.concurrent.ExecutionContext
import org.specs2.mutable.Specification
import cats.effect._
import com.github.benmanes.caffeine.cache

There are only three simple steps to use the cache:

  1. Create a protocol-agnostic Cache DSL: the DSL is only aware of *what* operations to perform, not *how* to perform them.
  2. Use the DSL syntax (fetchF, parFetchAllF, etc.) to describe how the program uses the cache.
  3. Provide a given instance of the underlying cache (cache.Cache[Key, Val]); in the example we create a Caffeine instance, which determines exactly how caching is actually done.

    class JujiuScala3Spec extends Specification:
      // ContextShift for IO; presumably what the IO.Par parallel fetch below relies on.
      given ContextShift[IO] = IO.contextShift(ExecutionContext.global)

      "works with IO" >> {
        "normal cache" >> {
          // Step 1: a protocol-agnostic DSL: it knows *what* to do, not *how*.
          val cacheDsl: Cache[IO, cache.Cache, String, String] = new CaffeineCache[IO, String, String]{}

          // Step 2: describe the program with the F-suffixed syntax; the concrete
          // cache arrives through the `using` clause at the call site.
          def program(using cache.Cache[String, String]) =
            for
              _       <- IO(println("something"))
              _       <- cacheDsl.putF("key1", "value1")
              stored  <- cacheDsl.fetchF("key1")
              loaded  <- cacheDsl.fetchF("key2", _ => IO("value2"))
              both    <- cacheDsl.fetchAllF(List("key1", "key2"))
              bothPar <- cacheDsl.parFetchAllF[List, IO.Par](List("key1", "key2"))
              _       <- cacheDsl.clearF("key1")
            yield (stored, loaded, both, bothPar)

          // Step 3: the Caffeine instance decides how caching is actually done.
          given cache.Cache[String, String] = Caffeine().sync[String, String]

          val expected = (
            Some("value1"),
            "value2",
            List(Some("value1"), Some("value2")),
            List(Some("value1"), Some("value2"))
          )
          program.unsafeRunSync() must_== expected
        }
      }
    end JujiuScala3Spec
    

cats-badge-tiny.png Caffeine

package us.oyanglul.jujiu
import us.oyanglul.jujiu.syntax.caffeine._
import us.oyanglul.jujiu.syntax.cache._
import cats.{Applicative}
import cats.data.Kleisli
import java.util.concurrent.CompletableFuture
import scala.concurrent.ExecutionContext
import org.specs2.mutable.Specification
import cats.instances.list._
import cats.syntax.all._
import cats.effect._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import com.github.benmanes.caffeine.cache
import us.oyanglul.jujiu.syntax.CaffeineSyntax

class JujiuSpec extends Specification with org.specs2.mock.Mockito {
  // Shared ContextShift for the IO examples; presumably needed by the parallel
  // IO.Par examples (parFetchAllF) under cats-effect 2 — confirm.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  // Each <<name>> line below is a literate-programming placeholder: tangling this
  // README replaces it with the code block of that name shown further down.
  // Mockito is mixed in for `mock[...]` used by the async-load-failure example.
  <<get_set_cache>>
  <<async_load_failure>>
  <<catsio_cache>>
  <<catsio_loading_cache>>
  <<caffeine_builder>>
  <<tagless_final>>
  <<readerT>>
  <<redis>>
}
 1: "it should able to get and set cache" >> {
 2:   object cacheDsl extends CaffeineCache[IO, String, String]   // <- (dsl)
 3:   val program = for {
 4:     r1 <- cacheDsl.fetch("not exist yet")                     // <- (fetch)
 5:     r2 <- cacheDsl.fetch("not exist yet", _ => IO("default")) // <- (fetchOr)
 6:     _ <- cacheDsl.put("not exist yet", "now exist")           // <- (put)
 7:     r3 <- cacheDsl.fetch("not exist yet")
 8:     _ <- cacheDsl.clear("not exist yet")
 9:     r4 <- cacheDsl.fetch("not exist yet")
10:   } yield (r1, r2, r3, r4)
11:   program(Caffeine().sync)                                    // <- (run)
12:     .unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
13: }
"it should IO error when async load failure" >> {
  // The async DSL needs an ExecutionContext for the Java futures; the explicit
  // type avoids relying on inference for an implicit member.
  object dsl extends CaffeineAsyncCache[IO, String, String] {
    implicit val executionContext: ExecutionContext = global
  }
  val program = for {
    r1 <- dsl.fetch("not exist yet")
    r2 <- dsl.fetch("not exist yet", _ => IO("default"))
  } yield (r1, r2)

  // A mocked Caffeine AsyncCache whose lookup future completes exceptionally,
  // simulating a failed asynchronous load.
  val failCache = mock[cache.AsyncCache[String, String]]
  failCache.getIfPresent("not exist yet") returns CompletableFuture.supplyAsync[String](() => throw new Exception("cache load error"))

  // The failed future must surface as an error of the resulting IO.
  program(
    failCache
  ).unsafeRunSync() must throwA[Exception](message = "cache load error")
}

This README is a literate programming file: all the code here is tangled into the test file.

Let me walk you through it line by line:

  • line-dsl creates an instance of CaffeineCache whose effect type is IO, with String keys and String values
  • line-fetch won't actually trigger any effect; it just returns a DSL value of type Kleisli[IO, Cache, Option[String]], which in English means "give me a Cache and I can provide you an IO[Option[String]]"
  • line-fetchOr is another fetch DSL: the second parameter is a function K => IO[V]; if the key is not in the cache, it runs the function, puts the result into the cache, and returns the value
  • line-put updates the value of key "not exist yet" to "now exist"
  • line-run uses the Scala-idiomatic syntax to build a synchronous Caffeine cache.
    Recall that program is actually a Kleisli[IO, Cache, ...], so
    once I provide it a Cache via program(Caffeine().sync),
    it returns an IO; calling .unsafeRunSync() on that IO triggers all the effects you described in program,
    and you get the actual result

works with Cats IO

Jujiu has a very flexible DSL. If you don't like Kleisli, it works directly with IO as well (technically, your effect type just needs an Async instance).

All you need is to import some syntax:

import us.oyanglul.jujiu.syntax.cache._
"works with IO" >> {
  "normal cache" >> {
    // The DSL knows which cache operations to perform, not which cache they run on.
    val cacheDsl: Cache[IO, cache.Cache, String, String] = new CaffeineCache[IO, String, String] {}
    // Without Kleisli, the concrete Caffeine cache is supplied implicitly to each F-suffixed call.
    implicit val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]

    def program =
      for {
        _ <- IO(println("something"))
        _ <- cacheDsl.putF("key1", "value1")
        stored <- cacheDsl.fetchF("key1")
        loaded <- cacheDsl.fetchF("key2", _ => IO("value2"))
        sequential <- cacheDsl.fetchAllF(List("key1", "key2"))
        parallel <- cacheDsl.parFetchAllF[List, IO.Par](List("key1", "key2"))
        _ <- cacheDsl.clearF("key1")
      } yield (stored, loaded, sequential, parallel)

    val expected = (
      Some("value1"),
      "value2",
      List(Some("value1"), Some("value2")),
      List(Some("value1"), Some("value2"))
    )
    program.unsafeRunSync() must_== expected
  }

and provide cacheProvider implicitly: since you are not using Kleisli, you need to tell the DSL which cache to run on.

  "loading cache" >> {
    // A loading cache computes missing entries itself; the loader here is
    // identity, so each key loads as itself.
    implicit val cacheProvider: cache.LoadingCache[String, String] = Caffeine().sync(identity)
    val loadingDsl: LoadingCache[IO, cache.LoadingCache, String, String] = new CaffeineLoadingCache[IO, String, String] {}

    def program =
      for {
        _ <- IO(println("something"))
        single <- loadingDsl.fetchF("1")
        batch <- loadingDsl.fetchAllF(List("2", "3"))
        parallelBatch <- loadingDsl.parFetchAllF[List, IO.Par](List("4", "5"))
      } yield (single, batch, parallelBatch)

    program.unsafeRunSync() must_== (("1", List("2", "3"), List("4", "5")))
  }
}

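This is similar to an ExecutionContext: just as you provide the context that threads run on, here you provide the cache the DSL runs on.

All DSL methods used this way are suffixed with F (fetchF, putF, clearF, parFetchAllF, ...).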

idiomatic syntax for Caffeine builder

Dealing with Java DSLs and Java futures is verbose and painful in a Scala project.

Let's see how Jujiu makes Caffeine friendly to Cats IO as well.

A good example is the Async Loading Cache.

First, you will need the Caffeine builder syntax:

import us.oyanglul.jujiu.syntax.caffeine._
"it should able to get and set async loading cache" >> {
  // Named `dsl` rather than `cache` so it does not shadow the imported
  // com.github.benmanes.caffeine.cache package used in the type annotation below.
  object dsl extends CaffeineAsyncLoadingCache[IO, Integer, String] {
    implicit val executionContext: ExecutionContext = global // <-- (executionContext)
  }

  val program = for {
    r1 <- dsl.fetch(1)
    r2 <- dsl.fetch(2)
    r3 <- dsl.fetchAll(List[Integer](1, 2, 3))
  } yield (r1, r2, r3)

  // Per-entry expiry expressed with Scala lambdas and durations; the three
  // functions presumably mirror Caffeine's Expiry (after create / update / read).
  val caffeineA: cache.AsyncLoadingCache[Integer, String] = Caffeine()
    .executionContext(global) // <-- (global)
    .withExpire( // <-- (expire)
      (_: Integer, _: String) => 1.second,
      (_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration,
      (_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration
    )
    .async((key: Integer) => IO("async string" + key)) // <-- (async)

  // Fixed-duration policies configured through the same builder syntax.
  val caffeineB = Caffeine()
    .withExpireAfterAccess(1.second)
    .withExpireAfterWrite(2.seconds)
    .withRefreshAfterWrite(3.seconds)
    .async((key: Integer) => IO("async string" + key))

  val expected = (
    "async string1",
    "async string2",
    List("async string1", "async string2", "async string3")
  )
  program(caffeineA).unsafeRunSync() must_== expected
  program(caffeineB).unsafeRunSync() must_== expected
  // A loader that fails must make the resulting IO fail.
  program(Caffeine().async(_ => IO.raiseError(new Exception("something wrong"))))
    .unsafeRunSync() must throwA[Exception]
}
  • line-executionContext: the Async Loading Cache needs an ExecutionContext to execute the Java futures
  • line-global: .executionContext(global) makes the cache use a Scala execution context to execute Java futures; otherwise it defaults to Java's ForkJoinPool.commonPool(). Alternatively, you can use Akka's execution context.
  • line-expire: defines the expiry policy using more Scala-idiomatic lambdas and Durations
  • line-async: creates an async loading cache. The loading function it uses is K => IO[V], so you don't need to deal with awful Java futures.

Works with Tagless Final

No matter what style of effect abstraction your project uses, Jujiu can easily fit in,

e.g. Tagless Final:

"works with tagless final" >> {
  // An extra capability the program needs besides caching.
  trait LogDsl[F[_]] {
    def log(msg: String): F[Unit]
  }

  // The program's full DSL: Caffeine-backed caching plus logging.
  type ProgramDsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]

  // Polymorphic in F. Async[F] and the cache are taken in a single implicit list:
  // Scala 2 rejects a context bound (`F[_]: Async`) combined with an explicit
  // implicit parameter list.
  def program[F[_]](dsl: ProgramDsl[F])(
    implicit async: Async[F],
    cacheProvider: cache.Cache[String, String]
  ): F[Option[String]] =
    for {
      value <- dsl.fetchF("key")
      _ <- dsl.log("something")
    } yield value

  {
    object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO] {
      def log(msg: String): IO[Unit] = IO(println(msg))
    }

    implicit val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]

    // "key" was never stored, so the lookup yields None.
    program[IO](dsl).unsafeRunSync() must_== None
  }
}

Just extend CaffeineCache[F, K, V] and provide a cacheProvider.

ReaderT Pattern

If your code uses the ReaderT pattern, even better: Jujiu fits in more naturally.

"works with tagless final style readerT" >> {
  // Layer 1: Environment
  trait HasLogger {
    def logger: String => Unit
  }
  trait HasCacheProvider {
    def cacheProvider: cache.Cache[String, String]
  }

  type Env = HasLogger with HasCacheProvider

  // Layer 2: DSL
  trait LogDsl[F[_]] {
    // The logger call is suspended in F's map instead of being evaluated eagerly
    // as the argument of pure; for a lazy F such as IO it then runs with the effect.
    def log(msg: String)(implicit M: Applicative[F]): Kleisli[F, Env, Unit] =
      Kleisli(env => M.map(M.unit)(_ => env.logger(msg)))
  }

  type Dsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]

  // Layer 3: Business
  def program[F[_]](dsl: Dsl[F])(
    implicit ev: Async[F]
  ) =
    for {
      _ <- dsl.log("something")
      // The cache DSL only needs a cache, so narrow Env down to it.
      value <- dsl.fetch("key").local[Env](_.cacheProvider)
    } yield value

  object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO]

  program[IO](dsl)
    .run(new HasLogger with HasCacheProvider {
      val logger: String => Unit = println(_)
      // A val, not a def: a def would build a fresh, empty cache on every access.
      val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]
    })
    .unsafeRunSync() must_== None
}

Notice that adapting the cache DSL (which only needs a cache) to the wider Env requires a contravariant map on its input: .local[Env](_.cacheProvider)

Extensible

It's extensible by design thanks to Kleisli: if you provide another cache provider, the same DSL will work.

"run on redis" >> {
  import redis.clients.jedis._

  // Generic over the effect F and the cache shape S: the program only speaks the Cache DSL.
  def program[F[_]: Async, S[_, _]](dsl: Cache[F, S, String, String]) = for {
    r1 <- dsl.fetch("not exist yet")
    r2 <- dsl.fetch("not exist yet", _ => Async[F].delay("default"))
    _ <- dsl.put("not exist yet", "now exist")
    r3 <- dsl.fetch("not exist yet")
    _ <- dsl.clear("not exist yet")
    r4 <- dsl.fetch("not exist yet")
  } yield (r1, r2, r3, r4)

  // Jedis takes no key/value type parameters, so adapt it to the two-parameter
  // shape the Cache DSL expects by ignoring both.
  type J[A, B] = Jedis
  object dsl extends Cache[IO, J, String, String] {
    def put(k: String, v: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
      Kleisli { redis =>
        M.delay{
          redis.set(k, v)
          ()
        }
      }
    def fetch(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Option[String]] =
      Kleisli(redis => M.delay(Option(redis.get(k))))
    def clear(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
      Kleisli(redis => M.delay{
        redis.del(k)
        ()
      })
  }

  // Acquire the connection as a Resource so it is closed once the program
  // finishes, whether it succeeds or fails.
  Resource
    .fromAutoCloseable(IO(new Jedis("localhost")))
    .use(jedis => program(dsl).run(jedis))
    .unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
}.pendingUntilFixed("Redis")