Slick 3.0.0 documentation - 05 Database I/O Actions

Permalink to Database I/O Actions — Slick 3.0.0 documentation

データベースI/Oアクション 

クエリの結果を取得したり(myQuery.result)、テーブルを作成したり(myTable.schema.create)、データを挿入する(myTable += item)といったデータベースに対して実行する全ての事柄は、DBIOActionのインスタンスになる。

Database I/O Actions はいくつかの異なるコンビネータにより結合されるが(詳細はDBIOAction classDBIO objectで)、それらはいつも直列に実行され、(少なくとも概念上は)1つのデータベースセッションにおいて実行される。

大抵の場合、DBIOの型エイリアスを通常時のデータベースI/Oアクションとして、StreamingDBIOの型エイリアスををストリーミング可能なデータベースI/Oアクションとして利用したいと思うだろう。これらは、DBIOActionによってサポートされた副次的な effect types を省略させる(They omit the optional *effect types* supported by slick.dbio.DBIOAction.)。

Executing Database I/O Actions 

DBIOActionsを実行すると、データベースから得られた具象化された結果やストリーミングデータを得る事が出来る。

Materialized 

データベースに対しDBIOActionを実行し、具象化された結果を得るにはrunを用いる。これは例えば、単一のクエリ結果を引く場合(myTable.length.result)、コレクションを結果として得るクエリを引く場合(myTable.to[Set].result)などに利用される。どのDBIOActionもこのような実行処理をサポートしている。

runが呼ばれた時点で、アクションの実行が開始される。そして具象化された結果は非同期に処理が実行され終了するものとして、Futureにくるまって返却される。

val q = for (c <- coffees) yield c.name
val a = q.result
val f: Future[Seq[String]] = db.run(a)
f.onSuccess { case s => println(s"Result: $s") }

Streaming 

コレクションが得られるクエリには、ストリーミングの結果を返却する機能が備わっている。この場合、実際のコレクションの型は無視され、要素が直接Reactive StreamsPublisherを通して返却されることになる。これはAkka Streamsにより処理・計算されたものとなる。

DBIOActionの実行処理は、Subscriberをストリームに繋げるまで実行されない。Subscriberは1つだけ 購読 させる事が可能であり、それ以上の 購読 を行おうとするとそれらは失敗してしまう。DBIOActionのストリーミング部分において、ストリームの各要素は利用出来る状態になるとすぐに実行可能であると合図を送る。例えばトランザクションの中でストリーミング処理を行った場合にも、全ての要素は正常に届けられ、トランザクションがコミットされなかった場合にもきちんとストリームも失敗するようにできている。

val q = for (c <- coffees) yield c.name
val a = q.result
val p: DatabasePublisher[String] = db.stream(a)
...
// .foreach is a convenience method on DatabasePublisher.
// Use Akka Streams for more elaborate stream processing.
p.foreach { s => println(s"Element: $s") }

JDBCの結果集合をストリーミングする際、もしSubscriberが多くのデータを受け取る準備が出来ていないのなら、次の結果ページはバックグラウンドにバッファリングされる。一方で、全ての結果要素は同期的に渡されるし、結果集合は同期処理が終了する前に先に進んでしまったりはしない。これにより、結果集合の状態に依存するBlobのようなJDBCの低レベルな値に対しても同期的なコールバックが利用可能となる。mapResultのような便利なメソッドがこの目的のために提供されている。

val q = for (c <- coffees) yield c.image
val a = q.result
val p1: DatabasePublisher[Blob] = db.stream(a)
val p2: DatabasePublisher[Array[Byte]] = p1.mapResult { b =>
  b.getBytes(0, b.length().toInt)
}

Transactions and Pinned Sessions 

いくつかの小さいアクションで構成されたDBIOActionを実行する際には、Slickはコネクションプールから得られたセッションを要求し、その後セッションを開放する。データベース外の計算から結果を得るのを待ち合わせる間(例えば、flatMap)、不必要なセッションは保持されない。データベースに計算させることなく、2つのデータベースのアクションを結合するDBIOAction combinatorsandThenzip)は、1つのセッション内で融合されたアクションを実行する副作用を伴いつつ、より効率的にこれらのアクションを融合する。1つのセッションでの利用を強制するには、withPinnedSessionを利用すれば良い。これを用いる事で、データベース外での計算を待ち合わせる際に、既存のセッションを開き続けたままにしておくことが出来る。

トランザクションの利用を強制するtransactionallyと呼ばれるコンビネータもある。これは、実行されるDBIOActionの処理全体が自動的に成功か失敗のいずれかに収まる。

Warning

失敗というのはtransactionallyでラップされた個々のDBIOActionのアトミック性を保証するものでは無いため、この時点でエラー回復を図るコンビネータを適用すべきではない。作成されたデータベース側のトランザクションは、transactionallyアクションの外側でコミットやロールバックを行う。

val a = (for {
  ns <- coffees.filter(_.name.startsWith("ESPRESSO")).map(_.name).result
  _ <- DBIO.seq(ns.map(n => coffees.filter(_.name === n).delete): _*)
} yield ()).transactionally
val f: Future[Unit] = db.run(a)

JDBC Interoperability 

Slickで利用出来ない機能を使うためにJDBCのレベルを落とすには、SimpleDBIOアクションを用いれば良い。SimpleDBIOアクションは、データベースのスレッド上で実行され、JDBCのConnectionへの接続を得るものである。

val getAutoCommit = SimpleDBIO[Boolean](_.connection.getAutoCommit)
Fork me on GitHub