Deprecating the Observer Pattern

 

2012/10/21 ScalaBase in 名古屋

自己紹介

 

  • @gakuzzzz
  • 中村学(なかむら まなぶ)
  • 株式会社Tech to Value
  • ScalaでWebシステム開発してるよ

元論文

 

Abstract

 

Observer Pattern はイケてないから俺がもっとかっこいいAPIを提案してやんよ!

背景

 

  • 昨今インタラクティブなアプリケーションが非常に増大している
  • インタラクティブなアプリケーションは継続的にインタラクションをハンドリングする必要がある
  • しかしそのプログラミングモデルは進化していない

2008 Adobe プレゼンより

 

  • Adobeのデスクトップアプリケーションの1/3がイベントハンドリングコード
  • 報告されたバグの1/2がイベントハンドリングコード内

改善できるよ!

Observerパターンの例(マウスドラッグ)

 

var path: Path = null
val moveObserver = { (event: MouseEvent) =>
  path
.lineTo(event.position)
  draw
(path)
}
control
.addMouseDownObserver { event =>
  path
= new Path(event.position)
  control
.addMouseMoveObserver(moveObserver)
}
control
.addMouseUpObserver { event =>
  control
.removeMouseMoveObserver(moveObserver)
  path
.close()
  draw
(path)
}

いけてない

  • 副作用

    • var !!!
  • カプセル化の破壊

    • path がスコープの外に漏れてる
  • Composabilityが無い

    • それぞれのObserverが独立しているので、同時にObserverを破棄したりできない
  • レイヤリングが不適切

    • パスの追跡だけじゃなく描画まで行われている
  • 拡張性が無い

いけてない2

  • 統一性に欠ける

    • 多彩なObserverが混在する
  • 抽象度が低い

    • Escキーでもドラッグ停止したいよねー
  • リソース管理の必要性

    • ドラッグ中のみマウスイベントを監視したい。そのためにObserverの追加削除を管理する必要がある
  • 意味的な距離がある

    • やりたい事とコードの表現に乖離がある

じゃあどうするの?

まずは統合的なイベントのインターフェイスを考える

 

trait EventSource[A] {
 
def emit(event: A): Unit
}

 

val es = new EventSource[Int]
es emit
1
es emit
2

ソースからのイベントに反応するクロージャを登録する observe メソッドを用意するよ

 

trait Observing
 
def observe[A](es: EventSource[A])
(
f: A => Unit): Objserver = ...
}

 

val ob = observe(es) { x =>
  println
("Receiving " + x)
}
...
ob
.dispose()

 

Observerを破棄するためにイベントソースを覚えておく必要がない!

ボタンの例

 

class Button(label: String) {
 
val clicks = new EventSource[Int] {
   
// call "this emit x" for each system event
 
}
}

 

これによって終了ボタンがこんな風に書ける

 

object Application extends Observing {
 
...
 
val quitButton = new Button("quit")
  observe
(quitButton.clicks) { x => System.exit() }
}

普通は終了ボタンだけじゃなく、メニューからや例外時にも終了したいよね

 

val quitButton = new Button("quit")
val quitMenu = new MenuItem("quit")
val fatalExceptions = new EventSource[Exception]
observe
(quitButton.clicks) { x => System.exit() }
observe
(quitMenu.clicks) { x => System.exit() }
observe
(fatalExceptions) { x => System.exit() }

 

あじゃぱー

そこで merge ですよ

 

trait EventSource[A] {
 
def merge[B>:A](that: EventSource[B]): EventSource[B]
}

 

もし同時にイベントが発行された場合に問題になるけど、それは後で議論するよ。

これで重複はなくなった

 

trait UIApplication extends Observing {
 
...
 
val quit: EventSource[Any]
  observe
(quit) { x =>
   
... // clean up, display dialog, etc
   
System.exit()
 
}
}
object MyApp extends UIApplication {
 
...
 
val quit = (quitButton.clicks
    merge quitMenu
.clicks
    merge fatalExceptions
)
}

でも型が

EventSource[Int]EventSource[String]EventSource[Exception]merge すると EventSource[Any] になっちゃう。

