代码之家  ›  专栏  ›  技术社区  ›  Masse

读取文件时限制内存使用

  •  5
  • Masse  · 技术社区  · 15 年前

    我是一个哈斯克尔初学者,认为这是一个很好的练习。我有一个 当我需要读取线程A中的文件时,处理文件行 在线程b_i中,然后将结果输出到线程c。

    我已经实现了这一点,但其中一个要求是 无法信任整个文件是否适合内存。我希望那懒惰 IO和垃圾收集器可以为我这样做,但是遗憾的是内存使用率 不断上升。

    读卡器线程(A)使用 readFile 然后拉上拉链 用线号和包装。然后写下这些压缩线 到 Control.Concurrent.Chan . 每个消费线程B都有自己的通道。

    每个使用者在有数据时读取自己的通道,如果regex 匹配,输出到各自的输出通道包装 在“可能”(由列表组成)。

    打印机检查每个B线程的输出通道。如果没有 结果(行)为空,打印行。从现在起 不应该引用旧的行,我认为垃圾 收集者可以释放这些线,但唉,我似乎在 这里错了。

    .lhs文件在这里: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

    所以问题是,如何限制内存使用,或者允许垃圾 收集器以拆下管路。

    根据要求的代码段。希望缩进不会破坏得太严重:)

    data Global = Global {done :: MVar Bool, consumers :: Consumers}
    type Done = Bool
    type Linenum = Int
    type Line = (Linenum, Maybe String)
    type Output = MVar [Line]
    type Input = Chan Line
    type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
    type State a = ReaderT Global IO a
    
    
    producer :: [Input] -> FilePath -> State ()
    producer c p = do
      liftIO $ Main.log "Starting producer"
      d <- asks done
      f <- liftIO $ readFile p
      mapM_ (\l -> mapM_
        (liftIO . flip writeChan l) c)
        $ zip [1..] $ map Just $ lines f
      liftIO $ modifyMVar_ d (return . not)
    
    printer :: State ()
    printer = do
      liftIO $ Main.log "Starting printer"
      c <- (fmap (map (snd . snd) . M.elems)
        (asks consumers >>= liftIO . readMVar))
      uniq' c
      where head' :: Output -> IO Line
        head' ch = fmap head (readMVar ch)
    
        tail' = mapM_ (liftIO . flip modifyMVar_
            (return . tail))
    
        cont ch = tail' ch >> uniq' ch
    
        printMsg ch = readMVar (head ch) >>=
            liftIO . putStrLn . fromJust . snd . head
    
        cempty :: [Output] -> IO Bool
        cempty ch = fmap (any id)
            (mapM (fmap ((==) 0 . length) . readMVar ) ch)
    
        {- Return false unless none are Nothing -}
        uniq :: [Output] -> IO Bool
        uniq ch = fmap (any id . map (isNothing . snd))
            (mapM (liftIO . head') ch)
    
        uniq' :: [Output] -> State ()
        uniq' ch = do
          d <- consumersDone
          e <- liftIO $ cempty ch
          if not e
            then  do
              u <- liftIO $ uniq ch
              if u then cont ch else do
            liftIO $ printMsg ch
            cont ch
              else unless d $ uniq' ch
    
    1 回复  |  直到 12 年前
        1
  •  6
  •   sclv    15 年前

    并发编程不提供定义的执行顺序,除非您自己使用MVAR等来强制执行。因此,很可能生产线会在任何消费者读出并传递之前将所有/大部分行粘贴到chan中。另一个应该满足需求的架构是让线程A调用lazy readfile并将结果粘贴到MVAR中。然后,每个使用者线程获取MVAR,读取一行,然后在继续处理该行之前替换MVAR。即使这样,如果输出线程不能跟上,那么存储在chan上的匹配行的数量也可以任意增加。

    您所拥有的是推送架构。要真正使它在恒定的空间中工作,就要以需求驱动的方式思考。找到一种机制,使输出线程向处理线程发出信号,表示它们应该做些什么,并使处理线程向读卡器线程发出信号,表示它们应该做些什么。

    另一种方法是使用有限大小的通道——这样,当处理器线程没有赶上时,读线程会阻塞,而当输出线程没有赶上时,处理器线程会阻塞。

    总的来说,这个问题实际上让我想起了TimBray的WideFinder基准,尽管需求有所不同。无论如何,它导致了关于实现多核grep的最佳方法的广泛讨论。最大的问题是这个问题是IO绑定的,您需要在mmap文件上有多个读线程。

    在这里看到的比你想知道的更多: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder