1
0

conn_broadcast_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. // Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // +build go1.7
  5. package websocket
  6. import (
  7. "io"
  8. "io/ioutil"
  9. "sync/atomic"
  10. "testing"
  11. )
  12. // broadcastBench allows to run broadcast benchmarks.
  13. // In every broadcast benchmark we create many connections, then send the same
  14. // message into every connection and wait for all writes complete. This emulates
  15. // an application where many connections listen to the same data - i.e. PUB/SUB
  16. // scenarios with many subscribers in one channel.
  17. type broadcastBench struct {
  18. w io.Writer
  19. message *broadcastMessage
  20. closeCh chan struct{}
  21. doneCh chan struct{}
  22. count int32
  23. conns []*broadcastConn
  24. compression bool
  25. usePrepared bool
  26. }
  27. type broadcastMessage struct {
  28. payload []byte
  29. prepared *PreparedMessage
  30. }
  31. type broadcastConn struct {
  32. conn *Conn
  33. msgCh chan *broadcastMessage
  34. }
  35. func newBroadcastConn(c *Conn) *broadcastConn {
  36. return &broadcastConn{
  37. conn: c,
  38. msgCh: make(chan *broadcastMessage, 1),
  39. }
  40. }
  41. func newBroadcastBench(usePrepared, compression bool) *broadcastBench {
  42. bench := &broadcastBench{
  43. w: ioutil.Discard,
  44. doneCh: make(chan struct{}),
  45. closeCh: make(chan struct{}),
  46. usePrepared: usePrepared,
  47. compression: compression,
  48. }
  49. msg := &broadcastMessage{
  50. payload: textMessages(1)[0],
  51. }
  52. if usePrepared {
  53. pm, _ := NewPreparedMessage(TextMessage, msg.payload)
  54. msg.prepared = pm
  55. }
  56. bench.message = msg
  57. bench.makeConns(10000)
  58. return bench
  59. }
  60. func (b *broadcastBench) makeConns(numConns int) {
  61. conns := make([]*broadcastConn, numConns)
  62. for i := 0; i < numConns; i++ {
  63. c := newConn(fakeNetConn{Reader: nil, Writer: b.w}, true, 1024, 1024)
  64. if b.compression {
  65. c.enableWriteCompression = true
  66. c.newCompressionWriter = compressNoContextTakeover
  67. }
  68. conns[i] = newBroadcastConn(c)
  69. go func(c *broadcastConn) {
  70. for {
  71. select {
  72. case msg := <-c.msgCh:
  73. if b.usePrepared {
  74. c.conn.WritePreparedMessage(msg.prepared)
  75. } else {
  76. c.conn.WriteMessage(TextMessage, msg.payload)
  77. }
  78. val := atomic.AddInt32(&b.count, 1)
  79. if val%int32(numConns) == 0 {
  80. b.doneCh <- struct{}{}
  81. }
  82. case <-b.closeCh:
  83. return
  84. }
  85. }
  86. }(conns[i])
  87. }
  88. b.conns = conns
  89. }
  90. func (b *broadcastBench) close() {
  91. close(b.closeCh)
  92. }
  93. func (b *broadcastBench) runOnce() {
  94. for _, c := range b.conns {
  95. c.msgCh <- b.message
  96. }
  97. <-b.doneCh
  98. }
  99. func BenchmarkBroadcast(b *testing.B) {
  100. benchmarks := []struct {
  101. name string
  102. usePrepared bool
  103. compression bool
  104. }{
  105. {"NoCompression", false, false},
  106. {"WithCompression", false, true},
  107. {"NoCompressionPrepared", true, false},
  108. {"WithCompressionPrepared", true, true},
  109. }
  110. for _, bm := range benchmarks {
  111. b.Run(bm.name, func(b *testing.B) {
  112. bench := newBroadcastBench(bm.usePrepared, bm.compression)
  113. defer bench.close()
  114. b.ResetTimer()
  115. for i := 0; i < b.N; i++ {
  116. bench.runOnce()
  117. }
  118. b.ReportAllocs()
  119. })
  120. }
  121. }