diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 40f84ba46..0cf2e02cc 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -1104,7 +1104,6 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont defer ping.Stop() queue := make([][]byte, 0, maxMergeMessages) - merged := make([]byte, 0, writeBufferSize) var queueSize int var buf bytes.Buffer var wsw wsWriter @@ -1132,7 +1131,7 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont } return false } - if buf.Cap() > writeBufferSize*4 { + if buf.Cap() > writeBufferSize*8 { // Reset buffer if it gets too big, so we don't keep it around. buf = bytes.Buffer{} } @@ -1140,6 +1139,8 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont return true } + // Merge buffer to keep between calls + merged := make([]byte, 0, writeBufferSize) for { var toSend []byte select { @@ -1238,17 +1239,17 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont fmt.Println("Merging", len(queue), "messages") } - toSend = merged[:0] + merged = merged[:0] m := message{Op: OpMerged, Seq: uint32(len(queue))} var err error - toSend, err = m.MarshalMsg(toSend) + merged, err = m.MarshalMsg(merged) if err != nil { gridLogIf(ctx, fmt.Errorf("msg.MarshalMsg: %w", err)) return } // Append as byte slices. for _, q := range queue { - toSend = msgp.AppendBytes(toSend, q) + merged = msgp.AppendBytes(merged, q) PutByteBuffer(q) } queue = queue[:0] @@ -1256,14 +1257,17 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont // Combine writes. // Consider avoiding buffer copy. - err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) + err = wsw.writeMessage(&buf, c.side, ws.OpBinary, merged) if err != nil { if !xnet.IsNetworkOrHostDown(err, true) { gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err)) } return } - + if cap(merged) > writeBufferSize*8 { + // If we had to send an excessively large package, reset size. + merged = make([]byte, 0, writeBufferSize) + } if !writeBuffer() { return }