これはあんまりうれしくない。

そこで Functor ですよ

 

def map[B](f: A => B): EventSource[B]

 

val quit =
 
(quitButton.clicks.map(x => "Ok")
    merge quitMenu
.clicks.map(x => "Ok")
    merge fatalExceptions
.map(x => x.getMessage))

 

わーいやったー

ついでだから他のコンビネータも定義しとくよ

 

def collect[B](p: PartialFunction[A, B]): EventSource[B]

def map[B](f: A => B): Events[B] =
  collect
{ case x => f(x) }
def filter(p: A => Boolean): Events[A] =
  collect
{ case x if p(x) => x }

 

collect があれば map も filter も定義できるね。

新しいイベントAPIを使うと最初のマウスドラッグの例はこうなるよ

 

var path: Path = null
var moveObserver = null
observe
(control.mouseDown) { event =>
  path
= new Path(event.position)
  moveObserver
=
    observe
(control.mouseMoves) { event =>
      path
.lineTo(event.position)
      draw
(path)
   
}
}
observe
(control.mouseUp) { event =>
  moveObserver
.dispose()
  path
.close()
  draw
(path)
}

すでに統一的なObserverとイベントのインターフェイスがあるので、ドラッグオペレーションに含まれるイベントを抽象化することが可能だよ

 

def installDragController(start: EventSource[Positional],
    move
: EventSource[Positional], end: EventSource[Positional]) = {
 
var path: Path = null
 
var moveObserver = null
  observe
(start) { event =>
    path
= new Path(event.position)
    moveObserver
=
      observe
(move) { event =>
        path
.lineTo(event.position)
        draw
(path)
     
}
 
}
  observe
(end) { event =>
    moveObserver
.dispose()
    path
.close()
    draw
(path)
 
}
}

やったー これでマウス以外のポイントデバイスや、キー操作でのドラッグ停止なんかができるようになったよー

 

def installDragController(
      pen
.down,
      pen
.moves,
      pen
.up merge escapeKeyDown.map(x => pen.position.now))

でもまだ

副作用とかいっぱいあるし、イマイチだよね

セマンティクスを直接コード化したいんだよ

 

  1. マウスボタンが押されたら、新しいパスをスタートする
  2. マウスが離されるまで、マウスの動きをPathに記録する
  3. マウスボタンが解放されたら、パスをクローズする

これを実現するために

Reactor というものを導入するよ

 

Reactor.once { self =>
 
// step 1:
 
val path = new Path((self next mouseDown).position)
 
// step 2:
  self loopUntil mouseUp
{
   
val m = self next mouseMove
    path
.lineTo(m.position)
    draw
(path)
 
}
 
// step 3:
  path
.close()
  draw
(path)
}

Reactor はこんなメソッド持ってるよ

 

/** body を一回評価する */
def once(body: Reactor => Unit): Reactor
/** body を評価し続ける */
def loop(body: Reactor => Unit): Reactor

/** e がイベントを発行するまでbodyをループする */
def loopUntil[A](e: EventSource[A])(body: => Unit): A

/** e がイベントを発行するまでサスペンドする */
def next[A](e: EventSource[A]): A

今度は時間変化する値を考えてみる

たとえば時間的に変化するラベルを持つボタン

Signal

そこで Signal というインターフェイスを用意してみるよ

 

trait Signal[+A]

class Var[A](init: A) extends Signal[A] {
 
def update(newValue: A): Unit = ...
}

class Button(label: Signal[String])

使い方はこんな感じ

 

val a = new Var(1)
val b = new Var(2)
val sum = Signal{ a()+b() }
observe
(sum) { x => println(x) }
a
()= 7
b
()= 35

 

この例は9と42が表示されるよ

Signalのコンテキストを無視して現在の値を取得する now メソッドを用意しておくよ

 

val b0 = b.now
val sum1 = Signal{ a()+b0 }
val sum2 = Signal{ a()+b.now }
val sum3 = Signal{ a()+b() }

 

  • これらは3つ全部違うよ
  • sum1 は sum2 と異なる b0 はつねに同じ値
  • sum2 は a が update されるたびに b の現在の値を取得する
  • sum3 は b にも依存して、b が update されるとsum3も発火する

