代码之家  ›  专栏  ›  技术社区  ›  arlen

使用外部数据通过 pyqtgraph 进行绘图

  •  2
  • arlen  · 技术社区  · 7 年前

    我正在尝试实时绘制来自不同传感器的数据,因此决定在 PyQt 中使用 PyQtGraph 来绘图,以便处理来自不同来源的多路传感器数据。我在网上找到了一个示例并尝试改写它,但 QtGui.QApplication.instance().exec_() 有一个不便的副作用:它会阻塞其后所有代码的执行。我尝试用 multiprocessing 把绘图放到单独的进程中,这样其余代码可以正常运行,但我不知道如何用外部数据更新绘图(Plot2D.update2)。我尝试使用 multiprocessing.Queue,但没有成功,反而弹出了一个消息窗口,提示窗口必须关闭。

    from pyqtgraph.Qt import QtGui, QtCore
    import numpy as np
    from numpy import arange
    import pyqtgraph as pg
    import sys
    import multiprocessing
    
    class Plot2D():
        """Two-panel pyqtgraph window that plots samples pulled from two queues.

        The queues are looked up in this module's globals under the names
        ``DatExt1`` and ``DatExt2`` each time ``update2`` runs; every item
        taken from them is indexed as ``item[0]`` (x) and ``item[1]`` (y).
        """

        def __init__(self):
            # Maps a trace name ('910D' / 'MPU') to the curve created for it.
            self.traces = dict()
            self.app = QtGui.QApplication([])
            self.win = pg.GraphicsWindow(title="Dibujar")
            self.win.resize(1000, 600)
            self.win.setWindowTitle('Ejemplo')
            pg.setConfigOptions(antialias=True)
            self.waveform1 = self.win.addPlot(title='WAVEFORM1', row=1, col=1)
            self.waveform2 = self.win.addPlot(title='WAVEFORM2', row=2, col=1)

        def start(self):
            """Enter the Qt event loop unless running interactively under PyQt."""
            if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
                QtGui.QApplication.instance().exec_()

        def set_plotdata(self, name, datax, datay):
            """Draw ``datax``/``datay`` on the trace called ``name``.

            An existing trace is updated in place.  A new trace is created on
            WAVEFORM1 for '910D' and on WAVEFORM2 for 'MPU'; any other name is
            ignored.
            """
            if name in self.traces:
                self.traces[name].setData(datax, datay)
            elif name == '910D':
                # Pass the data when creating the curve so the first sample is
                # drawn instead of being dropped.
                self.traces[name] = self.waveform1.plot(datax, datay, pen='c', width=3)
            elif name == 'MPU':
                self.traces[name] = self.waveform2.plot(datax, datay, pen='c', width=3)

        def update2(self):
            """Drain both external queues and plot every item found in them."""
            ptm1 = globals()['DatExt1']
            ptm2 = globals()['DatExt2']
            while not ptm1.empty():
                self.data1 = ptm1.get()
                self.set_plotdata('MPU', self.data1[0], self.data1[1])
            while not ptm2.empty():
                self.data2 = ptm2.get()
                # Plot the item just read from the second queue (previously
                # self.data1 was plotted here by mistake).
                self.set_plotdata('910D', self.data2[0], self.data2[1])

        def animation(self):
            """Poll the queues every 60 ms and run the event loop."""
            # Keep the timer on the instance so it stays referenced even when
            # start() returns immediately (interactive session).
            self.timer = QtCore.QTimer()
            self.timer.timeout.connect(self.update2)
            self.timer.start(60)
            self.start()
    
    # Runs in a separate process started from main.py
    def ShowData(Data1, Data2): # Data1,Data2 : multiprocessing.Queue
        """Publish the two queues as module globals and run the plot window.

        Plot2D.update2 reads the queues through ``globals()['DatExt1']`` and
        ``globals()['DatExt2']``, so they must be bound at module level; a
        plain assignment here would only create locals and that lookup would
        raise KeyError.
        """
        global DatExt1, DatExt2
        DatExt1 = Data1
        DatExt2 = Data2
        p = Plot2D()
        p.animation()
    

    main.py:

        if __name__ == '__main__':
            # Imports are kept local to the entry point so this snippet is
            # self-contained.
            import multiprocessing
            import time

            import numpy as np

            import PlotData

            Data1 = multiprocessing.Queue()
            Data2 = multiprocessing.Queue()

            # daemon=True: the plot process is terminated when this script exits.
            Plottingdata = multiprocessing.Process(
                target=PlotData.ShowData, args=(Data1, Data2), daemon=True)
            Plottingdata.start()

            t = np.arange(-3.0, 2.0, 0.01)
            i = 0.0
            # Stop producing once the plot process has exited (window closed);
            # otherwise the queue would keep growing with nobody reading it.
            while Plottingdata.is_alive():
                s = np.sin(2 * 2 * 3.1416 * t) / (2 * 3.1416 * t + i)
                time.sleep(1)
                Data1.put([t, s])
                i = i + 0.1
    

    提前感谢您的帮助。

    1 回复  |  直到 7 年前
        1
  •  0
  •   eyllanesc Yonghwan Shin    7 年前

    您应该使用多线程,而不是使用多处理,即创建负责收集数据(在您的示例中为仿真数据)的线程,然后通过信号将数据发送到GUI。

    PlotData.py

    from pyqtgraph.Qt import QtGui, QtCore
    import numpy as np
    import pyqtgraph as pg
    import sys
    
    class Plot2D(pg.GraphicsWindow):
        """Window titled "Dibujar" with two plots that are fed via ``updateData``."""

        def __init__(self):
            super().__init__(title="Dibujar")
            # Curves created so far, keyed by trace name.
            self.traces = {}
            self.resize(1000, 600)
            pg.setConfigOptions(antialias=True)
            # WAVEFORM1 sits in row 1, WAVEFORM2 in row 2 of the same column.
            self.waveform1 = self.addPlot(title='WAVEFORM1', row=1, col=1)
            self.waveform2 = self.addPlot(title='WAVEFORM2', row=2, col=1)

        def set_plotdata(self, name, x, y):
            """Update the curve called ``name``, creating it on first use.

            "910D" is drawn on WAVEFORM1 and "MPU" on WAVEFORM2; any other
            unknown name is ignored.
            """
            if name in self.traces:
                self.traces[name].setData(x, y)
                return
            panels = {"910D": self.waveform1, "MPU": self.waveform2}
            if name in panels:
                self.traces[name] = panels[name].plot(x, y, pen='y', width=3)

        @QtCore.pyqtSlot(str, tuple)
        def updateData(self, name, ptm):
            """Slot taking a trace name and an ``(x, y)`` pair to plot."""
            xs, ys = ptm
            self.set_plotdata(name, xs, ys)
    
    if __name__ == '__main__':
        # Stand-alone run: create the application and the plot window.
        app = QtGui.QApplication(sys.argv)
        plot = Plot2D()
        # Skip the event loop only for an interactive interpreter under PyQt.
        interactive_pyqt = sys.flags.interactive == 1 and hasattr(QtCore, 'PYQT_VERSION')
        if not interactive_pyqt:
            QtGui.QApplication.instance().exec_()
    

    main.py

    import sys
    
    from pyqtgraph.Qt import QtCore, QtGui
    import threading
    import numpy as np
    import time
    
    from PlotData import Plot2D
    
    class Helper(QtCore.QObject):
        """QObject whose only member is the signal used to publish samples."""
        # Declared with a (str, tuple) signature.
        changedSignal = QtCore.pyqtSignal(str, tuple)
    
    def create_data1(helper, name):
        """Endlessly emit ``(name, (t, s))`` on ``helper.changedSignal``.

        ``s`` is a sine term divided by a linear term whose offset grows by
        0.1 after every emission; each emission is preceded by a 0.1 s sleep.
        This function never returns.
        """
        t = np.arange(-3.0, 2.0, 0.01)
        # Both terms below do not depend on the offset, so name them once.
        numerator = np.sin(2 * 2 * 3.1416 * t)
        linear = 2 * 3.1416 * t
        offset = 0.0
        while True:
            samples = numerator / (linear + offset)
            time.sleep(.1)
            helper.changedSignal.emit(name, (t, samples))
            offset += 0.1
    
    def create_data2(helper, name):
        """Endlessly emit ``(name, (t, s))`` on ``helper.changedSignal``.

        ``s`` is a cosine term divided by a linear term from which an offset
        is subtracted; the offset grows by 0.1 after every emission and each
        emission is preceded by a 0.1 s sleep.  This function never returns.
        """
        t = np.arange(-3.0, 2.0, 0.01)
        # Both terms below do not depend on the offset, so name them once.
        numerator = np.cos(2 * 2 * 3.1416 * t)
        linear = 2 * 3.1416 * t
        offset = 0.0
        while True:
            samples = numerator / (linear - offset)
            time.sleep(.1)
            helper.changedSignal.emit(name, (t, samples))
            offset += 0.1
    
    if __name__ == '__main__':
        # Create the Qt application, the signal carrier and the plot window.
        app = QtGui.QApplication(sys.argv)
        helper = Helper()
        plot = Plot2D()
        # The connection type is given explicitly as a queued connection.
        helper.changedSignal.connect(plot.updateData, QtCore.Qt.QueuedConnection)
        # One daemon thread per producer, each tagged with its trace name.
        for producer, label in ((create_data1, "910D"), (create_data2, "MPU")):
            worker = threading.Thread(target=producer, args=(helper, label), daemon=True)
            worker.start()
        # Skip the event loop only for an interactive interpreter under PyQt.
        interactive_pyqt = sys.flags.interactive == 1 and hasattr(QtCore, 'PYQT_VERSION')
        if not interactive_pyqt:
            QtGui.QApplication.instance().exec_()