Conduitを使ってみる

なんかIO扱ったりするのにConduitが熱いらしいので使ってみた。まだよく分かってないのでたぶん色々間違ってる。
ConduitではSourceから一つずつ流れてくるデータをConduitで流れ方を変えたり加工したりしてSinkに流す。SourceとSinkがファイルでConduitが無い場合(つまりファイルの中身を全部コピーするだけ)の例は以下の通り。

import Data.Conduit (($$))
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB

main :: IO ()
  C.runResourceT
  $ CB.sourceFile "in.txt"
  $$ CB.sinkFile "out.txt"

sourceFileでファイルの中身をまとめて流すSourceを作り、sinkFileでファイルに書き出すSinkを作り、それらを$$で繋ぎ合わせて、runResourceTで実行する。次はConduitを挟んで加工する例。(ちなみにこのコード、out.txtに余計な改行が一つ入ってしまうのでどこか間違えてるはずなんだけどよく分からない……)

import Data.Conduit (($=),($$))
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import qualified Data.ByteString as DB
import qualified Data.ByteString.UTF8 as BU
    
main :: IO ()
main =
  C.runResourceT
  $ CB.sourceFile "in.txt"
  $= CB.lines --一行ごとに区切って流す
  $= CL.isolate 6 --六個の要素(=六行)だけ流して残りは無視(その時点で終了)
  $= (C.sequence $ CL.take 2) --二個ずつ纏めたリストにして流す
  $= CL.map (foldl DB.append (BU.fromString "")) --各要素について、リストの中身を結合
  $= CL.map (`DB.append` (BU.fromString "\n")) --各要素について、末尾に改行を付ける
  $$ CB.sinkFile "out.txt"

新しく追加された行で$=の右側にあるのは全部Conduit。$=はSourceとConduitを引数に取って新しいSourceを返すので、結局CB.sourceFileからCL.map (`DB.append` (BU.fromString "\n"))までがSourceということになる。
in.txtの中身が一行目に1、二行目に2……と十行続くようなファイルだったとする(改行コードはLF)と、各SourceとConduitの後で以下のように流れてる。なお「a→b→c ...」という表記は要素a, b, c,...が順に流れているということ。

main =
  C.runResourceT
  $ CB.sourceFile "in.txt" --「"1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n"」
  $= CB.lines --「"1"→"2"→"3"→"4"→"5"→"6"→"7"→"8"→"9"→"10"」
  $= CL.isolate 6 --「"1"→"2"→"3"→"4"→"5"→"6"」
  $= (C.sequence $ CL.take 2) --「["1","2"]→["3","4"]→["5","6"]」
  $= CL.map (foldl DB.append (BU.fromString "")) --「"12"→"34"→"56"」
  $= CL.map (`DB.append` (BU.fromString "\n")) --「"12\n"→"34\n"→"56\n"」
  $$ CB.sinkFile "out.txt"

network-conduitのrunTCPServerというのを使うと、今ファイルでやっていることをTCPサーバにすることもできる(クライアントからメッセージを受け取って(Source)、クライアントにメッセージを返す(Sink))。

import Data.Conduit (($=),($$))
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import qualified Data.ByteString as DB
import qualified Data.ByteString.UTF8 as BU
import Text.Regex
import Data.Conduit.Network

main :: IO ()
main =
  runTCPServer (ServerSettings 20017 (Host "127.0.0.1"))
    (\src -> \sink ->
      src
      $= CB.lines
      $= CL.isolate 6
      $= (C.sequence $ CL.take 2)
      $= CL.map (foldl DB.append (BU.fromString ""))
      $= CL.map (`DB.append` (BU.fromString "\n"))
      $$ sink
    )

ここに接続して1,2,3...と順にメッセージを送っていけば、ファイルの例と同じ結果が得られるはず。