mirror of
				https://github.com/yggdrasil-network/yggdrasil-go.git
				synced 2025-11-04 03:05:07 +03:00 
			
		
		
		
	have the stream code use bufio instead of copying manually to an input buffer, slightly reduces total uses of memmove
This commit is contained in:
		
							parent
							
								
									75b931f37e
								
							
						
					
					
						commit
						0ba8c6a34f
					
				
					 2 changed files with 30 additions and 49 deletions
				
			
		| 
						 | 
					@ -172,7 +172,7 @@ func BoxOpen(shared *BoxSharedKey,
 | 
				
			||||||
	boxed []byte,
 | 
						boxed []byte,
 | 
				
			||||||
	nonce *BoxNonce) ([]byte, bool) {
 | 
						nonce *BoxNonce) ([]byte, bool) {
 | 
				
			||||||
	out := util.GetBytes()
 | 
						out := util.GetBytes()
 | 
				
			||||||
	return append(out, boxed...), true
 | 
						return append(out, boxed...), true //FIXME disabled crypto for benchmarking
 | 
				
			||||||
	s := (*[BoxSharedKeyLen]byte)(shared)
 | 
						s := (*[BoxSharedKeyLen]byte)(shared)
 | 
				
			||||||
	n := (*[BoxNonceLen]byte)(nonce)
 | 
						n := (*[BoxNonceLen]byte)(nonce)
 | 
				
			||||||
	unboxed, success := box.OpenAfterPrecomputation(out, boxed, n, s)
 | 
						unboxed, success := box.OpenAfterPrecomputation(out, boxed, n, s)
 | 
				
			||||||
| 
						 | 
					@ -185,7 +185,7 @@ func BoxSeal(shared *BoxSharedKey, unboxed []byte, nonce *BoxNonce) ([]byte, *Bo
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	nonce.Increment()
 | 
						nonce.Increment()
 | 
				
			||||||
	out := util.GetBytes()
 | 
						out := util.GetBytes()
 | 
				
			||||||
	return append(out, unboxed...), nonce
 | 
						return append(out, unboxed...), nonce // FIXME disabled crypto for benchmarking
 | 
				
			||||||
	s := (*[BoxSharedKeyLen]byte)(shared)
 | 
						s := (*[BoxSharedKeyLen]byte)(shared)
 | 
				
			||||||
	n := (*[BoxNonceLen]byte)(nonce)
 | 
						n := (*[BoxNonceLen]byte)(nonce)
 | 
				
			||||||
	boxed := box.SealAfterPrecomputation(out, unboxed, n, s)
 | 
						boxed := box.SealAfterPrecomputation(out, unboxed, n, s)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,7 @@
 | 
				
			||||||
package yggdrasil
 | 
					package yggdrasil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"bufio"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
| 
						 | 
					@ -14,9 +15,7 @@ var _ = linkInterfaceMsgIO(&stream{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type stream struct {
 | 
					type stream struct {
 | 
				
			||||||
	rwc          io.ReadWriteCloser
 | 
						rwc          io.ReadWriteCloser
 | 
				
			||||||
	inputBuffer []byte                  // Incoming packet stream
 | 
						inputBuffer  *bufio.Reader
 | 
				
			||||||
	frag        [2 * streamMsgSize]byte // Temporary data read off the underlying rwc, on its way to the inputBuffer
 | 
					 | 
				
			||||||
	//outputBuffer [2 * streamMsgSize]byte // Temporary data about to be written to the rwc
 | 
					 | 
				
			||||||
	outputBuffer net.Buffers
 | 
						outputBuffer net.Buffers
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -32,6 +31,7 @@ func (s *stream) init(rwc io.ReadWriteCloser) {
 | 
				
			||||||
	// TODO have this also do the metadata handshake and create the peer struct
 | 
						// TODO have this also do the metadata handshake and create the peer struct
 | 
				
			||||||
	s.rwc = rwc
 | 
						s.rwc = rwc
 | 
				
			||||||
	// TODO call something to do the metadata exchange
 | 
						// TODO call something to do the metadata exchange
 | 
				
			||||||
 | 
						s.inputBuffer = bufio.NewReaderSize(s.rwc, 2*streamMsgSize)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// writeMsg writes a message with stream padding, and is *not* thread safe.
 | 
					// writeMsg writes a message with stream padding, and is *not* thread safe.
 | 
				
			||||||
| 
						 | 
					@ -62,26 +62,11 @@ func (s *stream) writeMsg(bs []byte) (int, error) {
 | 
				
			||||||
// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
 | 
					// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.
 | 
				
			||||||
func (s *stream) readMsg() ([]byte, error) {
 | 
					func (s *stream) readMsg() ([]byte, error) {
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		buf := s.inputBuffer
 | 
							bs, err := s.readMsgFromBuffer()
 | 
				
			||||||
		msg, ok, err := stream_chopMsg(&buf)
 | 
							if err != nil {
 | 
				
			||||||
		switch {
 | 
					 | 
				
			||||||
		case err != nil:
 | 
					 | 
				
			||||||
			// Something in the stream format is corrupt
 | 
					 | 
				
			||||||
			return nil, fmt.Errorf("message error: %v", err)
 | 
								return nil, fmt.Errorf("message error: %v", err)
 | 
				
			||||||
		case ok:
 | 
					 | 
				
			||||||
			// Copy the packet into bs, shift the buffer, and return
 | 
					 | 
				
			||||||
			msg = append(util.GetBytes(), msg...)
 | 
					 | 
				
			||||||
			s.inputBuffer = append(s.inputBuffer[:0], buf...)
 | 
					 | 
				
			||||||
			return msg, nil
 | 
					 | 
				
			||||||
		default:
 | 
					 | 
				
			||||||
			// Wait for the underlying reader to return enough info for us to proceed
 | 
					 | 
				
			||||||
			n, err := s.rwc.Read(s.frag[:])
 | 
					 | 
				
			||||||
			if n > 0 {
 | 
					 | 
				
			||||||
				s.inputBuffer = append(s.inputBuffer, s.frag[:n]...)
 | 
					 | 
				
			||||||
			} else if err != nil {
 | 
					 | 
				
			||||||
				return nil, err
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							return bs, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -113,34 +98,30 @@ func (s *stream) _recvMetaBytes() ([]byte, error) {
 | 
				
			||||||
	return metaBytes, nil
 | 
						return metaBytes, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// This takes a pointer to a slice as an argument. It checks if there's a
 | 
					// Reads bytes from the underlying rwc and returns 1 full message
 | 
				
			||||||
// complete message and, if so, slices out those parts and returns the message,
 | 
					func (s *stream) readMsgFromBuffer() ([]byte, error) {
 | 
				
			||||||
// true, and nil. If there's no error, but also no complete message, it returns
 | 
						pad := streamMsg // Copy
 | 
				
			||||||
// nil, false, and nil. If there's an error, it returns nil, false, and the
 | 
						_, err := io.ReadFull(s.inputBuffer, pad[:])
 | 
				
			||||||
// error, which the reader then handles (currently, by returning from the
 | 
						if err != nil {
 | 
				
			||||||
// reader, which causes the connection to close).
 | 
							return nil, err
 | 
				
			||||||
func stream_chopMsg(bs *[]byte) ([]byte, bool, error) {
 | 
						} else if pad != streamMsg {
 | 
				
			||||||
	// Returns msg, ok, err
 | 
							return nil, errors.New("bad message")
 | 
				
			||||||
	if len(*bs) < len(streamMsg) {
 | 
					 | 
				
			||||||
		return nil, false, nil
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for idx := range streamMsg {
 | 
						lenSlice := make([]byte, 0, 10)
 | 
				
			||||||
		if (*bs)[idx] != streamMsg[idx] {
 | 
						// FIXME this nextByte stuff depends on wire.go format, kind of ugly to have it here
 | 
				
			||||||
			return nil, false, errors.New("bad message")
 | 
						nextByte := byte(0xff)
 | 
				
			||||||
 | 
						for nextByte > 127 {
 | 
				
			||||||
 | 
							nextByte, err = s.inputBuffer.ReadByte()
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							lenSlice = append(lenSlice, nextByte)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	msgLen, msgLenLen := wire_decode_uint64((*bs)[len(streamMsg):])
 | 
						msgLen, _ := wire_decode_uint64(lenSlice)
 | 
				
			||||||
	if msgLen > streamMsgSize {
 | 
						if msgLen > streamMsgSize {
 | 
				
			||||||
		return nil, false, errors.New("oversized message")
 | 
							return nil, errors.New("oversized message")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	msgBegin := len(streamMsg) + msgLenLen
 | 
						msg := util.ResizeBytes(util.GetBytes(), int(msgLen))
 | 
				
			||||||
	msgEnd := msgBegin + int(msgLen)
 | 
						_, err = io.ReadFull(s.inputBuffer, msg)
 | 
				
			||||||
	if msgLenLen == 0 || len(*bs) < msgEnd {
 | 
						return msg, err
 | 
				
			||||||
		// We don't have the full message
 | 
					 | 
				
			||||||
		// Need to buffer this and wait for the rest to come in
 | 
					 | 
				
			||||||
		return nil, false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	msg := (*bs)[msgBegin:msgEnd]
 | 
					 | 
				
			||||||
	(*bs) = (*bs)[msgEnd:]
 | 
					 | 
				
			||||||
	return msg, true, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue