代码之家  ›  专栏  ›  技术社区  ›  Simon Hessner

在Swift 4中从InputStream读取n个字节

  •  1
  • Simon Hessner  · 技术社区  · 7 年前

    我有一个通过TCP向我发送消息的服务器,其中前4个字节决定消息其余部分的长度。所以我需要

    1) 将4个字节读入UInt32(works)并将其存储到 应为bytes\u

    2) 阅读 应为bytes\u 字节到 消息

    现在,我的代码如下所示:

    private let inputStreamAccessQueue  = DispatchQueue(label: "SynchronizedInputStreamAccess")
    

    func inputStreamHandler(_ event: Stream.Event) {
        switch event {
            case Stream.Event.hasBytesAvailable:
                self.handleInput()
    
            ...
        }
    }
    

    func handleInput() {
        // **QUESTION: Do I use this barrier wrong?**
        self.inputStreamAccessQueue.sync(flags: .barrier) {            
            guard let istr = self.inputStream else {
                log.error(self.buildLogMessage("InputStream is nil"))
                return
            }
    
            guard istr.hasBytesAvailable else {
                log.error(self.buildLogMessage("handleInput() called when inputstream has no bytes available"))
                return
            }
    
            let lengthbuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: 4)
            defer { lengthbuffer.deallocate(capacity: 4) }
            let lenbytes_read = istr.read(lengthbuffer, maxLength: 4)
    
            guard lenbytes_read == 4 else {
                self.errorHandler(NetworkingError.InputError("Input Stream received \(lenbytes_read) (!=4) bytes"))
                return
            }
    
            let bytes_expected = Int(UnsafeRawPointer(lengthbuffer).load(as: UInt32.self).bigEndian)
            log.info(self.buildLogMessage("expect \(bytes_expected) bytes"))
    
            print("::DEBUG", call, "bytes_expected", bytes_expected)
    
            var message = ""
            var bytes_missing = bytes_expected
            while bytes_missing > 0 {
                //print("::DEBUG", call, "bytes_missing", bytes_missing)
                let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bytes_missing)
                let bytes_read = istr.read(buffer, maxLength: bytes_missing)
    
                print("::DEBUG", call, "bytes_read", bytes_read)
    
                guard bytes_read > 0 else {
                    print("bytes_read not > 0: \(bytes_read)")
                    return
                }
    
                guard bytes_read <= bytes_missing else {
                    print("Read more bytes than expected. missing=\(bytes_missing), read=\(bytes_read)")
                    return
                }
    
                guard let partial_message = String(bytesNoCopy: buffer, length: bytes_read, encoding: .utf8, freeWhenDone: true) else {
                    log.error("ERROR WHEN READING")
                    return
                }
    
                message = message + partial_message
                bytes_missing -= bytes_read
            }
    
            self.handleMessage(message)
        }
    }
    

    我的问题是istr。read(buffer,maxLength:bytes\u missing)有时不会一次读取所有消息,所以我会循环,直到读取完所有需要的消息。但我仍然看到我的应用程序崩溃(很少),因为再次调用handleInput(),而对该方法的另一个调用仍在运行。在这种情况下,bytes\u expected包含随机值,应用程序因非法内存分配而崩溃。

    我想我可以通过使用屏障来避免这种情况。但这似乎不起作用。。。我用错屏障了吗?

    2 回复  |  直到 7 年前
        1
  •  2
  •   Martin R    7 年前

    我的建议是不要反对网络I/O的异步特性。 每当 Stream.Event.hasBytesAvailable 事件 已发出信号。如果缓冲区包含足够的数据(4个长度字节加上 预期的消息长度),然后处理数据并将其删除。否则什么也不做 等待更多数据。

    以下(未测试的)代码旨在演示。 它只显示与此特定问题相关的部分。 为简洁起见,省略了初始化、事件处理程序等。

    class MessageReader {
    
        var buffer = Data(count: 1024) // Must be large enough for largest message + 4
        var bytesRead = 0 // Number of bytes read so far
    
        // Called from `handleInput` with a complete message.
        func processMessage(message: Data) {
            // ...
        }
    
        // Called from event handler if `Stream.Event.hasBytesAvailable` is signalled.
        func handleInput(istr: InputStream) {
            assert(bytesRead < buffer.count)
    
            // Read from input stream, appending to previously read data:
            let maxRead = buffer.count - bytesRead
            let amount = buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
                istr.read(p + bytesRead, maxLength: maxRead)
            }
            guard amount > 0 else {
                // handle EOF or read error ...
                fatalError()
            }
            bytesRead += amount
    
            while bytesRead >= 4 {
                // Read message size:
                let messageSize = buffer.withUnsafeBytes { (p: UnsafePointer<UInt32>) in
                    Int(UInt32(bigEndian: p.pointee))
                }
                let totalSize = 4 + messageSize
                guard totalSize <= buffer.count else {
                    // Handle buffer too small for message situation ...
                    fatalError()
                }
    
                if bytesRead < totalSize {
                    break // Not enough data to read message.
                }
    
                // Buffer contains complete message now. Process it ...
                processMessage(message: buffer[4 ..< totalSize])
    
                // ... and remove it from the buffer:
                if totalSize < bytesRead {
                    // Move remaining data to the front:
                    buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
                        _ = memmove(p, p + totalSize, bytesRead - totalSize)
                    }
                }
                bytesRead -= totalSize
            }
        }
    }
    
        2
  •  1
  •   Simon Hessner    7 年前

    灵感来自Martin R( https://stackoverflow.com/a/48344040/3827381 -非常感谢!)我想出了这个解决方案:

    var buffer = Data(count: 4096)
    var offset = 0 // the index of the first byte that can be overridden
    var readState = 0
    var missingMsgBytes = 0
    var msg = ""
    
    func handleInput(_ istr: InputStream) {
        assert(buffer.count >= 5, "buffer must be large enough to contain length info (4 bytes) and at least one payload byte => min 5 bytes buffer required")
        assert(offset < buffer.count, "offset \(offset) is not smaller than \(buffer.count)")
    
        let toRead = buffer.count - offset
        let read = buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in istr.read(p + offset, maxLength: toRead) }
        guard read > 0 else {
            self.errorHandler(NetworkingError.InputError("Input Stream received \(read) bytes which is smaller than 0 => Network error"))
            return
        }
        offset += read
        var msgStart = 0
        var msgEnd = 0
    
        if readState == 0 {
            if offset < 4 {
                return
            }
            missingMsgBytes = buffer[0...3].withUnsafeBytes { (p: UnsafePointer<UInt32>) in Int(UInt32(bigEndian: p.pointee)) }
            msgStart = 4
            msgEnd = offset
            readState = 1
        } else {
            msgStart = 0
            msgEnd = offset
        }
    
        var fullMessageRead = false
    
        if readState == 1 {
            let payloadRead = msgEnd - msgStart
            if payloadRead <= missingMsgBytes {
                assert(msgEnd > msgStart, "msgEnd (\(msgEnd) <= msgStart \(msgStart). This should not happen")
                if msgEnd > msgStart {
                    msg += String(data: buffer[msgStart..<msgEnd], encoding: .utf8)!
                    missingMsgBytes -= payloadRead
                    offset = 0
                }
                fullMessageRead = (missingMsgBytes == 0)
            } else { // read more than was missing
                msg += String(data: buffer[msgStart..<msgStart+missingMsgBytes], encoding: .utf8)!
                fullMessageRead = true
                buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
                    _ = memmove(p, p + missingMsgBytes, read - missingMsgBytes) // dst, src, number
                }
                offset = read-missingMsgBytes
            }
        }
    
        if fullMessageRead {
            handleMessage(msg)
            readState = 0
             msg = ""
            missingMsgBytes = 0
        }
    }
    

    此解决方案能够读取任意大小的消息。缓冲区大小仅决定一次可以读取的量=>缓冲区越大,应用程序运行速度越快。

    我测试了大约一个小时的代码,它没有崩溃。旧代码在1-2分钟后崩溃。现在似乎终于起作用了。

    但是,当我想提高编程知识时,我想问一下,我的代码中是否有一些不必要的复杂内容,或者是否有人看到了一个可能仍然会导致应用程序崩溃或读取错误数据的错误?