Deprecating the Observer Pattern
2012/10/21 ScalaBase in 名古屋
2012/10/21 ScalaBase in 名古屋
Authors
Observer Pattern はイケてないから俺がもっとかっこいいAPIを提案してやんよ!
Observerパターンの例(マウスドラッグ)
var path: Path = null
val moveObserver = { (event: MouseEvent) =>
path.lineTo(event.position)
draw(path)
}
control.addMouseDownObserver { event =>
path = new Path(event.position)
control.addMouseMoveObserver(moveObserver)
}
control.addMouseUpObserver { event =>
control.removeMouseMoveObserver(moveObserver)
path.close()
draw(path)
}
副作用
カプセル化の破壊
Composabilityが無い
レイヤリングが不適切
統一性に欠ける
抽象度が低い
リソース管理の必要性
意味的な距離がある
まずは統合的なイベントのインターフェイスを考える
trait EventSource[A] {
def emit(event: A): Unit
}
val es = new EventSource[Int]
es emit 1
es emit 2
ソースからのイベントに反応するクロージャを登録する observe メソッドを用意するよ
trait Observing
def observe[A](es: EventSource[A])
(f: A => Unit): Objserver = ...
}
val ob = observe(es) { x =>
println("Receiving " + x)
}
...
ob.dispose()
Observerを破棄するためにイベントソースを覚えておく必要がない!
ボタンの例
class Button(label: String) {
val clicks = new EventSource[Int] {
// call "this emit x" for each system event
}
}
これによって終了ボタンがこんな風に書ける
object Application extends Observing {
...
val quitButton = new Button("quit")
observe(quitButton.clicks) { x => System.exit() }
}
普通は終了ボタンだけじゃなく、メニューからや例外時にも終了したいよね
val quitButton = new Button("quit")
val quitMenu = new MenuItem("quit")
val fatalExceptions = new EventSource[Exception]
observe(quitButton.clicks) { x => System.exit() }
observe(quitMenu.clicks) { x => System.exit() }
observe(fatalExceptions) { x => System.exit() }
あじゃぱー
trait EventSource[A] {
def merge[B>:A](that: EventSource[B]): EventSource[B]
}
もし同時にイベントが発行された場合に問題になるけど、それは後で議論するよ。
これで重複はなくなった
trait UIApplication extends Observing {
...
val quit: EventSource[Any]
observe(quit) { x =>
... // clean up, display dialog, etc
System.exit()
}
}
object MyApp extends UIApplication {
...
val quit = (quitButton.clicks
merge quitMenu.clicks
merge fatalExceptions)
}
でも型が
EventSource[Int]
と EventSource[String]
と EventSource[Exception]
を merge
すると EventSource[Any]
になっちゃう。
これはあんまりうれしくない。
def map[B](f: A => B): EventSource[B]
val quit =
(quitButton.clicks.map(x => "Ok")
merge quitMenu.clicks.map(x => "Ok")
merge fatalExceptions.map(x => x.getMessage))
わーいやったー
ついでだから他のコンビネータも定義しとくよ
def collect[B](p: PartialFunction[A, B]): EventSource[B]
def map[B](f: A => B): Events[B] =
collect { case x => f(x) }
def filter(p: A => Boolean): Events[A] =
collect { case x if p(x) => x }
collect があれば map も filter も定義できるね。
新しいイベントAPIを使うと最初のマウスドラッグの例はこうなるよ
var path: Path = null
var moveObserver = null
observe(control.mouseDown) { event =>
path = new Path(event.position)
moveObserver =
observe(control.mouseMoves) { event =>
path.lineTo(event.position)
draw(path)
}
}
observe(control.mouseUp) { event =>
moveObserver.dispose()
path.close()
draw(path)
}
すでに統一的なObserverとイベントのインターフェイスがあるので、ドラッグオペレーションに含まれるイベントを抽象化することが可能だよ
def installDragController(start: EventSource[Positional],
move: EventSource[Positional], end: EventSource[Positional]) = {
var path: Path = null
var moveObserver = null
observe(start) { event =>
path = new Path(event.position)
moveObserver =
observe(move) { event =>
path.lineTo(event.position)
draw(path)
}
}
observe(end) { event =>
moveObserver.dispose()
path.close()
draw(path)
}
}
やったー これでマウス以外のポイントデバイスや、キー操作でのドラッグ停止なんかができるようになったよー
def installDragController(
pen.down,
pen.moves,
pen.up merge escapeKeyDown.map(x => pen.position.now))
副作用とかいっぱいあるし、イマイチだよね
セマンティクスを直接コード化したいんだよ
Reactor というものを導入するよ
Reactor.once { self =>
// step 1:
val path = new Path((self next mouseDown).position)
// step 2:
self loopUntil mouseUp {
val m = self next mouseMove
path.lineTo(m.position)
draw(path)
}
// step 3:
path.close()
draw(path)
}
Reactor はこんなメソッド持ってるよ
/** body を一回評価する */
def once(body: Reactor => Unit): Reactor
/** body を評価し続ける */
def loop(body: Reactor => Unit): Reactor
/** e がイベントを発行するまでbodyをループする */
def loopUntil[A](e: EventSource[A])(body: => Unit): A
/** e がイベントを発行するまでサスペンドする */
def next[A](e: EventSource[A]): A
今度は時間変化する値を考えてみる
たとえば時間的に変化するラベルを持つボタン
そこで Signal というインターフェイスを用意してみるよ
trait Signal[+A]
class Var[A](init: A) extends Signal[A] {
def update(newValue: A): Unit = ...
}
class Button(label: Signal[String])
使い方はこんな感じ
val a = new Var(1)
val b = new Var(2)
val sum = Signal{ a()+b() }
observe(sum) { x => println(x) }
a()= 7
b()= 35
この例は9と42が表示されるよ
Signalのコンテキストを無視して現在の値を取得する now メソッドを用意しておくよ
val b0 = b.now
val sum1 = Signal{ a()+b0 }
val sum2 = Signal{ a()+b.now }
val sum3 = Signal{ a()+b() }
最終的に Signal はこんな感じ
trait Signal[+A] {
def apply(): A
def now: A
def changes: EventSource[A]
}
apply メソッドは Signal式上での関数呼び出し構文 Signal { e() }
で使うんだ
またまたドラッグに戻るよ
ドラッグを改善するステップとして、Pathの構築と描画を分離するよ
こんな感じ
val path: Signal[Path] =
Val(new Path) once { self =>
import self._
val down = next(mouseDown)
emit(previous.moveTo(down.position))
loopUntil(mouseUp) {
val m = next(mouseMove)
emit(previous.lineTo(m.position))
}
emit(previous.close)
}
メソッド once と loopUntil は Reactor で出てきたけど、Signal に同様のものを導入するんだ。
また Path を immutable 版になってる。lineTo と close は元の Path を変更することなく、新しい Path を生成するよ。
そして描画は emit メソッドに置き換えられるんだ。
これで外部Observerで描画を実現できるよ
observe(path)(draw)
わぉ!シンプル!
これで EventSource と Signal という二つの道具を手に入れたけど、ここでさらに共通の機能を抽出しよう。
その前に EventSource についても mutable な性質を分離する Events trait を用意しておくよ。
今までの EventSource を引数に取ったり戻り値に返すメソッドは Events に書き換わると思って頂戴。
trait Events[+A] {
def subscribe(ob: Observer): Unit
def message(ob: Observer): Option[A]
}
class EventSource[A] extends Events[A] {
def emit(ev: A): Unit
...
}
Events と Signal の親はこうなるんだ
trait Reactive[+Msg, +Now] {
def current(dep: Dependant): Now
def message(dep: Dependant): Option[Msg]
def now: Now = current(Dependent.Nil)
def msg: Msg = message(Dependent.Nil)
}
trait Signal[+A] extends Reactive[A, A]
trait Events[+A] extends Reactive[A, Unit]
(Dependentってナニモノ?)
(このReactiveが依存するReactiveを抽象化して、依存のないReactiveを表せるようにNilを定義したものっぽい)
once や loop のために中間クラスを導入するよ
implicit def eventsToDataflow[A](e: Events[A]) =
new EventsToDataflow(e)
implicit def signalToDataflow[A](s: Signal[A]) =
new SignalToDataflow(s)
trait ReactiveToDataflow[M, N,
R <: Reactive[M,N],
DR <: DataflowReactive[M,N,R]]
extends Reactive[M, N] {
protected def init: R
def loop(body: DR => Unit): R
def once(body: DR => Unit): R
}
class EventsToDataflow[A](initial: Events[A])
extends Events[A]
with ReactiveToDataflow[
A, Unit, Events[A], DataflowEvents[A]]
class SignalToDataflow[A](initial: Signal[A])
extends Signal[A]
with ReactiveToDataflow[
A, A, Signal[A], DataflowSignal[A]]
そしてReactiveのためのデータフロー言語は次のように定義できるよ
trait DataflowReactive[M, N, R <: Reactive[M,N]]
extends Reactive[M, N] {
def emit(m: M): Unit
def switchTo(r: R): Unit
def delay: Unit
def next[B](r: Reactive[B, _]): B
def nextVal[B](r: Reactive[_, B]): B
}
next
nextVal
delay
emit
switchTo
以前に出てきた collect はこんな感じに
def collect[B](p: PartialFunction[A, B]) =
Events.loop[B] { self =>
val x = self next outer
if (p isDefinedAt x) self emit p(x)
else self.delay
}
さらにいくつか便利なコンビネータを提供するよ
def hold(init: A): Signal[A] =
Val(init) loop { self =>
self emit (self next this)
}
hold は以前の値を保持し続けるコンビネータなんだ
def switch[A](before: Signal[A],
after: =>Signal[A]): Signal[A] =
before once { self =>
self next this
self switchTo after
}
switch は切り替えスイッチみたいなもので、最初は before のように振る舞い、自身が on となったら after として振舞うようになるよ
def take(n: Int) = Events.once[A] { self =>
var x = 0
while(x < n) {
self emit (self next outer)
x += 1
}
}
take は n回までのイベントを発生させたらその後は沈黙するんだ
同様の仕組みで drop も作れるよ
みんな大好き flatten も
def flattenEvents[B](implicit witness: A => Events[B]) =
Events.loop[B] { self =>
self switchTo witness(self next this)
}
def flatten[B](implicit witness: A => Signal[B]) =
witness(this.now) loop { self =>
self switchTo witness(self next this)
}
今 Events と Signal の flatten を別々で定義したけど、一般化して定義もできるよ
def flatten[M, N,
R <: Reactive[M,N],
DR <: DataflowReactive[M,N,R]]
(implicit c: A => R
with ReactiveToDataflow[M,N,R,DR]): R =
c(now) loop { self =>
self switchTo c(self next this)
}
今やマウスドラッグの問題をコンビネータで表現できるようになったよ。
val moves = mouseDown map { md =>
mouseMove map (mm => new Drag(mm))
}
val drops = mouseUp map { mu =>
Events.Now(new Drop(mu))
}
val drags = (moves merge drops).flatten
再帰的な定義もできるようにしたいね
val counter = 0 loop { self =>
self emit (self.now + 1)
}
これは現在の値を評価しようとして現在の値を参照するためおかしなことになっちゃう
そこで前の値を意味する previous を導入するよ
val counter = 0 loop { self =>
self emit (self.previous + 1)
}
やったー
毎秒その値を更新するフレームレートのSignalなんかもつくれちゃう
val frameRate = Val(0) loop { self =>
val c0 = counter.now
self next Clock.inSeconds(1)
self emit (counter.now - c0)
}
Reactive は Signal や Events だけじゃなく様々な実装クラスを作れるよ
たとえば Future
trait Future[+A] extends Reactive[A,Option[A]]
ドラッグの例に戻ると、リアクティブなPathだって考えることができるよ
以下のような PathDelta があったとして
sealed class PathDelta
case class MoveTo(x: Int, y: Int) extends PathDelta
case class LineTo(x: Int, y: Int) extends PathDelta
case object Close extends PathDelta
こんな実装が考えられる
class RPath extends Reactive[PathDelta, Path]
さらに RPath から Detaflow への変換も
class DataflowRPath(init: RPath) extends RPath
with DataflowReactive[PathDelta, Path,
RPath, DataflowRPath]
implicit def rpath2dataflowrpath(r: RPath) =
new DataflowRPath(r)
すると RPath をこんな風に構築できるようになるよ
val path: RPath = (new RPath) once { self =>
val down = self next mouseDown
self emit MoveTo(down.position)
val up = self loopUntil mouseUp {
val m = self next mouseMove
self emit LineTo(m.position)
}
self emit Close
}
もちろん DataflowRPath に メソッド追加する方法だってかまわない
def lineTo(x: Int, y: Int) = emit(LineTo(x,y))
def close(x: Int, y: Int) = emit(Close)
で、このイケてるAPIだけど、実装にあたって注意点があるんだ
それは一貫性の問題
例えば次の例を考えてみよう
Reactor.once {
val es: Events[Device] = connectDevice()
(self next es).initialize()
}
もし、es が リアクターのnext呼び出しの前にイベントが発行できたとすると、何らかのデバイスの初期化を見逃してしまうことになる!これはヤバい
実際の実装では、Push-driven なアプローチによってこの問題を解決してるよ。
まずリアクティブの位相的にソートされた依存グラフを構築するんだ。
つまり、全てのソースリアクティブはレベル0として、依存リアクティブは最も高い依存関係のレベルに1を足したものとすした依存グラフということ
こうしてできた依存グラフを元に、次のようにイベントの伝播サイクルを進めるよ
この方法はシンプルだけど、リアクティブのレベルが変更されない範囲であれば、十分にデータ不整合を避けられるよ。
ただし、前の依存を削除したり新しい依存を作成したりするためには、どうしてもSignal式やデータフローリアクティブ内で条件分岐などを処理する必要があるんだ。
こうした動的な依存のために次の例を考えてみよう
val x = Var(2) // level 0
val y = Cache { f(x()) } // level 1
val z = Cache { g(y()) } // level 2
val result =
Signal { if(x()==2) y() else z() } // level 2 or 3
result は x に依存していて、位相レベル2か3を持ってるね
シグナルに常に可能性のあるレベルよりも高いレベル割り当てるってアプローチが簡単そうに思えるけど、残念ながらSignal式のレベルを静的に決定することができないんだ
だから実際に現在の値を評価する際に、そのレベルが前に知っていたレベルより大きいとわかったら、Exception を throw してリスケジュールする方法をとってるよ
そのため、再計算の可能性があるから、Reactiveの動的な依存関係が参照し終わるまで、負荷の高い計算は取り除いておく事をお勧めしとくよ
まぁでも幸いなことに、殆どのコンビネータはレベルを予測するのに十分な情報をもってるし、Signalとかはウォームアップするから安心して
相互再帰についても考える必要があるね
val c = Var(true)
val x = Signal { if(c()) 1 else y() }
val y = Signal { if(c()) x() else 1 }
これにはサイクルを検出した際に巻き込まれたリアクティブのみレベルをリセットすることで対応してるよ。
それ以外にも実装の問題として副作用の扱いとかメモリリークを避ける工夫とかあるけど、資料作る時間もぅない……
あきらめるのょくなぃって……ぉもって……がんばった……でも……ゴメン……まにあわなかった……でもDataflowBaseと限定継続ゎ… ズッ友だょ……!
最初にObserverパターンがイケてないとdisったけど、同じ視点で新しいAPIを評価してみよう
統一性と抽象度
カプセル化
リソースマネジメント
副作用
Composability
スケーラビリティ
意味的な距離
実際にこのAPIを実装した Scala.React というライブラリを公開してるよ