最終的に Signal はこんな感じ

 

trait Signal[+A] {
 
def apply(): A
 
def now: A
 
def changes: EventSource[A]
}

 

apply メソッドは Signal式上での関数呼び出し構文 Signal { e() } で使うんだ

またまたドラッグに戻るよ

 

ドラッグを改善するステップとして、Pathの構築と描画を分離するよ

こんな感じ

 

val path: Signal[Path] =
Val(new Path) once { self =>
 
import self._
 
val down = next(mouseDown)
  emit
(previous.moveTo(down.position))
  loopUntil
(mouseUp) {
   
val m = next(mouseMove)
    emit
(previous.lineTo(m.position))
 
}
  emit
(previous.close)
}

メソッド once と loopUntil は Reactor で出てきたけど、Signal に同様のものを導入するんだ。

 

また Path を immutable 版になってる。lineTo と close は元の Path を変更することなく、新しい Path を生成するよ。

 

そして描画は emit メソッドに置き換えられるんだ。

これで外部Observerで描画を実現できるよ

 

observe(path)(draw)

 

わぉ!シンプル!

これで EventSource と Signal という二つの道具を手に入れたけど、ここでさらに共通の機能を抽出しよう。

その前に EventSource についても mutable な性質を分離する Events trait を用意しておくよ。

 

今までの EventSource を引数に取ったり戻り値に返すメソッドは Events に書き換わると思って頂戴。

 

trait Events[+A] {
 
def subscribe(ob: Observer): Unit
 
def message(ob: Observer): Option[A]
}
class EventSource[A] extends Events[A] {
 
def emit(ev: A): Unit
 
...
}

Events と Signal の親はこうなるんだ

 

trait Reactive[+Msg, +Now] {
 
def current(dep: Dependant): Now
 
def message(dep: Dependant): Option[Msg]
 
def now: Now = current(Dependent.Nil)
 
def msg: Msg = message(Dependent.Nil)
}
trait Signal[+A] extends Reactive[A, A]
trait Events[+A] extends Reactive[A, Unit]

 

(Dependentってナニモノ?)

 

(このReactiveが依存するReactiveを抽象化して、依存のないReactiveを表せるようにNilを定義したものっぽい)

once や loop のために中間クラスを導入するよ

 

implicit def eventsToDataflow[A](e: Events[A]) =
 
new EventsToDataflow(e)
implicit def signalToDataflow[A](s: Signal[A]) =
 
new SignalToDataflow(s)
trait ReactiveToDataflow[M, N,
      R  
<: Reactive[M,N],
      DR
<: DataflowReactive[M,N,R]]
   
extends Reactive[M, N] {
 
protected def init: R

 
def loop(body: DR => Unit): R
 
def once(body: DR => Unit): R
}
class EventsToDataflow[A](initial: Events[A])
 
extends Events[A]
 
with ReactiveToDataflow[
    A
, Unit, Events[A], DataflowEvents[A]]
class SignalToDataflow[A](initial: Signal[A])
 
extends Signal[A]
 
with ReactiveToDataflow[
    A
, A, Signal[A], DataflowSignal[A]]

そしてReactiveのためのデータフロー言語は次のように定義できるよ

 

trait DataflowReactive[M, N, R <: Reactive[M,N]] 
   
extends Reactive[M, N] {
 
def emit(m: M): Unit
 
def switchTo(r: R): Unit
 
def delay: Unit
 
def next[B](r: Reactive[B, _]): B
 
def nextVal[B](r: Reactive[_, B]): B
}
  • next

    • 与えられた r の Message を待つよ
  • nextVal

    • 与えられた r の変更を待つよ
  • delay

    • 現在のデータフローを中断して next の伝播を継続させるよ
  • emit

    • 現在のデータフローに m を発信するよ。そして現在値を反映させるんだ。
  • switchTo

    • 現在のデータフローを与えられた r に切り替えるよ。

以前に出てきた collect はこんな感じに

 

def collect[B](p: PartialFunction[A, B]) =
 
