diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 3048e84a9..f3ccf222e 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -854,6 +854,37 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn // caller *must* hold reconnectMu. func (c *Connection) reconnected() { c.updateState(StateConnectionError) + + // Drain the outQueue, so any blocked messages can be sent. + // We keep the queue, but start draining it, if it gets full. + stopDraining := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + defer func() { + close(stopDraining) + wg.Wait() + }() + go func() { + defer wg.Done() + for { + select { + case <-stopDraining: + return + default: + if cap(c.outQueue)-len(c.outQueue) > 100 { + // Queue is not full, wait a bit. + time.Sleep(1 * time.Millisecond) + continue + } + select { + case v := <-c.outQueue: + PutByteBuffer(v) + case <-stopDraining: + return + } + } + } + }() // Close all active requests. if debugReqs { fmt.Println(c.String(), "Reconnected. Clearing outgoing.")