io.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package io
  15. import (
  16. "io"
  17. "sync"
  18. "github.com/fatedier/frp/utils/crypto"
  19. "github.com/fatedier/frp/utils/pool"
  20. )
  21. // Join two io.ReadWriteCloser and do some operations.
  22. func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64) {
  23. var wait sync.WaitGroup
  24. pipe := func(to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
  25. defer to.Close()
  26. defer from.Close()
  27. defer wait.Done()
  28. buf := pool.GetBuf(16 * 1024)
  29. defer pool.PutBuf(buf)
  30. *count, _ = io.CopyBuffer(to, from, buf)
  31. }
  32. wait.Add(2)
  33. go pipe(c1, c2, &inCount)
  34. go pipe(c2, c1, &outCount)
  35. wait.Wait()
  36. return
  37. }
  38. func WithEncryption(rwc io.ReadWriteCloser, key []byte) (io.ReadWriteCloser, error) {
  39. w, err := crypto.NewWriter(rwc, key)
  40. if err != nil {
  41. return nil, err
  42. }
  43. return WrapReadWriteCloser(crypto.NewReader(rwc, key), w, func() error {
  44. return rwc.Close()
  45. }), nil
  46. }
  47. func WithCompression(rwc io.ReadWriteCloser) io.ReadWriteCloser {
  48. sr := pool.GetSnappyReader(rwc)
  49. sw := pool.GetSnappyWriter(rwc)
  50. return WrapReadWriteCloser(sr, sw, func() error {
  51. err := rwc.Close()
  52. pool.PutSnappyReader(sr)
  53. pool.PutSnappyWriter(sw)
  54. return err
  55. })
  56. }
  57. type ReadWriteCloser struct {
  58. r io.Reader
  59. w io.Writer
  60. closeFn func() error
  61. closed bool
  62. mu sync.Mutex
  63. }
  64. // closeFn will be called only once
  65. func WrapReadWriteCloser(r io.Reader, w io.Writer, closeFn func() error) io.ReadWriteCloser {
  66. return &ReadWriteCloser{
  67. r: r,
  68. w: w,
  69. closeFn: closeFn,
  70. closed: false,
  71. }
  72. }
  73. func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) {
  74. return rwc.r.Read(p)
  75. }
  76. func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) {
  77. return rwc.w.Write(p)
  78. }
  79. func (rwc *ReadWriteCloser) Close() (errRet error) {
  80. rwc.mu.Lock()
  81. if rwc.closed {
  82. rwc.mu.Unlock()
  83. return
  84. }
  85. rwc.closed = true
  86. rwc.mu.Unlock()
  87. var err error
  88. if rc, ok := rwc.r.(io.Closer); ok {
  89. err = rc.Close()
  90. if err != nil {
  91. errRet = err
  92. }
  93. }
  94. if wc, ok := rwc.w.(io.Closer); ok {
  95. err = wc.Close()
  96. if err != nil {
  97. errRet = err
  98. }
  99. }
  100. if rwc.closeFn != nil {
  101. err = rwc.closeFn()
  102. if err != nil {
  103. errRet = err
  104. }
  105. }
  106. return
  107. }