作ってみよう Enumerator - http-enumerator で Twitter API ぺろぺろ
これは Haskell Advent Calendar 2011 17 日目の参加記事です。
Iteratee 戦国時代らしいですね。Iteratee like なパッケージは現時点でも既に iteratee、 enumerator、 iterIO の三つがあり、それに加えて、 最近 Conduit が新たに作られ、現在も積極的に開発が進んでいるようです。
Iteratee の概念や Iteratee like なライブラリの作り方については
- Lazy I/O must go! - Iteratee: 列挙ベースのI/O - 純粋関数型雑記帳
- Enumerator Package – Yet Another Iteratee Tutorial : Preferred Research
- iterateeとは何か、何が良いのか - www.kotha.netの裏
を、具体的な使い方については
を見ると良いと思います。Haskell Advent Calendar 2011 でも [twitter:@melponn] (id:melpon) さんが記事を書いていますので参考にしてください。
今回は、enumerator パッケージをベースに Twitter API ぺろぺろするライブラリを作りつつ、 Iteratee の使い方を覚えようという話をしようと思います。
概要
Twitter API の lists/members などの API には一度に取得できるデータ件数には上限があり、 それ以上のデータが必要な場合には、取得したデータの中に残りのデータをを取得するための next_cursorというパラメータが含まれているので、 その next_cursor を cursor パラメーターとして指定して GET リクエストを送信、次のページの cursor を取得という作業を繰り返します。 それ以上データが無い場合には next_cursor は 0 になるので、0 になったらそこで終了するようにします。
この、データ取得を繰り返す部分を Enumerator として抽象化して、 必要に応じてデータを取得するようにしたいと思います。
例えば、ユーザ thimura の haskell リストから、最初の 40 個を取得したい場合
lst <- run_ $ listsMenbers "thimura" "haskell" $$ EL.take 40
のように使えるようなライブラリを作ってみましょう。
今回は、enumerator パッケージの他に、http-enumerator パッケージと、 authenticate パッケージ、 aeson パッケージを使います。
下準備
まず、足周りの部分から作りましょう。 twitter の OAuth トークンや、HTTP の接続を管理する Manager は ReaderT モナドに保持させることにします。
type TW = ReaderT TWEnv IO data TWEnv = TWEnv { twOAuth :: OAuth , twCredential :: Credential , twManager :: Maybe Manager } runTW :: TWEnv -> TW a -> IO a runTW env st = case twManager env of Nothing -> withManager $ \mgr -> runReaderT st $ env { twManager = Just mgr } Just _ -> runReaderT st env
Twitter API へのアクセスはこの TW モナドの中で行ないます。 次に、この TW モナドの中で Twitter API にアクセスする関数たちを定義します。 Iteratee の内部のモナドとして先ほど定義した TW モナドを利用します。 型 a を入力として受けとり、型 b の結果を返す Iteratee の型は
Iteratee a TW b
となります。これを run_ した場合の戻り値は TW b 型になります。
http-enumerator パッケージの Network.HTTP.Enumerator.http 関数をラッピングする httpMgr 関数を次のように定義します。
httpMgr :: Request IO -> (HT.Status -> HT.ResponseHeaders -> Iteratee ByteString IO a) -> Iteratee ByteString TW a httpMgr req iter = do mgr <- lift . asks $ twManager liftTrans $ http req iter mgr
http 関数は MonadIO 型クラスのインスタンスであれば IO 以外でも使えますが、 authenticate パッケージの signOAuth 関数の型が
signOAuth :: OAuth -> Credential -> Request IO -> IO (Request IO)
になっているため httpMgr では Request IO 型を受け取るように定義にしています。 そのため http req iter mgr の型が
Iteratee ByteString IO a
となるため Iteratee の基盤のモナドを IO から持ち上げてやる操作が必要になります。 Iteratee は liftTrans を使うと持ち上げが可能です。
今作った httpMgr 関数を使って API へのアクセスを行なう関数を定義します。
api :: String -> HT.Query -> Iteratee ByteString IO a -> Iteratee ByteString TW a api url query iter = do req <- lift $ apiRequest url query httpMgr req (handleError iter) where handleError iter' st@(HT.Status sc _) _ = if 200 <= sc && sc < 300 then iter' else throwError $ HTTPStatusCodeException st signOAuthTW :: Request IO -> TW (Request IO) signOAuthTW req = do oa <- getOAuth cred <- getCredential liftIO $ signOAuth oa cred req apiRequest :: String -> HT.Query -> TW (Request IO) apiRequest uri query = do req <- liftIO $ parseUrl uri >>= \r -> return $ r { queryString = query } signOAuthTW req
ここで、ライブラリが投げる例外を次のように定義しておきます。
data TwitterException = HTTPStatusCodeException HT.Status | PerserException SomeException [ByteString] | TwitterErrorMessage T.Text Value deriving (Show, Typeable) instance Exception TwitterException
HTTPStatusCodeException は既に api 関数で使っているように、HTTP ステータスコードが 200 番台以外の場合に投げられる例外です。 他に、Twitter から送られてきた JSON のパースに失敗した場合の PerserException と、 Twitter がエラーメッセージを送ってきた場合に利用する TwitterErrorMessage を定義しています。
Cursor をパースする
冒頭でも書きましたが、 lists/members.json を含むいくつかの API は、全てのデータが必要な場合には cursor パラメータを指定する必要があります。
Twitter からは
{ "previous_cursor": 0, "previous_cursor_str": "0", "next_cursor": 0, "next_cursor_str": "0" "users": [ ...(略)... ], }
のような JSON が返ってきます。JSON のパースには aeson パッケージを使うことにします。
まず、
data Cursor a = Cursor { cursorCurrent :: [a] -- ^ データを格納する , cursorNext :: Maybe Integer -- ^ 次のデータを取得するための cursor } deriving (Show, Eq)
のように Cursor 型を定義しておきます。
通常 aeson を使う場合は Data.Aeson が便利すぎる件 - melpon日記 - HaskellもC++もまともに扱えないへたれのページ にもあるように Cursor a を FromJSON 型クラスのインスタンスにしますが、 データを格納している JSON のフィールド名は、"users" 以外にも "ids" などの場合があるので、 応用が効くように通常の関数として次のように定義します。
parseCursor :: FromJSON a => T.Text -> Value -> AE.Parser (Cursor a) parseCursor key (Object o) = checkError o <|> Cursor <$> o .: key <*> o .:? "next_cursor" parseCursor _ v@(Array _) = return $ Cursor (maybe [] id $ AE.parseMaybe parseJSON v) Nothing parseCursor _ o = fail $ "Error at parseCursor: unknown object " ++ show o checkError :: Object -> AE.Parser a checkError o = do err <- o .:? "error" case err of Just msg -> throw $ TwitterErrorMessage msg (Object o) Nothing -> mzero
リクエストに問題がある場合には error フィールドを持つ JSON が返されるので、 それをパースして例外を送出する checkError 関数を定義しておきます。 checkError 関数はエラー以外の場合は mzero を返すように定義しているので、 エラーではない場合の処理と Alternative (<|>) で自然に連結することができます。
いま定義した parseCursor を使って Cursor をパースする Iteratee を定義します。
iterCursor' :: (Monad m, FromJSON a) => T.Text -> Iteratee Value m (Maybe (Cursor a)) iterCursor' key = do ret <- EL.head case ret of Just v -> return . AE.parseMaybe (parseCursor key) $ v Nothing -> return Nothing iterCursor :: (Monad m, FromJSON a) => T.Text -> Iteratee ByteString m (Maybe (Cursor a)) iterCursor key = enumLine =$ enumJSON =$ iterCursor' key
ここで enumLine , enumJSON は Enumeratee で、
enumJSON :: Monad m => Enumeratee ByteString Value m a enumJSON = E.sequence $ iterParser json enumLine :: Monad m => Enumeratee ByteString ByteString m a enumLine = EB.splitWhen newline where newline x = (x == 10) || (x == 13)
のように定義してあります。 E.sequence は Iteratee を引数に与えると、入力をその Iteratee に与え、 その結果を次の Iteratee に渡す Enumeratee を作る関数です。
Iteratee のエラー処理をする
さきほどの iterCursor の定義でも十分動作するのですが、 aeson が JSON のパースに失敗した場合に返すエラーは分かりづらく、 パースに失敗したときの入力内容もわかりません。
そこで、iterCursor を変更して、JSON のパースに失敗した場合、 パースに失敗したときの入力内容をエラーに含めるようにします。
iterCursor :: (Monad m, FromJSON a) => T.Text -> Iteratee ByteString m (Maybe (Cursor a)) iterCursor key = enumLine =$ handleParseError (enumJSON =$ iterCursor' key) handleParseError :: Monad m => Iteratee ByteString m b -> Iteratee ByteString m b handleParseError iter = iter `catchError` hndl where getChunk = continue return hndl e = getChunk >>= \x -> case x of Chunks xs -> throwError $ PerserException e xs _ -> throwError $ PerserException e []
catchError は Iteratee がエラーを返した場合、第二引数に渡された handler を呼びだし、 handler が返した Iteratee に処理をまかせます。
私はちょっと悩みましたが、エラーを起こした場合の入力が必要な場合には、 handler が「入力をひとつだけ受けとってエラーを返す Iteratee」を返すことで達成することができます。
list を取得する Enumerator を作ってみる
下準備も済んだので、いよいよ今回の目的の Enumerator 部分を作ります。
cursor を返す API をラッピングして、Enumerator にする apiCursor を次のように定義します。
apiCursor :: (FromJSON a, Show a) => String --- ^ API の endpoint -> HT.Query --- ^ QueryString -> T.Text --- ^ データが格納されている JSON のフィールド名 -> Integer --- ^ 初期の cursor -> Enumerator a TW b apiCursor uri query cursorKey initCur = checkContinue1 go initCur where go loop cursor k = do let query' = insertQuery "cursor" (Just . B8.pack . show $ cursor) query res <- lift $ run_ $ api uri query' (iterCursor cursorKey) case res of Just r -> do let nextCur = cursorNext r chunks = Chunks . cursorCurrent $ r case nextCur of Just 0 -> k chunks Just nc -> k chunks >>== loop nc Nothing -> k chunks Nothing -> k EOF insertQuery :: ByteString -> Maybe ByteString -> HT.Query -> HT.Query insertQuery key value = mk where mk = M.toList . M.insert key value . M.fromList
この apiCursor を使えば、lists/members.json にアクセスする Enumerator は簡単に作ることができます。 apiCursor はちょっと複雑なので解説は後回しにして、先に API にアクセスするインターフェースを作ってしまいましょう。
listsMembers :: ByteString -> ByteString -> Enumerator User TW a listsMembers user listname = apiCursor "https://api.twitter.com/1/lists/members.json" query "users" (-1) where query = [("owner_screen_name", Just user), ("slug", Just listname)]
ここでは Enumerator が出力する型は User 型としています。
data User = User { userId :: UserId , userName :: UserName , userScreenName :: ScreenName , userDescription :: T.Text , userLocation :: T.Text , userProfileImageURL :: Maybe URLString , userURL :: Maybe URLString , userProtected :: Maybe Bool , userFollowers :: Maybe Int } deriving (Show, Eq) instance FromJSON User where parseJSON (Object o) = checkError o <|> User <$> o .: "id" <*> o .: "name" <*> o .: "screen_name" <*> o .: "description" <*> o .: "location" <*> o .:? "profile_image_url" <*> o .:? "url" <*> o .:? "protected" <*> o .:? "followers_count" parseJSON _ = mzero
他の細かい型の定義は省略しますが、任意の FromJSON の型クラスのインスタンスの型が使えるので、 parseCursor や apiCursor は lists/members 以外の API にも使いまわすことができます。
JSON の型である Value 型自体も FromJSON のインスタンスなので、 もし User 型などでラップせずに生の情報が欲しい場合は型シグネチャを変更するだけで実現できます。
listsMembersRaw :: ByteString -> ByteString -> Enumerator Value TW a listsMembersRaw user listname = apiCursor "https://api.twitter.com/1/lists/members.json" query "users" (-1) where query = [("owner_screen_name", Just user), ("slug", Just listname)]
Enumerator のハラワタの中を見る
では、apiCursor の中身を見てみましょう。
Enumerator を作る場合、 checkContinue0 や、引数を取る場合の checkContinue1 を用いて定義することがほとんどです。 はじめに、簡単な方の checkContinue0 を見てみましょう。checkContinue0 は次のように定義されています
checkContinue0 :: Monad m => (Enumerator a m b -> (Stream a -> Iteratee a m b) -> Iteratee a m b) -> Enumerator a m b checkContinue0 inner = loop where loop (Continue k) = inner loop k loop step = returnI step
checkContinue0 は、 Enumerator と合成された Iteratee がまだデータを受けとる状態である Continue である場合はループし、 これ以上データを受けとらずに値を返す Yield や、エラー状態 Error の場合には終了する Enumerator を作ります。 checkContinue0 のリファレンスにもあるように、ある値を永久に Iteratee に流し込む Enumerator は checkContinue0 を使うと
repeat :: Monad m => a -> Enumerator a m b repeat x = checkContinue0 $ \loop k -> k (Chunks [x]) >>== loop
のように書くことができます。 ここでの k の型は Stream a -> Iteratee a m b ですので、 入力 Chunk a もしくは EOF を受けとって Iteratee を返す関数です。
ここで、Iteratee の定義を復習しておきます。
newtype Iteratee a m b = Iteratee { runIteratee :: m (Step a m b) } data Step a m b = Continue (Stream a -> Iteratee a m b) | Yield b (Stream a) | Error Exc.SomeException
Iteratee の実体は m (Step a m b) で、runIteratee してやれば中身を取りだせることがわかります。 Step は Continue, Yield, Error のどれかの状態を保持しています。
(>>==) は、Iteratee と Enumerator を合成し、新たな Iteratee を返す演算子で、その定義を見てやると
(>>==) :: Monad m => Iteratee a m b -> (Step a m b -> Iteratee a' m b') -> Iteratee a' m b' i >>== f = Iteratee (runIteratee i >>= runIteratee . f)
となっています。(>>==) の中で使われている (>>=) 演算子は Iteratee a m b の基盤のモナド m の bind です。 したがって、Enumerator と Iteratee が合成され、基盤のモナド m の処理に落しこんでいるのは Enumerator 側で行なわれていると見なせます。 したがって、Enumerator 側では、基盤となるモナド m のエラー処理を意識し、ファイルハンドルやロックなどの資源管理を適切に行なう必要があります。
今回の例では Enumerator 側では特に資源管理を意識する必要はありませんが、 実際にファイルなどの資源を管理する必要がある場合は enumFile の実装を参考にすると良いと思います。
apiCursor の実装に使っている checkContinue1 の型を見ると一見複雑に見えますが、 checkContinue0 の違いは状態を保持するための引数を渡すか否かの違いしかありません。
checkContinue1 :: Monad m => ((s1 -> Enumerator a m b) -> s1 -> (Stream a -> Iteratee a m b) -> Iteratee a m b) -> s1 -> Enumerator a m b checkContinue1 inner = loop where loop s (Continue k) = inner loop s k loop _ step = returnI step
結合する Iteratee が終了しているかの判定は checkContinue1 が見てくれるので、 後は checkContinue1 の引数である inner の実装をするだけです。 apiCursor の実装では go が該当します。
go loop cursor k = do let query' = insertQuery "cursor" (Just . B8.pack . show $ cursor) query res <- lift $ run_ $ api uri query' (iterCursor cursorKey) case res of Just r -> do let nextCur = cursorNext r chunks = Chunks . cursorCurrent $ r case nextCur of Just 0 -> k chunks Just nc -> k chunks >>== loop nc Nothing -> k chunks Nothing -> k EOF
引数として与えられた cursor を使ってクエリを生成し API にアクセスします。 res には Maybe (Cursor a) 型の値が入っているので、 これが Just の場合は中のデータを k :: Stream a -> Iteratee a m b に渡して処理を行なった後、 nextCursor が存在し、その値が 0 でない場合は loop し、 それ以外の場合はデータを Iteratee に手渡して終了する形になっていることがわかると思います。
使ってみる
完成した listMembers を実際に使ってみましょう。 以下にサンプルを示しました。
tokens の中の oauthConsumerKey, oAuthConsumerSecret は適切なものに置き換えてください。
もし、OAuth の Consumer key などが無い場合は https://dev.twitter.com/apps/new から作成してください。
main :: IO () main = do (username:listname:_) <- getArgs withCred $ do run_ $ listsMembers (B8.pack username) (B8.pack listname) $$ EL.mapM_ (liftIO . putStrLn . userScreenName) tokens :: OAuth tokens = OAuth { oauthServerName = "twitter" , oauthRequestUri = "http://twitter.com/oauth/request_token" , oauthAccessTokenUri = "http://twitter.com/oauth/access_token" , oauthAuthorizeUri = "http://twitter.com/oauth/authorize" , oauthConsumerKey = "Consumer Key を取得して入力してください" , oauthConsumerSecret = "Consumer Secret を取得して入力してください" , oauthSignatureMethod = OA.HMACSHA1 , oauthCallback = Nothing } withCred :: TW a -> IO a withCred task = do cred <- authorize tokens getPIN let env = TWEnv { twOAuth = tokens , twCredential = cred , twManager = Nothing } runTW env task where getPIN url = do putStrLn $ "browse URL: " ++ url putStr "> what was the PIN twitter provided you with? " hFlush stdout getLine authorize :: OAuth -> (String -> IO String) -> IO Credential authorize oauth getPIN = do cred <- OA.getTemporaryCredential oauth let url = OA.authorizeUrl oauth cred pin <- getPIN url OA.getAccessToken oauth $ OA.insert "oauth_verifier" (B8.pack pin) cred
api 関数に print デバッグを仕込んで十分大きなリストを取得してみると、 必要に応じてデータが取得されている様子が分かるでしょう。
まとめ
ちょっと駆け足でしたが、Iteratee の理解するには実際に作ってみるのが一番だと思います。 最近は Conduit の開発が活発なので、これも機会があれば紹介したいです。誰かが Haskell Advent Calendar 2011 で書いてくれないかなー (ちらっ)。
今回作ったプログラムのソースコードは https://gist.github.com/1493378 にアップロードしてあります。
あと、今回作ったライブラリは twitter-enumerator という名前で Hackage に登録しています。 まだまだ開発の途中なので意見などがあれば github リポジトリ や [twitter:@thimura] 宛にでもどしどし送ってください。
明日は Template Haskell マスターこと [twitter:@mr_konn] (id:mr_konn) さんです!