代码之家  ›  专栏  ›  技术社区  ›  Lamda

pyaudio和pynput:按键时录制

  •  2
  • Lamda  · 技术社区  · 8 年前

    我目前正在尝试编写一个简单的脚本,在按住某个键时进行录音。我需要生成一些数据,因此脚本会在终端中显示一个句子作为提示;说话人按下按键时脚本开始录音,并在录音过程中朗读该句子。句子读完并录好之后,松开按键即可停止录音,并生成音频文件。

    from pynput import keyboard
    import time
    import pyaudio
    import wave
    
    # Recording parameters shared by the PyAudio stream and the wave writer below.
    CHUNK = 8192  # passed to p.open() as frames_per_buffer
    FORMAT = pyaudio.paInt16  # sample format for p.open() and p.get_sample_size()
    CHANNELS = 2
    RATE = 44100  # passed to p.open() as rate and to wf.setframerate()
    RECORD_SECONDS = 5  # not referenced anywhere else in this snippet
    WAVE_OUTPUT_FILENAME = "output.wav"
    
    p = pyaudio.PyAudio()
    # Joined and written to the wave file by the main loop.
    # NOTE(review): nothing in this snippet ever appends to this list.
    frames = []
    
    def callback(in_data, frame_count, time_info, status):
        """Hand the captured buffer straight back together with ``paContinue``.

        ``frame_count``, ``time_info`` and ``status`` are accepted to match the
        stream-callback signature but are not used.
        """
        reply = (in_data, pyaudio.paContinue)
        return reply
    
    # Keyboard listener that tracks the left Cmd key and owns the audio input stream.
    # key_pressed is None until the first cmd_l event, True after a press and
    # False after a release.
    class MyListener(keyboard.Listener):
        def __init__(self):
            super(MyListener, self).__init__(self.on_press, self.on_release)
            self.key_pressed = None
    
            # The stream is opened in callback mode while the listener is being built.
            # NOTE(review): no `start` argument is given, so whether the stream is
            # already running here depends on PyAudio's default; the print below
            # reports it.
            self.stream = p.open(format=FORMAT,
                                 channels=CHANNELS,
                                 rate=RATE,
                                 input=True,
                                 frames_per_buffer=CHUNK,
                                 stream_callback = self.callback)
            print self.stream.is_active()
    
        # Only the left Cmd key changes the state; every other key is ignored.
        def on_press(self, key):
            if key == keyboard.Key.cmd_l:
                self.key_pressed = True
    
        def on_release(self, key):
            if key == keyboard.Key.cmd_l:
                self.key_pressed = False
    
        # Stream callback: picks the PyAudio return flag from the key state.
        #   True  -> paContinue
        #   False -> paComplete
        #   None  -> paAbort (the key has not been touched yet)
        # NOTE(review): no branch stores in_data, so the module-level `frames`
        # list is never filled by this callback.
        def callback(self,in_data, frame_count, time_info, status):
            if self.key_pressed == True:
                return (in_data, pyaudio.paContinue)
            elif self.key_pressed == False:
                return (in_data, pyaudio.paComplete)
            else:
                return (in_data,pyaudio.paAbort)
    
    
    # Start the keyboard listener and poll its state from the main thread.
    listener = MyListener()
    listener.start()
    started = False  # True between "key went down" and "key came back up"
    
    while True:
        # Check the key state every 100 ms.
        time.sleep(0.1)
        if listener.key_pressed == True and started == False:
            started = True
            listener.stream.start_stream()
            print "start Stream"
    
        elif listener.key_pressed == False and started == True:
            print "Something coocked"
            listener.stream.stop_stream()
            listener.stream.close()
            # NOTE(review): p is terminated here but p.get_sample_size() is still
            # called on it below; confirm that is valid on a terminated instance.
            p.terminate()
    
            # Write whatever is in `frames` to the output wave file.
            wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(p.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(b''.join(frames))
            wf.close()
    
            # NOTE(review): the loop has no break, so polling continues after the
            # file is written even though the stream and p were closed above.
            started = False
    

    IOError

    3 回复  |  直到 8 年前
        1
  •  1
  •   Ron Norris    8 年前

    这段代码至少在 Windows、Python 3.5 上似乎可以正常工作。原始代码最大的问题是:它使用了一个几乎会阻塞一切的 while 循环,并且音频帧从未被追加到 frames 列表中。现在由回调函数来完成追加操作。

    from pynput import keyboard
    import time
    import pyaudio
    import wave
    import sched
    import sys
    
    # Recording parameters shared by the PyAudio stream and the wave writer below.
    CHUNK = 8192  # passed to p.open() as frames_per_buffer
    FORMAT = pyaudio.paInt16  # sample format for p.open() and p.get_sample_size()
    CHANNELS = 2
    RATE = 44100  # passed to p.open() as rate and to the wave file as frame rate
    RECORD_SECONDS = 5  # not referenced anywhere else in this snippet
    WAVE_OUTPUT_FILENAME = "output.wav"
    
    p = pyaudio.PyAudio()
    # Filled by callback() and written to the wave file by recorder().
    frames = []
    
    def callback(in_data, frame_count, time_info, status):
        """Store the captured buffer in ``frames`` and return it with ``paContinue``.

        ``frame_count``, ``time_info`` and ``status`` are accepted to match the
        stream-callback signature but are not used.
        """
        frames.append(in_data)
        reply = (in_data, pyaudio.paContinue)
        return reply
    
    class MyListener(keyboard.Listener):
        """Keyboard listener that tracks whether the 'r' key is held down.

        ``key_pressed`` is ``None`` until the first 'r' event, ``True`` after a
        press and ``False`` after a release. The listener also opens the output
        wave file (``self.wf``) and configures it from the module constants.
        """

        def __init__(self):
            super(MyListener, self).__init__(self.on_press, self.on_release)
            self.key_pressed = None
            self.wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
            self.wf.setnchannels(CHANNELS)
            self.wf.setsampwidth(p.get_sample_size(FORMAT))
            self.wf.setframerate(RATE)

        @staticmethod
        def _is_record_key(key):
            """Return True when *key* is the character key 'r'."""
            # Special keys (Shift, Ctrl, arrows, ...) are delivered as
            # keyboard.Key members without a `.char` attribute; reading it
            # directly would raise AttributeError inside the listener.
            return getattr(key, 'char', None) == 'r'

        def on_press(self, key):
            """Mark the record key as held; always keep the listener running."""
            if self._is_record_key(key):
                self.key_pressed = True
            return True

        def on_release(self, key):
            """Mark the record key as released; always keep the listener running."""
            if self._is_record_key(key):
                self.key_pressed = False
            return True
    
    
    # Module-level state shared with recorder().
    listener = MyListener()
    listener.start()
    started = False  # set to True by recorder() once the stream has been opened
    stream = None  # replaced by recorder() with the stream returned by p.open()
    
    def recorder():
        """Poll the listener state once and start or finish the recording.

        When the record key is held and no recording is running, a callback-mode
        input stream is opened. When the key has been released after a recording
        started, the stream and PyAudio are shut down, the collected ``frames``
        are written to the listener's wave file and the process exits through
        ``sys.exit()``. In every other case the function re-schedules itself on
        ``task`` to run again 100 ms later.
        """
        global started, stream

        if listener.key_pressed and not started:
            # Start the recording; errors from p.open() propagate to the caller.
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK,
                            stream_callback=callback)
            print("Stream active:", stream.is_active())
            started = True
            print("start Stream")

        elif not listener.key_pressed and started:
            print("Stop recording")
            stream.stop_stream()
            stream.close()
            p.terminate()
            listener.wf.writeframes(b''.join(frames))
            listener.wf.close()
            print("You should have a wav file in the current directory")
            sys.exit()
        # Reschedule the recorder function in 100 ms.
        task.enter(0.1, 1, recorder, ())
    
    
    # print() with a single argument behaves the same on Python 2 and Python 3.
    print("Press and hold the 'r' key to begin recording")
    print("Release the 'r' key to end recording")
    # recorder() re-enters itself on this scheduler every 100 ms, so run() keeps
    # going until recorder() calls sys.exit().
    task = sched.scheduler(time.time, time.sleep)
    task.enter(0.1, 1, recorder, ())
    task.run()
    
        2
  •  0
  •   obgnaw    8 年前

    def callback(self,in_data, frame_count, time_info, status):
        """Stream callback: keep buffers while the key is held, finish on release.

        ``self.key_pressed`` has three states:

        * ``True``  - append the buffer to ``frames`` and return ``paContinue``.
        * ``False`` - append the last buffer and return ``paComplete``.
        * anything else (``None``: key not touched yet) - drop the buffer and
          return ``paContinue``.
        """
        print("callback")
        # Read the flag once: the keyboard listener thread can change it at any
        # time, and two separate reads could disagree with each other.
        pressed = self.key_pressed

        if pressed is True:
            print("record")
            frames.append(in_data)
            return (in_data, pyaudio.paContinue)

        if pressed is False:
            frames.append(in_data)
            return (in_data, pyaudio.paComplete)

        print("not record")
        return (in_data, pyaudio.paContinue)
    

        3
  •  0
  •   vishvAs vAsuki    7 年前

    以下代码在我的 Arch Linux 系统上可以正常工作,参考了「pyaudio and pynput: recording while a key is being pressed/held down」这一问题以及 https://gist.github.com/sloria/5693955 。

    # Inspirations: https://stackoverflow.com/questions/44894796/pyaudio-and-pynput-recording-while-a-key-is-being-pressed-held-down, https://gist.github.com/sloria/5693955
    import logging
    import sched
    import time
    import wave
    
    import pyaudio
    from pynput import keyboard
    
    
    # Detach every handler currently on the root logger (iterating over a copy,
    # because removeHandler mutates the list), then configure it with DEBUG level
    # and the format below.
    for stale_handler in list(logging.root.handlers):
        logging.root.removeHandler(stale_handler)
    logging.basicConfig(
        format="%(levelname)s:%(asctime)s:%(module)s:%(lineno)d %(message)s",
        level=logging.DEBUG,
    )
    
    
    class KeyPressTriggeredRecorder(object):
        '''Helps record audio during the duration of key-presses.
        Records in mono by default.

        Each call to :py:meth:`record` waits until the trigger key is pressed,
        records while it stays down and closes the recording once it is released.

        Example usage:
            recorder.KeyPressTriggeredRecorder().record("recordings/test.wav")
        '''

        # Seconds between two checks of the key state in record().
        _POLL_INTERVAL = .1

        def __init__(self, trigger_key=keyboard.Key.space, channels=1, rate=44100, frames_per_buffer=1024):
            """Store the recording parameters and prepare an unstarted key listener.

            :param trigger_key: key that has to be held down while recording.
            :param channels: number of channels passed on to the recording file.
            :param rate: sample rate passed on to the recording file.
            :param frames_per_buffer: buffer size passed on to the recording file.
            """
            self.trigger_key = trigger_key
            self.key_pressed = False
            self.recording_started = False
            self.recording_stopped = False
            self.channels = channels
            self.rate = rate
            self.frames_per_buffer = frames_per_buffer
            self.key_listener = self._new_listener()
            # record() no longer schedules through this object; the attribute is
            # kept so existing code that reads it keeps working.
            self.task_scheduler = sched.scheduler(time.time, time.sleep)

        def _new_listener(self):
            """Build an unstarted keyboard listener bound to this recorder."""
            return keyboard.Listener(self._on_press, self._on_release)

        def reset(self):
            """Clear the key and recording flags before a new recording."""
            self.key_pressed = False
            self.recording_started = False
            self.recording_stopped = False

        def _on_press(self, key):
            """Remember that the trigger key is down; keep the listener running."""
            if key == self.trigger_key:
                self.key_pressed = True
            return True

        def _on_release(self, key):
            """Remember that the trigger key is up and stop the listener."""
            if key == self.trigger_key:
                self.key_pressed = False
                # Returning False closes the listener.
                return False
            return True

        def record(self, fname):
            """Record to *fname* while the trigger key is held down.

            Blocks until the trigger key has been pressed and released again.

            :param fname: path of the wave file to write.
            :raises: whatever ``RecordingFile`` raises while opening the file
                or the audio stream.
            """
            logging.info("Waiting for any key")
            self.reset()
            # A listener is a thread and can only be started once, and
            # _on_release stops it, so a repeated record() needs a fresh one.
            if self.key_listener.ident is not None:
                self.key_listener = self._new_listener()
            self.key_listener.start()
            recording_file = None
            try:
                recording_file = RecordingFile(
                    fname=fname, mode='wb', channels=self.channels, rate=self.rate,
                    frames_per_buffer=self.frames_per_buffer)
                # A flat polling loop: the previous version called
                # scheduler.run() from inside the scheduled action, which nested
                # one level deeper on every poll and eventually hit the
                # recursion limit.
                while True:
                    if self.key_pressed and not self.recording_started:
                        logging.info("Speak while you keep the key pressed.")
                        recording_file.start_recording()
                        self.recording_started = True
                    elif not self.key_pressed and self.recording_started:
                        recording_file.stop_recording()
                        self.recording_stopped = True
                        break
                    time.sleep(self._POLL_INTERVAL)
            finally:
                self.key_listener.stop()
                # Close only when a stream was actually opened; closing is what
                # releases the stream, PyAudio and the wave file.
                if recording_file is not None and self.recording_started:
                    recording_file.close()
    
    
    class RecordingFile(object):
        """Type of object corresponding to a particular recording.

        Owns a PyAudio instance and an open wave file. Audio is written either by
        the blocking :py:meth:`record` or by the callback-driven
        :py:meth:`start_recording` / :py:meth:`stop_recording` pair. The object
        can be used as a context manager; :py:meth:`close` then runs on exit.

        See :py:class:KeyPressTriggeredRecorder for example usage.
        """
        def __init__(self, fname, mode, channels,
                     rate, frames_per_buffer):
            """Open PyAudio and the wave file.

            :param fname: path of the wave file; missing parent directories are
                created.
            :param mode: mode passed to ``wave.open``.
            :param channels: number of channels for the stream and the file.
            :param rate: sample rate for the stream and the file.
            :param frames_per_buffer: buffer size for the stream.
            """
            self.fname = fname
            self.mode = mode
            self.channels = channels
            self.rate = rate
            self.frames_per_buffer = frames_per_buffer
            # Set before anything can fail so close() always finds the attribute.
            self._stream = None
            self._pa = pyaudio.PyAudio()
            # Index of the device named "pulse"; stays -1 when there is none.
            self.chosen_device_index = -1
            for x in range(self._pa.get_device_count()):
                info = self._pa.get_device_info_by_index(x)
                if info["name"] == "pulse":
                    self.chosen_device_index = info["index"]
            try:
                self.wavefile = self._prepare_file(self.fname, self.mode)
            except Exception:
                # Do not leave PyAudio initialised when the file cannot be opened.
                self._pa.terminate()
                raise

        def __enter__(self):
            return self

        def __exit__(self, exception, value, traceback):
            self.close()

        def record(self, duration):
            """Record *duration* seconds in blocking mode and write them to the file."""
            # -1 is this class's "not found" marker, not a device index; None
            # lets PyAudio pick the default input device instead.
            device_index = self.chosen_device_index
            if device_index < 0:
                device_index = None
            # Use a stream with no callback function in blocking mode
            self._stream = self._pa.open(format=pyaudio.paInt16,
                                         channels=self.channels,
                                         rate=self.rate,
                                         input_device_index=device_index,
                                         input=True,
                                         frames_per_buffer=self.frames_per_buffer)
            for _ in range(int(self.rate / self.frames_per_buffer * duration)):
                audio = self._stream.read(self.frames_per_buffer)
                self.wavefile.writeframes(audio)
            return None

        def start_recording(self):
            """Open a callback-mode input stream and start it; returns ``self``."""
            # Use a stream with a callback in non-blocking mode
            self._stream = self._pa.open(format=pyaudio.paInt16,
                                         channels=self.channels,
                                         rate=self.rate,
                                         input=True,
                                         frames_per_buffer=self.frames_per_buffer,
                                         stream_callback=self._get_callback())
            self._stream.start_stream()
            return self

        def stop_recording(self):
            """Stop the current stream; returns ``self``."""
            self._stream.stop_stream()
            return self

        def _get_callback(self):
            """Return a stream callback that writes every buffer to the wave file."""
            def callback(in_data, frame_count, time_info, status):
                self.wavefile.writeframes(in_data)
                return in_data, pyaudio.paContinue
            return callback

        def close(self):
            """Release the stream (if one was opened), PyAudio and the wave file."""
            try:
                # No stream exists when nothing was recorded, e.g. when the
                # object was only used as a context manager.
                if self._stream is not None:
                    self._stream.close()
                    self._stream = None
            finally:
                # The remaining resources are released even if a step above fails.
                try:
                    self._pa.terminate()
                finally:
                    self.wavefile.close()

        @staticmethod
        def _ensure_parent_dir(fname):
            """Create the directory that will contain *fname* when it is missing."""
            import os
            parent = os.path.dirname(fname)
            # A bare file name has an empty dirname, and os.makedirs("") raises.
            if parent:
                os.makedirs(parent, exist_ok=True)

        def _prepare_file(self, fname, mode='wb'):
            """Open *fname* as a wave file configured from this object's settings."""
            self._ensure_parent_dir(fname)
            wavefile = wave.open(fname, mode)
            wavefile.setnchannels(self.channels)
            wavefile.setsampwidth(self._pa.get_sample_size(pyaudio.paInt16))
            wavefile.setframerate(self.rate)
            return wavefile
    
    # Example invocation.
    # NOTE(review): neither `recorder` nor FILEPATH is defined in this snippet;
    # presumably they are this file's module name and a caller-chosen output
    # path. Confirm before running.
    recorder.KeyPressTriggeredRecorder().record(FILEPATH)