代码之家  ›  专栏  ›  技术社区  ›  tzaman

对于Java中的协议缓冲区I/O函数,是否存在C++等价物?

  •  58
  • tzaman  · 技术社区  · 15 年前

    我尝试从C/C++和Java文件中读取/写入多个协议缓冲区消息。谷歌建议在消息前写长度前缀,但默认情况下没有办法(我可以看到)。

    但是,版本2.1.0中的Java API收到了一组“定界”的I/O函数,它们显然完成了这项工作:

    parseDelimitedFrom
    mergeDelimitedFrom
    writeDelimitedTo
    

    有C++等价物吗?如果不是,JavaAPI附带的大小前缀的线格式是什么,所以我可以在C++中解析这些消息吗?


    更新:

    这些现在存在于 google/protobuf/util/delimited_message_util.h 从3.3.0版开始。

    10 回复  |  直到 6 年前
        1
  •  68
  •   Kenton Varda    11 年前

    我来参加聚会有点晚了,但是下面的实现包括其他答案中缺少的一些优化,并且在64MB输入之后不会失败(尽管它仍然强制 the 64MB limit 在每个单独的消息上,而不是在整个流上)。

    (我是C++和JavaOrthBuf库的作者,但我不再为谷歌工作了。很抱歉,这段代码从未进入官方库。这就是它如果有的话的样子。)

    bool writeDelimitedTo(
        const google::protobuf::MessageLite& message,
        google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
      // We create a new coded stream for each message.  Don't worry, this is fast.
      google::protobuf::io::CodedOutputStream output(rawOutput);
    
      // Write the size.
      const int size = message.ByteSize();
      output.WriteVarint32(size);
    
      uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
      if (buffer != NULL) {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
      } else {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError()) return false;
      }
    
      return true;
    }
    
    bool readDelimitedFrom(
        google::protobuf::io::ZeroCopyInputStream* rawInput,
        google::protobuf::MessageLite* message) {
      // We create a new coded stream for each message.  Don't worry, this is fast,
      // and it makes sure the 64MB total size limit is imposed per-message rather
      // than on the whole stream.  (See the CodedInputStream interface for more
      // info on this limit.)
      google::protobuf::io::CodedInputStream input(rawInput);
    
      // Read the size.
      uint32_t size;
      if (!input.ReadVarint32(&size)) return false;
    
      // Tell the stream not to read beyond that size.
      google::protobuf::io::CodedInputStream::Limit limit =
          input.PushLimit(size);
    
      // Parse the message.
      if (!message->MergeFromCodedStream(&input)) return false;
      if (!input.ConsumedEntireMessage()) return false;
    
      // Release the limit.
      input.PopLimit(limit);
    
      return true;
    }
    
        2
  •  17
  •   tzaman    15 年前

    好的,所以我还没有找到实现我所需要的顶级C++函数,但是通过Java API引用的一些拼写发现如下 MessageLite 接口:

    void writeDelimitedTo(OutputStream output)
    /*  Like writeTo(OutputStream), but writes the size of 
        the message as a varint before writing the data.   */
    

    因此,Java大小前缀是(协议缓冲区)VARTET!

    通过这些信息,我通过C++ API挖掘并找到了 CodedStream 标题,其中包括:

    bool CodedInputStream::ReadVarint32(uint32 * value)
    void CodedOutputStream::WriteVarint32(uint32 value)
    

    使用这些,我应该能够滚动自己的C++函数来完成这项工作。

    他们确实应该把它添加到主消息API中,考虑到Java有它的功能缺失,Marc Gravell优秀的原Buffnet NET C端口也是如此(通过SerialZeIn RealthPiFixE和DeXialIZE.LangthPosiFiX)。

        3
  •  12
  •   Yukiko    15 年前

    我使用codedoutputstream/arrayoutputstream编写消息(使用大小),并使用codedinputstream/arrayinputstream读取消息(使用大小)。

    例如,以下伪代码将消息大小写在消息后面:

    const unsigned bufLength = 256;
    unsigned char buffer[bufLength];
    Message protoMessage;
    
    google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
    google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);
    
    codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
    protoMessage.SerializeToCodedStream(&codedOutput);
    

    在编写时,还应该检查缓冲区是否足够大,以适合消息(包括大小)。在阅读时,您应该检查缓冲区是否包含一个完整的消息(包括大小)。

    如果他们将方便的方法添加到类似于Java API提供的C++ API中,那肯定会很方便。

        4
  •  7
  •   fireboot    9 年前

    我在C++和Python中都遇到了同样的问题。

    对于C++版本,我使用了在这个线程上发布的代码Kton瓦尔达和从他发送到ToBuFF团队的拉请求的代码(因为这里发布的版本不处理EOF,而他发送给Github的版本)。

    #include <google/protobuf/message_lite.h>
    #include <google/protobuf/io/zero_copy_stream.h>
    #include <google/protobuf/io/coded_stream.h>
    
    
    bool writeDelimitedTo(const google::protobuf::MessageLite& message,
        google::protobuf::io::ZeroCopyOutputStream* rawOutput)
    {
        // We create a new coded stream for each message.  Don't worry, this is fast.
        google::protobuf::io::CodedOutputStream output(rawOutput);
    
        // Write the size.
        const int size = message.ByteSize();
        output.WriteVarint32(size);
    
        uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
        if (buffer != NULL)
        {
            // Optimization:  The message fits in one buffer, so use the faster
            // direct-to-array serialization path.
            message.SerializeWithCachedSizesToArray(buffer);
        }
    
        else
        {
            // Slightly-slower path when the message is multiple buffers.
            message.SerializeWithCachedSizes(&output);
            if (output.HadError())
                return false;
        }
    
        return true;
    }
    
    bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
    {
        // We create a new coded stream for each message.  Don't worry, this is fast,
        // and it makes sure the 64MB total size limit is imposed per-message rather
        // than on the whole stream.  (See the CodedInputStream interface for more
        // info on this limit.)
        google::protobuf::io::CodedInputStream input(rawInput);
        const int start = input.CurrentPosition();
        if (clean_eof)
            *clean_eof = false;
    
    
        // Read the size.
        uint32_t size;
        if (!input.ReadVarint32(&size))
        {
            if (clean_eof)
                *clean_eof = input.CurrentPosition() == start;
            return false;
        }
        // Tell the stream not to read beyond that size.
        google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);
    
        // Parse the message.
        if (!message->MergeFromCodedStream(&input)) return false;
        if (!input.ConsumedEntireMessage()) return false;
    
        // Release the limit.
        input.PopLimit(limit);
    
        return true;
    }
    

    下面是我的python2实现:

    from google.protobuf.internal import encoder
    from google.protobuf.internal import decoder
    
    #I had to implement this because the tools in google.protobuf.internal.decoder
    #read from a buffer, not from a file-like objcet
    def readRawVarint32(stream):
        mask = 0x80 # (1 << 7)
        raw_varint32 = []
        while 1:
            b = stream.read(1)
            #eof
            if b == "":
                break
            raw_varint32.append(b)
            if not (ord(b) & mask):
                #we found a byte starting with a 0, which means it's the last byte of this varint
                break
        return raw_varint32
    
    def writeDelimitedTo(message, stream):
        message_str = message.SerializeToString()
        delimiter = encoder._VarintBytes(len(message_str))
        stream.write(delimiter + message_str)
    
    def readDelimitedFrom(MessageType, stream):
        raw_varint32 = readRawVarint32(stream)
        message = None
    
        if raw_varint32:
            size, _ = decoder._DecodeVarint32(raw_varint32, 0)
    
            data = stream.read(size)
            if len(data) < size:
                raise Exception("Unexpected end of file")
    
            message = MessageType()
            message.ParseFromString(data)
    
        return message
    
    #In place version that takes an already built protobuf object
    #In my tests, this is around 20% faster than the other version 
    #of readDelimitedFrom()
    def readDelimitedFrom_inplace(message, stream):
        raw_varint32 = readRawVarint32(stream)
    
        if raw_varint32:
            size, _ = decoder._DecodeVarint32(raw_varint32, 0)
    
            data = stream.read(size)
            if len(data) < size:
                raise Exception("Unexpected end of file")
    
            message.ParseFromString(data)
    
            return message
        else:
            return None
    

    它可能不是最漂亮的代码,我相信它可以被重构一点,但至少这可以向您展示一种实现它的方法。

    现在的大问题是 缓慢的 .

    即使在使用Python原型BuffF的C++实现时,它也比纯C++慢一个数量级。我有一个基准测试,我从一个文件中读取10M条大约30字节的protobuf消息。在C++中使用0.9s,在Python中使用35s。

    使其更快的一种方法是重新实现varint解码器,使其从文件中读取并一次性解码,而不是像当前的代码那样从文件中读取然后解码。(分析显示变量编码器/解码器花费了大量时间)。但不用说,仅仅是不足以缩小Python版本和C++版本之间的差距。

    任何加快速度的想法都是受欢迎的:)

        5
  •  6
  •   jaybny    12 年前

    干得好:

    #include <google/protobuf/io/zero_copy_stream_impl.h>
    #include <google/protobuf/io/coded_stream.h>
    
    using namespace google::protobuf::io;
    
    class FASWriter 
    {
        std::ofstream mFs;
        OstreamOutputStream *_OstreamOutputStream;
        CodedOutputStream *_CodedOutputStream;
    public:
        FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
        {
            assert(mFs.good());
    
            _OstreamOutputStream = new OstreamOutputStream(&mFs);
            _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
        }
    
        inline void operator()(const ::google::protobuf::Message &msg)
        {
            _CodedOutputStream->WriteVarint32(msg.ByteSize());
    
            if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
                std::cout << "SerializeToCodedStream error " << std::endl;
        }
    
        ~FASWriter()
        {
            delete _CodedOutputStream;
            delete _OstreamOutputStream;
            mFs.close();
        }
    };
    
    class FASReader
    {
        std::ifstream mFs;
    
        IstreamInputStream *_IstreamInputStream;
        CodedInputStream *_CodedInputStream;
    public:
        FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
        {
            assert(mFs.good());
    
            _IstreamInputStream = new IstreamInputStream(&mFs);
            _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
        }
    
        template<class T>
        bool ReadNext()
        {
            T msg;
            unsigned __int32 size;
    
            bool ret;
            if ( ret = _CodedInputStream->ReadVarint32(&size) )
            {   
                CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
                if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
                {
                    _CodedInputStream->PopLimit(msgLimit);      
                    std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
                }
            }
    
            return ret;
        }
    
        ~FASReader()
        {
            delete _CodedInputStream;
            delete _IstreamInputStream;
            mFs.close();
        }
    };
    
        6
  •  6
  •   Fulkerson    12 年前

    iTestPixStand对EOFS非常脆弱,与STD:ISTRAMAM一起使用时容易发生的其他错误。在此之后,protobuf流将永久损坏,并且任何已使用的缓冲区数据都将被销毁。对从Protobuf中的传统流中读取数据有适当的支持。

    实施 google::protobuf::io::CopyingInputStream 和它一起使用 CopyingInputStreamAdapter . 对输出变量执行相同的操作。

    在实践中,解析调用以 google::protobuf::io::CopyingInputStream::Read(void* buffer, int size) 其中给出了一个缓冲区。唯一剩下要做的就是读它。

    下面是一个用于ASIO同步流的示例( SyncReadStream / SyncWriteStream ):

    #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
    
    using namespace google::protobuf::io;
    
    
    template <typename SyncReadStream>
    class AsioInputStream : public CopyingInputStream {
        public:
            AsioInputStream(SyncReadStream& sock);
            int Read(void* buffer, int size);
        private:
            SyncReadStream& m_Socket;
    };
    
    
    template <typename SyncReadStream>
    AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
        m_Socket(sock) {}
    
    
    template <typename SyncReadStream>
    int
    AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
    {
        std::size_t bytes_read;
        boost::system::error_code ec;
        bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);
    
        if(!ec) {
            return bytes_read;
        } else if (ec == boost::asio::error::eof) {
            return 0;
        } else {
            return -1;
        }
    }
    
    
    template <typename SyncWriteStream>
    class AsioOutputStream : public CopyingOutputStream {
        public:
            AsioOutputStream(SyncWriteStream& sock);
            bool Write(const void* buffer, int size);
        private:
            SyncWriteStream& m_Socket;
    };
    
    
    template <typename SyncWriteStream>
    AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
        m_Socket(sock) {}
    
    
    template <typename SyncWriteStream>
    bool
    AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
    {   
        boost::system::error_code ec;
        m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
        return !ec;
    }
    

    用途:

    AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
    CopyingInputStreamAdaptor cis_adp(&ais);
    CodedInputStream cis(&cis_adp);
    
    Message protoMessage;
    uint32_t msg_size;
    
    /* Read message size */
    if(!cis.ReadVarint32(&msg_size)) {
        // Handle error
     }
    
    /* Make sure not to read beyond limit of message */
    CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
    if(!msg.ParseFromCodedStream(&cis)) {
        // Handle error
    }
    
    /* Remove limit */
    cis.PopLimit(msg_limit);
    
        7
  •  3
  •   Kim Laurio    14 年前

    也在寻找解决方案。这里是我们解决方案的核心,假设一些Java代码编写了许多MyScript消息 writeDelimitedTo 文件。打开文件并循环,执行以下操作:

    if(someCodedInputStream->ReadVarint32(&bytes)) {
      CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes);
      if(myRecord->ParseFromCodedStream(someCodedInputStream)) {
        //do your stuff with the parsed MyRecord instance
      } else {
        //handle parse error
      }
      someCodedInputStream->PopLimit(msgLimit);
    } else {
      //maybe end of file
    }
    

    希望有帮助。

        8
  •  0
  •   gp-coder    11 年前

    使用一个目标C版本的协议缓冲区,我遇到了这个确切的问题。在从iOS客户端发送到使用基于PARSEDLIMIDEDO的Java服务器时,该服务器希望将长度作为第一字节,首先需要调用WraveAdWEnter到CODIDOutPutsFipe。在这里张贴希望帮助其他人遇到这个问题。在解决这个问题的过程中,有人会认为Google Proto Bufs会带有一个简单的标志,它可以为您做到这一点…

        Request* request = [rBuild build];
    
        [self sendMessage:request];
    } 
    
    
    - (void) sendMessage:(Request *) request {
    
        //** get length
        NSData* n = [request data];
        uint8_t len = [n length];
    
        PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
        //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
        [os writeRawByte:len];
        [request writeToCodedOutputStream:os];
        [os flush];
    }
    
        9
  •  0
  •   ciphersimian    9 年前

    因为我不被允许写这篇文章作为对肯顿·瓦尔达上述答案的评论;我相信他发布的代码(以及其他已经提供的答案)中有一个错误。以下代码:

    ...
    google::protobuf::io::CodedInputStream input(rawInput);
    
    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size)) return false;
    
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit =
        input.PushLimit(size);
    ...
    

    设置错误的限制,因为它不考虑已从输入中读取的变量32的大小。这可能会导致数据丢失/损坏,因为从可能是下一条消息的一部分的流中读取额外的字节。正确处理此问题的通常方法是删除用于读取大小的codedinputstream,并创建一个新的用于读取有效负载的codedinputstream:

    ...
    uint32_t size;
    {
      google::protobuf::io::CodedInputStream input(rawInput);
    
      // Read the size.
      if (!input.ReadVarint32(&size)) return false;
    }
    
    google::protobuf::io::CodedInputStream input(rawInput);
    
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit =
        input.PushLimit(size);
    ...
    
        10
  •  -6
  •   Jan    15 年前

    可以使用getline从流中读取字符串,使用指定的分隔符:

    istream& getline ( istream& is, string& str, char delim );
    

    (在标题中定义)