Get the last n lines of a file with Python, similar to tail

  •  162
  • Armin Ronacher  · Tech Community  · 17 years ago

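    I'm writing a log file viewer for a web application, and for that I want to paginate through the lines of the log file. The items in the file are line based, with the newest item at the bottom.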

    So I need a tail() method that can read n lines from the bottom and supports an offset. This is what I came up with:

    def tail(f, n, offset=0):
        """Reads n lines from f with an offset of offset lines."""
        avg_line_length = 74
        to_read = n + offset
        while 1:
            try:
                f.seek(-int(avg_line_length * to_read), 2)
            except IOError:
                # woops.  apparently file is smaller than what we want
                # to step back, go to the beginning instead
                f.seek(0)
            pos = f.tell()
            lines = f.read().splitlines()
            if len(lines) >= to_read or pos == 0:
                return lines[-to_read:offset and -offset or None]
            avg_line_length *= 1.3
    

    Is this a reasonable approach? What is the recommended way to tail log files with offsets?

    29 replies  |  up to 5 years ago
        1
  •  5
  •   Zhen Wang    6 years ago

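    This may be quicker than yours. It makes no assumptions about line length. It backs through the file one block at a time until it has found the right number of '\n' characters.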

    def tail( f, lines=20 ):
        total_lines_wanted = lines
    
        BLOCK_SIZE = 1024
        f.seek(0, 2)
        block_end_byte = f.tell()
        lines_to_go = total_lines_wanted
        block_number = -1
        blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                    # from the end of the file
        while lines_to_go > 0 and block_end_byte > 0:
            if (block_end_byte - BLOCK_SIZE > 0):
                # read the last block we haven't yet read
                f.seek(block_number*BLOCK_SIZE, 2)
                blocks.append(f.read(BLOCK_SIZE))
            else:
                # file too small, start from beginning
                f.seek(0,0)
                # only read what was not read
                blocks.append(f.read(block_end_byte))
            lines_found = blocks[-1].count('\n')
            lines_to_go -= lines_found
            block_end_byte -= BLOCK_SIZE
            block_number -= 1
        all_read_text = ''.join(reversed(blocks))
        return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
    

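    Usually, this will locate the last 20 lines on the first or second pass through the loop. If your figure of 74 characters per line is actually accurate, make the block size 2048 and you'll tail 20 lines almost immediately.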

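    Also, I don't burn a lot of brain calories trying to finesse alignment with physical OS blocks. Using these high-level I/O packages, I doubt you'll see any performance consequence of trying to align on OS block boundaries. If you use lower-level I/O, then you might see a speedup.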


    UPDATE

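    For Python 3.2 and later, run the same process on bytes, because in text files (those opened without a "b" in the mode string) only seeks relative to the beginning of the file are allowed (the exception being a seek to the very end of the file with seek(0, 2)):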

    e.g.: f = open('C:/.../../apache_logs.txt', 'rb')

    def tail(f, lines=20):
        total_lines_wanted = lines
    
        BLOCK_SIZE = 1024
        f.seek(0, 2)
        block_end_byte = f.tell()
        lines_to_go = total_lines_wanted
        block_number = -1
        blocks = []
        while lines_to_go > 0 and block_end_byte > 0:
            if (block_end_byte - BLOCK_SIZE > 0):
                f.seek(block_number*BLOCK_SIZE, 2)
                blocks.append(f.read(BLOCK_SIZE))
            else:
                f.seek(0,0)
                blocks.append(f.read(block_end_byte))
            lines_found = blocks[-1].count(b'\n')
            lines_to_go -= lines_found
            block_end_byte -= BLOCK_SIZE
            block_number -= 1
        all_read_text = b''.join(reversed(blocks))
        return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
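
    The question also asks for an offset, which the function above does not take. The following is only a sketch of how paging might be layered on the same block-reading idea, assuming a file opened in binary mode. The name tail_page and the block_size parameter are made up for illustration. It reads until it has seen one more newline than strictly needed, so that a partial first line in the earliest block can be discarded.

```python
import io


def tail_page(f, n, offset=0, block_size=1024):
    """Return n lines that end `offset` lines before the end of binary file f."""
    wanted = n + offset
    f.seek(0, io.SEEK_END)
    end = f.tell()
    blocks = []      # blocks in reverse order, newest first
    newlines = 0
    # Walk backwards block by block. Stop once more than `wanted` newlines
    # were seen, or once the start of the file is reached.
    while end > 0 and newlines <= wanted:
        start = max(0, end - block_size)
        f.seek(start)
        block = f.read(end - start)
        blocks.append(block)
        newlines += block.count(b"\n")
        end = start
    lines = b"".join(reversed(blocks)).splitlines()
    lines = lines[-wanted:] if wanted else []
    # Drop the newest `offset` lines and keep what is left (at most n lines).
    return lines[:max(0, len(lines) - offset)] if offset else lines


# Hypothetical in-memory "log file" with 100 entries.
log = io.BytesIO(b"".join(b"line%d\n" % i for i in range(100)))
newest_page = tail_page(log, 3)
previous_page = tail_page(log, 3, offset=3)
```

    For a viewer that shows n lines per page, page k (counting from 0 at the newest page) would then correspond to offset = k * n.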
    
        2
  •  1
  •   itsjwala    6 years ago

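    import os
    def tail(f, n, offset=0):
      # Python 2 only: os.popen2 was removed in Python 3
      stdin,stdout = os.popen2("tail -n %d %s" % (n + offset, f))
      stdin.close()
      lines = stdout.readlines(); stdout.close()
      # drop the newest `offset` lines; lines[:-0] would be empty
      return lines[:-offset] if offset else lines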
    

    For Python 3, you can do the following:

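    import subprocess
    def tail(f, n, offset=0):
        # command arguments must be strings; ask tail for n + offset lines
        proc = subprocess.Popen(['tail', '-n', str(n + offset), f], stdout=subprocess.PIPE)
        lines = proc.stdout.readlines()
        proc.wait()  # reap the child process
        # drop the newest `offset` lines; lines[:-0] would be empty
        return lines[:-offset] if offset else lines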
    
        3
  •  0
  •   Blaine McMahon    6 years ago

    Here is my answer. Pure Python. Using timeit, it seems pretty fast. Tailing 100 lines of a log file that has 100,000 lines:

    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
    0.0014600753784179688
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
    0.00899195671081543
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
    0.05842900276184082
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
    0.5394978523254395
    >>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
    5.377126932144165
    

    Here is the code:

    import os
    
    
    def tail(f, lines=1, _buffer=4098):
        """Tail a file and get X lines from the end"""
        # place holder for the lines found
        lines_found = []
    
        # block counter will be multiplied by buffer
        # to get the block size from the end
        block_counter = -1
    
        # loop until we find more than X lines, so that a possibly
        # partial first line can be discarded by the final slice
        while len(lines_found) <= lines:
            try:
                f.seek(block_counter * _buffer, os.SEEK_END)
            except IOError:  # either file is too small, or too many lines requested
                f.seek(0)
                lines_found = f.readlines()
                break
    
            lines_found = f.readlines()
    
            # we found enough lines, get out
            # Removed this line because it was redundant the while will catch
            # it, I left it for history
            # if len(lines_found) > lines:
            #    break
    
            # decrement the block counter to get the
            # next X bytes
            block_counter -= 1
    
        return lines_found[-lines:]
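
    One thing to keep in mind with seeks relative to the end of the file, as used above: on Python 3 they only work on files opened in binary mode. The sketch below illustrates this with a throwaway temporary file (the file and its contents are made up for the demonstration). As far as I know, the exception raised in text mode, io.UnsupportedOperation, is a subclass of OSError, so an `except IOError` clause like the one above would catch it and fall back to reading the whole file.

```python
import io
import os
import tempfile

# Hypothetical sample file, created only for this demonstration.
with tempfile.NamedTemporaryFile("wb", suffix=".log", delete=False) as tmp:
    tmp.write(b"".join(b"entry %d\n" % i for i in range(1000)))
    path = tmp.name

try:
    with open(path, "r") as text_file:
        try:
            text_file.seek(-100, os.SEEK_END)
            text_mode_seek_ok = True
        except io.UnsupportedOperation:
            # Python 3 text files reject nonzero end-relative seeks.
            text_mode_seek_ok = False

    with open(path, "rb") as binary_file:
        # The same seek is allowed on a binary file.
        binary_file.seek(-100, os.SEEK_END)
        last_lines = binary_file.read().splitlines()[-3:]
finally:
    os.remove(path)
```

    Opening the log with "rb" and decoding the returned lines afterwards keeps the fast path available.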
    
        4
  •  0
  •   rish_hyun    4 years ago

    If reading the whole file is acceptable, then use a deque.

    from collections import deque
    deque(f, maxlen=n)
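
    The deque approach can probably be extended to the offset from the question. This is a minimal sketch, not part of the original answer: tail_deque is a made-up name, and the file is stood in for by an io.StringIO. It still streams through the whole file, keeping only the last n + offset lines in memory.

```python
import io
from collections import deque
from itertools import islice


def tail_deque(f, n, offset=0):
    """Return n lines that end `offset` lines before the end of f."""
    if n <= 0:
        return []
    # Keep only the last n + offset lines while iterating over the file.
    window = deque(f, maxlen=n + offset)
    # Drop the newest `offset` lines; what remains is at most n lines.
    keep = max(0, len(window) - offset)
    return list(islice(window, keep))


# Hypothetical five-line "file" for illustration.
sample = io.StringIO("a\nb\nc\nd\ne\n")
last_two = tail_deque(sample, 2)
sample.seek(0)
two_before_last = tail_deque(sample, 2, offset=1)
```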
    

    Prior to 2.6, deques didn't have a maxlen option, but it's easy enough to implement.

    import itertools
    def maxque(items, size):
        items = iter(items)
        q = deque(itertools.islice(items, size))
        for item in items:
            del q[0]
            q.append(item)
        return q
    

    If it's a requirement to read the file from the end, then use a gallop (also known as exponential) search.

    def tail(f, n):
        assert n >= 0
        pos, lines = n+1, []
        while len(lines) <= n:
            try:
                f.seek(-pos, 2)
            except IOError:
                f.seek(0)
                break
            finally:
                lines = list(f)
            pos *= 2
        return lines[-n:] if n else []
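
    To make the doubling easier to follow, here is a variant of the same gallop idea, written as a sketch for a file opened in binary mode. It is not the code above: tail_gallop is a made-up name, and instead of catching the seek error it clamps the start position at 0.

```python
import io


def tail_gallop(f, n):
    """Return the last n lines of binary file f, doubling the read window."""
    if n <= 0:
        return []
    f.seek(0, io.SEEK_END)
    size = f.tell()
    window = n + 1  # every line needs at least one byte
    while True:
        start = max(0, size - window)
        f.seek(start)
        lines = f.read().splitlines(keepends=True)
        # With more than n lines, the first (possibly partial) line is
        # dropped by the slice. At start == 0 every line is complete.
        if len(lines) > n or start == 0:
            return lines[-n:]
        window *= 2  # gallop: double the number of bytes read from the end


# Hypothetical 50-line "file" for illustration.
rows = io.BytesIO(b"".join(b"row %d\n" % i for i in range(50)))
last_rows = tail_gallop(rows, 2)
```

    Because the window doubles on each pass, the number of bytes read in total stays within a small constant factor of the size of the final window.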
    