Events.loop[B] { self =>
   
val x = self next outer
   
if (p isDefinedAt x) self emit p(x)
   
else self.delay
 
}

さらにいくつか便利なコンビネータを提供するよ

 

def hold(init: A): Signal[A] =
 
Val(init) loop { self =>
    self emit
(self next this)
 
}

 

hold は以前の値を保持し続けるコンビネータなんだ

def switch[A](before: Signal[A], 
      after
: =>Signal[A]): Signal[A] =
  before once
{ self =>
    self next
this
    self switchTo after
 
}

 

switch は切り替えスイッチみたいなもので、最初は before のように振る舞い、自身が on となったら after として振舞うようになるよ

def take(n: Int) = Events.once[A] { self =>
 
var x = 0
 
while(x < n) {
    self emit
(self next outer)
    x
+= 1
 
}
}

take は n回までのイベントを発生させたらその後は沈黙するんだ

 

同様の仕組みで drop も作れるよ

みんな大好き flatten も

 

def flattenEvents[B](implicit witness: A => Events[B]) =
 
Events.loop[B] { self =>
    self switchTo witness
(self next this)
 
}
def flatten[B](implicit witness: A => Signal[B]) =
  witness
(this.now) loop { self =>
    self switchTo witness
(self next this)
 
}

今 Events と Signal の flatten を別々で定義したけど、一般化して定義もできるよ

 

def flatten[M, N, 
      R
<: Reactive[M,N],
      DR
<: DataflowReactive[M,N,R]]
   
(implicit c: A => R
     
with ReactiveToDataflow[M,N,R,DR]): R =
  c
(now) loop { self =>
    self switchTo c
(self next this)
 
}

今やマウスドラッグの問題をコンビネータで表現できるようになったよ。

 

val moves = mouseDown map { md =>
  mouseMove map
(mm => new Drag(mm))
}
val drops = mouseUp map { mu =>
 
Events.Now(new Drop(mu))
}
val drags = (moves merge drops).flatten

再帰的な定義もできるようにしたいね

 

val counter = 0 loop { self =>
  self emit
(self.now + 1)
}

 

これは現在の値を評価しようとして現在の値を参照するためおかしなことになっちゃう

そこで前の値を意味する previous を導入するよ

 

val counter = 0 loop { self =>
  self emit
(self.previous + 1)
}

 

やったー

毎秒その値を更新するフレームレートのSignalなんかもつくれちゃう

 

val frameRate = Val(0) loop { self =>
 
val c0 = counter.now
  self next
Clock.inSeconds(1)
  self emit
(counter.now - c0)
}

Reactive は Signal や Events だけじゃなく様々な実装クラスを作れるよ

たとえば Future

 

trait Future[+A] extends Reactive[A,Option[A]]

ドラッグの例に戻ると、リアクティブなPathだって考えることができるよ

 

以下のような PathDelta があったとして

 

sealed class PathDelta
case class MoveTo(x: Int, y: Int) extends PathDelta
case class LineTo(x: Int, y: Int) extends PathDelta
case object Close extends PathDelta

こんな実装が考えられる

 

class RPath extends Reactive[PathDelta, Path]

さらに RPath から Detaflow への変換も

 

class DataflowRPath(init: RPath) extends RPath
 
with DataflowReactive[PathDelta, Path,
   
RPath, DataflowRPath]
implicit def rpath2dataflowrpath(r: RPath) =
 
new DataflowRPath(r)

すると RPath をこんな風に構築できるようになるよ

 

val path: RPath = (new RPath) once { self =>
 
val down = self next mouseDown
  self emit
MoveTo(down.position)
 
val up = self loopUntil mouseUp {
   
val m = self next mouseMove
    self emit
LineTo(m.position)
 
}
  self emit
Close
}

もちろん DataflowRPath に メソッド追加する方法だってかまわない

 

def lineTo(x: Int, y: Int) = emit(LineTo(x,y))
def close(x: Int, y: Int) = emit(Close)

で、このイケてるAPIだけど、実装にあたって注意点があるんだ

 

それは一貫性の問題

例えば次の例を考えてみよう

 

Reactor.once {
 
val es: Events[Device] = connectDevice()
 
(self next es).initialize()
}

 

もし、es が リアクターのnext呼び出しの前にイベントが発行できたとすると、何らかのデバイスの初期化を見逃してしまうことになる!これはヤバい

実際の実装では、Push-driven なアプローチによってこの問題を解決してるよ。

まずリアクティブの位相的にソートされた依存グラフを構築するんだ。

 

つまり、全てのソースリアクティブはレベル0として、依存リアクティブは最も高い依存関係のレベルに1を足したものとすした依存グラフということ

 

こうしてできた依存グラフを元に、次のようにイベントの伝播サイクルを進めるよ

 

  1. 全ての変更または発行されているリアクティブを、リアクティブのレベルを優先度として、Priority Queue に入れる
  2. キューに値がある間、最も低いレベルのリアクティブを取得し validate する
  3. そのリアクティブは依存している他のリアクティブにメッセージを伝播するかどうか決定する
  4. もし伝播するのであれば、そのリアクティブを同様に Priority Queue に追加する

この方法はシンプルだけど、リアクティブのレベルが変更されない範囲であれば、十分にデータ不整合を避けられるよ。

 

ただし、前の依存を削除したり新しい依存を作成したりするためには、どうしてもSignal式やデータフローリアクティブ内で条件分岐などを処理する必要があるんだ。

こうした動的な依存のために次の例を考えてみよう

 

val x = Var(2) // level 0
val y = Cache { f(x()) } // level 1
val z = Cache { g(y()) } // level 2
val result =
 
Signal { if(x()==2) y() else z() } // level 2 or 3

result は x に依存していて、位相レベル2か3を持ってるね

 

シグナルに常に可能性のあるレベルよりも高いレベル割り当てるってアプローチが簡単そうに思えるけど、残念ながらSignal式のレベルを静的に決定することができないんだ

 

だから実際に現在の値を評価する際に、そのレベルが前に知っていたレベルより大きいとわかったら、Exception を throw してリスケジュールする方法をとってるよ

そのため、再計算の可能性があるから、Reactiveの動的な依存関係が参照し終わるまで、負荷の高い計算は取り除いておく事をお勧めしとくよ

まぁでも幸いなことに、殆どのコンビネータはレベルを予測するのに十分な情報をもってるし、Signalとかはウォームアップするから安心して

相互再帰についても考える必要があるね

 

val c = Var(true)
val x = Signal { if(c()) 1 else y() }
val y = Signal { if(c()) x() else 1 }

突然の無限ループ!

これにはサイクルを検出した際に巻き込まれたリアクティブのみレベルをリセットすることで対応してるよ。

それ以外にも実装の問題として副作用の扱いとかメモリリークを避ける工夫とかあるけど、資料作る時間もぅない……

 

あきらめるのょくなぃって……ぉもって……がんばった……でも……ゴメン……まにあわなかった……でもDataflowBaseと限定継続ゎ… ズッ友だょ……!

まとめ

最初にObserverパターンがイケてないとdisったけど、同じ視点で新しいAPIを評価してみよう

  • 統一性と抽象度

    • Reactiveインターフェイスが実際にイベントを発行するオブジェクトから独立して、なおかつポリモーフィックに動作するからイケてるよね
  • カプセル化

    • マウスドラッグの例でも外部状態を露出することなしに実現できたよ
  • リソースマネジメント

    • ObserverのライフタイムをTraitによって制限できるよ
  • 副作用

    • カプセル化と同様に実行状態を内部に制限できるので副作用も制限できるよ
  • Composability

    • 様々なコンビネータが提供されてるよ
  • スケーラビリティ

    • Reactive の実装を増やしてく例を見たよね
  • 意味的な距離

    • 途中の例でも見たとおり、意味どおりにコード書けるね

つまりイケてる!

実際にこのAPIを実装した Scala.React というライブラリを公開してるよ

 

  • http://lamp.epfl.ch/~imaier
  • 商用のゲームエンジンで使用されてるすごいやつだよ

参考論文とか関連Worksは元論文読んでね

おしまい