
Using a FIFO queue in Bash

  •  3
  • wishi  · Tech Community  · 10 years ago

    I wrote a small Bash script that uses inotify-tools and the inotify interface. My problem is that one of the commands in this function can block execution until it completes, so the whole function stalls.

    To work around this, I would like to queue the detected files (on close events) and have another function read from the queue. Does anyone know how to do this in Bash?

    The variables below are simple strings used to locate directories or build file names.

    inotifywait -mrq -e close --format %w%f /some/dir/ | while read -r FILE
    do
        NAME="${CAP}_$(date +%F-%H-%M-%S).pcap"
        logger -i "$FILE was just closed"
        # cp "$FILE" "$DATA/$CAP/$ENV/$NAME"
        rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log
        RESULT=$?
        if [ $RESULT -eq 0 ] ; then
            logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT"
        else
            logger -i "Fail:    $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT"
        fi
    
        rm "$FILE"
        RESULT=$?
        if [ $RESULT -eq 0 ] ; then
            logger -i "Success: deletion successful for $FILE, code $RESULT"
        else
            logger -i "Fail:    deletion failed for $FILE on SSD, code $RESULT"
        fi
    
        do_something   # call the function; "do_something()" would start a definition
        logger -i "$NAME was handled"
        # for stdout
        echo "$(date): Moved file"
    done
    

    I am copying the files to a SAN volume whose response time varies at times, which is why this function stalls for a while. I replaced cp with rsync because I need throughput statistics; cp (from coreutils) apparently does not provide them.

    2 Answers  |  latest 10 years ago
        1
  •  3
  •   Petr Skocik    10 years ago

    A few ideas:

    1) You can use a named pipe as a bounded queue:

    mkfifo pipe
    
    your_message_source | while read -r MSG
    do
      #collect files in the pipe
      echo "$MSG"
    done > pipe &   # redirect the whole loop: keeps the write end open, so the
                    # reader doesn't see EOF between messages (>> per echo would)
    
    while read -r MSG 
    do
     #Do your blocking work here
    done < pipe
    

    The feeder's write to the pipe will block whenever the pipe's buffer is full (its size can be checked with ulimit -p, multiplied by 512). That may already be enough in some cases.
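    As a rough sketch (not part of the original answer), the buffer size hinted at by ulimit -p can be computed like this; note that on modern Linux the actual FIFO capacity is typically 64 KiB regardless of this value:

```shell
#!/bin/bash
# Estimate the pipe buffer size from ulimit -p, which reports it in
# 512-byte blocks. Treat the result as a hint only: modern Linux kernels
# usually give each pipe 64 KiB of buffer by default.
blocks=$(ulimit -p)
bytes=$((blocks * 512))
echo "pipe buffer (per ulimit -p): ${bytes} bytes"
```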

    2) You can use a regular file as a message queue and take a file lock for every operation:

     #Feeder part
        your_message_source | while read -r MSG     
           do
                (
                flock 9
                echo "$MSG" >> file_based_queue 
                ) 9>> file_based_queue   # append-open for locking; 9> would truncate the queue
           done &
    
       # Worker part
       while :
       do 
        #Lock the shared queue and cut'n'paste its content to the worker's private queue
        (
          flock 9
          cp file_based_queue workers_queue
          truncate -s0 file_based_queue   
        ) 9>> file_based_queue
    
        #process the private queue
        while read -r MSG 
        do
         #Do your blocking work here   
        done < workers_queue 
       done
    

    You only block the handling of FS events while you are inside the ( flock 9; ... ) subshell, after the flock call. To minimize the time spent there, you can keep the queue on a RAM disk (/dev/shm), so that you don't miss any FS events.
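    A minimal, self-contained sketch of the locking pattern above (the file names here are made up for the demo): two background writers append to a shared queue file under an exclusive flock, so their messages are serialized rather than interleaved.

```shell
#!/bin/bash
# Two concurrent writers append to a shared queue file; the exclusive
# flock on fd 9 serializes the appends.
queue=$(mktemp)
for writer in A B; do
  (
    for i in 1 2 3; do
      (
        flock 9              # wait until the other writer releases the lock
        echo "$writer says $i" >&9
      ) 9>> "$queue"         # append-open: does not truncate the queue
    done
  ) &
done
wait
count=$(wc -l < "$queue")
rm -f "$queue"
echo "messages queued: $count"   # 2 writers x 3 messages = 6 lines
```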

    3) Alternatively, you can use a Bash interface to a database-backed message queue or to SysV message queues, or write the script in a language that has such an interface.
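    For the SysV route, note that util-linux only covers the queue lifecycle from the shell; there is no standard CLI for msgsnd/msgrcv, so the actual message passing needs a language with SysV IPC bindings. A lifecycle-only sketch:

```shell
#!/bin/bash
# Create, inspect, and remove a SysV message queue using util-linux tools.
# Sending/receiving messages is not possible from pure shell.
msgid=$(ipcmk -Q | awk '{print $NF}')   # "Message queue id: N" -> N
ipcs -q -i "$msgid" > /dev/null         # show details of this queue
ipcrm -q "$msgid"                       # delete the queue again
echo "created and removed queue ${msgid}"
```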

        2
  •  2
  •   Quatro por Quatro    9 years ago

    Here is an example that uses a regular file as a FIFO queue: it has unlimited size, persists across system reboots, and allows multiple writers and multiple readers.

    #!/bin/bash
    
    # manages a FIFO queue on a system file.
    #  every message is a line of text.
    #  supports multiple writers and multiple readers.
    #
    # Requires: bash, inotify-tools: /usr/bin/inotifywait,
    # ed, util-linux: /usr/bin/flock
    
    set -e
    
    # Retrieves one element
    # param:
    #  pipe_name
    # writes to stdout:
    #  element_string
    # returns:
    #  true on success, false on error or end of data
    _pipe_pull() {
        local pipe="${1}"
        local msg pid
    
    # nested helper: prints the queue's first line and deletes it, under a lock
    _pipe_pop() {
        local fd1
        ( if ! flock --timeout 1 --exclusive ${fd1}; then
            echo "Error: _pipe_pop can't get a lock." >&2
            return 1
        fi
            [ ! -s "${pipe}" ] || \
                ed -s "${pipe}" <<< $'1p\n1d\nw'
        ) {fd1}< "${pipe}"
        :
    }
    
        msg=""
        while [ -z "${msg}" ]; do
            if [ ! -s "${pipe}" ]; then
                inotifywait -e modify "${pipe}" > /dev/null 2>&1 &
                pid="${!}"
                wait "${pid}" || return 1
            fi
    
            msg="$(_pipe_pop)" || \
                return 1
    
            if [ "${msg}" = $'\x04' ]; then
                echo "_pipe_pull: end of data." >&2
                return 1
            fi
        done
        printf '%s\n' "${msg}"
        :
    }
    
    # Adds multiple elements at once
    # param:
    #  pipe_name elem_1 ... elem_N
    # returns:
    #  true on success, false on error
    _pipe_push() {
        local pipe="${1}"
        local fd1
        shift
    
        ( if ! flock --timeout 10 --exclusive ${fd1}; then
            echo "Error: _pipe_push can't get a lock." >&2
            return 1
        fi
            printf '%s\n' "${@}" >> "${pipe}"
        ) {fd1}< "${pipe}"
    }
    
    pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)"
    
    # submit first reader process
    while msg="$(_pipe_pull "${pipe_file_1}")"; do
        printf 'Reader 1:%s\n' "${msg}"
    done &
    
    # submit second reader process
    while msg="$(_pipe_pull "${pipe_file_1}")"; do
        printf 'Reader 2:%s\n' "${msg}"
    done &
    
    # submit first writer process
    for i in {1..10}; do
        _pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}"
    done &
    pid1="${!}"
    
    # submit second writer process
    for i in {11..20}; do
        _pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}"
    done &
    pid2="${!}"
    
    # submit third writer process
    for i in {21..30}; do
        _pipe_push "${pipe_file_1}" "${i}"
    done &
    pid3="${!}"
    
    # waiting for the end of writer processes
    wait ${pid1} ${pid2} ${pid3}
    
    # signal end of data to two readers
    _pipe_push "${pipe_file_1}" $'\x04' $'\x04'
    
    # waiting for the end of reader processes
    wait
    
    # cleaning
    rm -vf "${pipe_file_1}"
    :
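    The pop in _pipe_pop above leans on a tiny ed script: 1p prints the first line, 1d deletes it, and w writes the buffer back in place. A standalone sketch of just that step:

```shell
#!/bin/bash
# Pop the first line of a file in place with ed:
#   1p  print line 1 (this becomes the popped message)
#   1d  delete line 1
#   w   write the modified buffer back to the file
# -s keeps ed silent, so the command substitution captures only line 1.
q=$(mktemp)
printf '%s\n' alpha beta gamma > "$q"
first=$(ed -s "$q" <<< $'1p\n1d\nw')
remaining=$(wc -l < "$q")
rm -f "$q"
echo "popped: ${first}, left: ${remaining}"   # popped: alpha, left: 2
```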
    