// streaming.go
  1. /**
  2. * Reed-Solomon Coding over 8-bit values.
  3. *
  4. * Copyright 2015, Klaus Post
  5. * Copyright 2015, Backblaze, Inc.
  6. */
  7. package reedsolomon
  8. import (
  9. "bytes"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "sync"
  14. )
// StreamEncoder is an interface to encode Reed-Solomon parity sets for your data.
// It provides a fully streaming interface, and processes data in blocks of up to 4MB.
//
// For small shard sizes, 10MB and below, it is recommended to use the in-memory interface,
// since the streaming interface has a start up overhead.
//
// For all operations, readers and writers should not assume any order/size of
// individual reads/writes.
//
// For usage examples, see "stream-encoder.go" and "streamdecoder.go" in the examples
// folder.
type StreamEncoder interface {
	// Encode parity shards for a set of data shards.
	//
	// Input is 'data' containing readers for the data shards, and 'parity'
	// containing io.Writers for the parity shards.
	//
	// The number of shards must match the number given to NewStream().
	//
	// Each reader must supply the same number of bytes.
	//
	// The parity shards will be written to the writers.
	// The number of bytes written will match the input size.
	//
	// If a data stream returns an error, a StreamReadError type error
	// will be returned. If a parity writer returns an error, a
	// StreamWriteError will be returned.
	Encode(data []io.Reader, parity []io.Writer) error

	// Verify returns true if the parity shards contain correct data.
	//
	// The number of shards must match the total number of data+parity shards
	// given to NewStream().
	//
	// Each reader must supply the same number of bytes.
	// If a shard stream returns an error, a StreamReadError type error
	// will be returned.
	Verify(shards []io.Reader) (bool, error)

	// Reconstruct will recreate the missing shards if possible.
	//
	// Given a list of valid shards (to read) and invalid shards (to write),
	// you indicate that a shard is missing by setting it to nil in the 'valid'
	// slice and at the same time setting a non-nil writer in 'fill'.
	// An index cannot contain both a non-nil 'valid' and 'fill' entry.
	// If both are provided, 'ErrReconstructMismatch' is returned.
	//
	// If there are too few shards to reconstruct the missing
	// ones, ErrTooFewShards will be returned.
	//
	// The reconstructed shard set is complete, but integrity is not verified.
	// Use the Verify function to check if the data set is ok.
	Reconstruct(valid []io.Reader, fill []io.Writer) error

	// Split an input stream into the number of shards given to the encoder.
	//
	// The data will be split into equally sized shards.
	// If the data size isn't divisible by the number of shards,
	// the last shard will contain extra zeros.
	//
	// You must supply the total size of your input.
	// 'ErrShortData' will be returned if it is unable to retrieve the
	// number of bytes indicated.
	Split(data io.Reader, dst []io.Writer, size int64) (err error)

	// Join the shards and write the data segment to dst.
	//
	// Only the data shards are considered.
	//
	// You must supply the exact output size you want.
	// If there are too few shards given, ErrTooFewShards will be returned.
	// If the total data size is less than outSize, ErrShortData will be returned.
	Join(dst io.Writer, shards []io.Reader, outSize int64) error
}
  86. // StreamReadError is returned when a read error is encountered
  87. // that relates to a supplied stream.
  88. // This will allow you to find out which reader has failed.
  89. type StreamReadError struct {
  90. Err error // The error
  91. Stream int // The stream number on which the error occurred
  92. }
  93. // Error returns the error as a string
  94. func (s StreamReadError) Error() string {
  95. return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
  96. }
  97. // String returns the error as a string
  98. func (s StreamReadError) String() string {
  99. return s.Error()
  100. }
  101. // StreamWriteError is returned when a write error is encountered
  102. // that relates to a supplied stream. This will allow you to
  103. // find out which reader has failed.
  104. type StreamWriteError struct {
  105. Err error // The error
  106. Stream int // The stream number on which the error occurred
  107. }
  108. // Error returns the error as a string
  109. func (s StreamWriteError) Error() string {
  110. return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
  111. }
  112. // String returns the error as a string
  113. func (s StreamWriteError) String() string {
  114. return s.Error()
  115. }
// rsStream wraps an in-memory reedSolomon codec and streams data through
// it in fixed-size blocks.
// Construct with NewStream() or NewStreamC().
type rsStream struct {
	r *reedSolomon // underlying in-memory encoder for one block at a time
	bs int // Block size: bytes per shard processed in each streaming pass
	// Shard reader: fills dst from the given readers. Pluggable so reads
	// can run sequentially (readShards) or concurrently (cReadShards).
	readShards func(dst [][]byte, in []io.Reader) error
	// Shard writer: writes each in[i] to out[i]; sequential (writeShards)
	// or concurrent (cWriteShards).
	writeShards func(out []io.Writer, in [][]byte) error
	creads bool  // NOTE(review): never set in this file — presumably records the concurrent-read choice; confirm against the full package
	cwrites bool // NOTE(review): never set in this file — presumably records the concurrent-write choice; confirm against the full package
}
  129. // NewStream creates a new encoder and initializes it to
  130. // the number of data shards and parity shards that
  131. // you want to use. You can reuse this encoder.
  132. // Note that the maximum number of data shards is 256.
  133. func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
  134. enc, err := New(dataShards, parityShards, o...)
  135. if err != nil {
  136. return nil, err
  137. }
  138. rs := enc.(*reedSolomon)
  139. r := rsStream{r: rs, bs: 4 << 20}
  140. r.readShards = readShards
  141. r.writeShards = writeShards
  142. return &r, err
  143. }
  144. // NewStreamC creates a new encoder and initializes it to
  145. // the number of data shards and parity shards given.
  146. //
  147. // This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
  148. func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
  149. enc, err := New(dataShards, parityShards, o...)
  150. if err != nil {
  151. return nil, err
  152. }
  153. rs := enc.(*reedSolomon)
  154. r := rsStream{r: rs, bs: 4 << 20}
  155. r.readShards = readShards
  156. r.writeShards = writeShards
  157. if conReads {
  158. r.readShards = cReadShards
  159. }
  160. if conWrites {
  161. r.writeShards = cWriteShards
  162. }
  163. return &r, err
  164. }
  165. func createSlice(n, length int) [][]byte {
  166. out := make([][]byte, n)
  167. for i := range out {
  168. out[i] = make([]byte, length)
  169. }
  170. return out
  171. }
  172. // Encodes parity shards for a set of data shards.
  173. //
  174. // Input is 'shards' containing readers for data shards followed by parity shards
  175. // io.Writer.
  176. //
  177. // The number of shards must match the number given to NewStream().
  178. //
  179. // Each reader must supply the same number of bytes.
  180. //
  181. // The parity shards will be written to the writer.
  182. // The number of bytes written will match the input size.
  183. //
  184. // If a data stream returns an error, a StreamReadError type error
  185. // will be returned. If a parity writer returns an error, a
  186. // StreamWriteError will be returned.
  187. func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
  188. if len(data) != r.r.DataShards {
  189. return ErrTooFewShards
  190. }
  191. if len(parity) != r.r.ParityShards {
  192. return ErrTooFewShards
  193. }
  194. all := createSlice(r.r.Shards, r.bs)
  195. in := all[:r.r.DataShards]
  196. out := all[r.r.DataShards:]
  197. read := 0
  198. for {
  199. err := r.readShards(in, data)
  200. switch err {
  201. case nil:
  202. case io.EOF:
  203. if read == 0 {
  204. return ErrShardNoData
  205. }
  206. return nil
  207. default:
  208. return err
  209. }
  210. out = trimShards(out, shardSize(in))
  211. read += shardSize(in)
  212. err = r.r.Encode(all)
  213. if err != nil {
  214. return err
  215. }
  216. err = r.writeShards(parity, out)
  217. if err != nil {
  218. return err
  219. }
  220. }
  221. }
  222. // Trim the shards so they are all the same size
  223. func trimShards(in [][]byte, size int) [][]byte {
  224. for i := range in {
  225. if in[i] != nil {
  226. in[i] = in[i][0:size]
  227. }
  228. if len(in[i]) < size {
  229. in[i] = nil
  230. }
  231. }
  232. return in
  233. }
  234. func readShards(dst [][]byte, in []io.Reader) error {
  235. if len(in) != len(dst) {
  236. panic("internal error: in and dst size does not match")
  237. }
  238. size := -1
  239. for i := range in {
  240. if in[i] == nil {
  241. dst[i] = nil
  242. continue
  243. }
  244. n, err := io.ReadFull(in[i], dst[i])
  245. // The error is EOF only if no bytes were read.
  246. // If an EOF happens after reading some but not all the bytes,
  247. // ReadFull returns ErrUnexpectedEOF.
  248. switch err {
  249. case io.ErrUnexpectedEOF, io.EOF:
  250. if size < 0 {
  251. size = n
  252. } else if n != size {
  253. // Shard sizes must match.
  254. return ErrShardSize
  255. }
  256. dst[i] = dst[i][0:n]
  257. case nil:
  258. continue
  259. default:
  260. return StreamReadError{Err: err, Stream: i}
  261. }
  262. }
  263. if size == 0 {
  264. return io.EOF
  265. }
  266. return nil
  267. }
  268. func writeShards(out []io.Writer, in [][]byte) error {
  269. if len(out) != len(in) {
  270. panic("internal error: in and out size does not match")
  271. }
  272. for i := range in {
  273. if out[i] == nil {
  274. continue
  275. }
  276. n, err := out[i].Write(in[i])
  277. if err != nil {
  278. return StreamWriteError{Err: err, Stream: i}
  279. }
  280. //
  281. if n != len(in[i]) {
  282. return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
  283. }
  284. }
  285. return nil
  286. }
// readResult carries the outcome of one concurrent shard read in
// cReadShards.
type readResult struct {
	n int // index of the stream this result belongs to
	size int // number of bytes read by io.ReadFull
	err error // error returned by io.ReadFull, if any
}
  292. // cReadShards reads shards concurrently
  293. func cReadShards(dst [][]byte, in []io.Reader) error {
  294. if len(in) != len(dst) {
  295. panic("internal error: in and dst size does not match")
  296. }
  297. var wg sync.WaitGroup
  298. wg.Add(len(in))
  299. res := make(chan readResult, len(in))
  300. for i := range in {
  301. if in[i] == nil {
  302. dst[i] = nil
  303. wg.Done()
  304. continue
  305. }
  306. go func(i int) {
  307. defer wg.Done()
  308. n, err := io.ReadFull(in[i], dst[i])
  309. // The error is EOF only if no bytes were read.
  310. // If an EOF happens after reading some but not all the bytes,
  311. // ReadFull returns ErrUnexpectedEOF.
  312. res <- readResult{size: n, err: err, n: i}
  313. }(i)
  314. }
  315. wg.Wait()
  316. close(res)
  317. size := -1
  318. for r := range res {
  319. switch r.err {
  320. case io.ErrUnexpectedEOF, io.EOF:
  321. if size < 0 {
  322. size = r.size
  323. } else if r.size != size {
  324. // Shard sizes must match.
  325. return ErrShardSize
  326. }
  327. dst[r.n] = dst[r.n][0:r.size]
  328. case nil:
  329. default:
  330. return StreamReadError{Err: r.err, Stream: r.n}
  331. }
  332. }
  333. if size == 0 {
  334. return io.EOF
  335. }
  336. return nil
  337. }
  338. // cWriteShards writes shards concurrently
  339. func cWriteShards(out []io.Writer, in [][]byte) error {
  340. if len(out) != len(in) {
  341. panic("internal error: in and out size does not match")
  342. }
  343. var errs = make(chan error, len(out))
  344. var wg sync.WaitGroup
  345. wg.Add(len(out))
  346. for i := range in {
  347. go func(i int) {
  348. defer wg.Done()
  349. if out[i] == nil {
  350. errs <- nil
  351. return
  352. }
  353. n, err := out[i].Write(in[i])
  354. if err != nil {
  355. errs <- StreamWriteError{Err: err, Stream: i}
  356. return
  357. }
  358. if n != len(in[i]) {
  359. errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
  360. }
  361. }(i)
  362. }
  363. wg.Wait()
  364. close(errs)
  365. for err := range errs {
  366. if err != nil {
  367. return err
  368. }
  369. }
  370. return nil
  371. }
  372. // Verify returns true if the parity shards contain correct data.
  373. //
  374. // The number of shards must match the number total data+parity shards
  375. // given to NewStream().
  376. //
  377. // Each reader must supply the same number of bytes.
  378. // If a shard stream returns an error, a StreamReadError type error
  379. // will be returned.
  380. func (r rsStream) Verify(shards []io.Reader) (bool, error) {
  381. if len(shards) != r.r.Shards {
  382. return false, ErrTooFewShards
  383. }
  384. read := 0
  385. all := createSlice(r.r.Shards, r.bs)
  386. for {
  387. err := r.readShards(all, shards)
  388. if err == io.EOF {
  389. if read == 0 {
  390. return false, ErrShardNoData
  391. }
  392. return true, nil
  393. }
  394. if err != nil {
  395. return false, err
  396. }
  397. read += shardSize(all)
  398. ok, err := r.r.Verify(all)
  399. if !ok || err != nil {
  400. return ok, err
  401. }
  402. }
  403. }
// ErrReconstructMismatch is returned by StreamEncoder.Reconstruct when
// the same index holds both a non-nil 'valid' reader and a non-nil
// 'fill' writer, making it ambiguous whether that shard should be
// treated as intact or rebuilt.
var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
  409. // Reconstruct will recreate the missing shards if possible.
  410. //
  411. // Given a list of valid shards (to read) and invalid shards (to write)
  412. //
  413. // You indicate that a shard is missing by setting it to nil in the 'valid'
  414. // slice and at the same time setting a non-nil writer in "fill".
  415. // An index cannot contain both non-nil 'valid' and 'fill' entry.
  416. //
  417. // If there are too few shards to reconstruct the missing
  418. // ones, ErrTooFewShards will be returned.
  419. //
  420. // The reconstructed shard set is complete, but integrity is not verified.
  421. // Use the Verify function to check if data set is ok.
  422. func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
  423. if len(valid) != r.r.Shards {
  424. return ErrTooFewShards
  425. }
  426. if len(fill) != r.r.Shards {
  427. return ErrTooFewShards
  428. }
  429. all := createSlice(r.r.Shards, r.bs)
  430. for i := range valid {
  431. if valid[i] != nil && fill[i] != nil {
  432. return ErrReconstructMismatch
  433. }
  434. }
  435. read := 0
  436. for {
  437. err := r.readShards(all, valid)
  438. if err == io.EOF {
  439. if read == 0 {
  440. return ErrShardNoData
  441. }
  442. return nil
  443. }
  444. if err != nil {
  445. return err
  446. }
  447. read += shardSize(all)
  448. all = trimShards(all, shardSize(all))
  449. err = r.r.Reconstruct(all)
  450. if err != nil {
  451. return err
  452. }
  453. err = r.writeShards(fill, all)
  454. if err != nil {
  455. return err
  456. }
  457. }
  458. }
  459. // Join the shards and write the data segment to dst.
  460. //
  461. // Only the data shards are considered.
  462. //
  463. // You must supply the exact output size you want.
  464. // If there are to few shards given, ErrTooFewShards will be returned.
  465. // If the total data size is less than outSize, ErrShortData will be returned.
  466. func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
  467. // Do we have enough shards?
  468. if len(shards) < r.r.DataShards {
  469. return ErrTooFewShards
  470. }
  471. // Trim off parity shards if any
  472. shards = shards[:r.r.DataShards]
  473. for i := range shards {
  474. if shards[i] == nil {
  475. return StreamReadError{Err: ErrShardNoData, Stream: i}
  476. }
  477. }
  478. // Join all shards
  479. src := io.MultiReader(shards...)
  480. // Copy data to dst
  481. n, err := io.CopyN(dst, src, outSize)
  482. if err == io.EOF {
  483. return ErrShortData
  484. }
  485. if err != nil {
  486. return err
  487. }
  488. if n != outSize {
  489. return ErrShortData
  490. }
  491. return nil
  492. }
  493. // Split a an input stream into the number of shards given to the encoder.
  494. //
  495. // The data will be split into equally sized shards.
  496. // If the data size isn't dividable by the number of shards,
  497. // the last shard will contain extra zeros.
  498. //
  499. // You must supply the total size of your input.
  500. // 'ErrShortData' will be returned if it is unable to retrieve the
  501. // number of bytes indicated.
  502. func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
  503. if size == 0 {
  504. return ErrShortData
  505. }
  506. if len(dst) != r.r.DataShards {
  507. return ErrInvShardNum
  508. }
  509. for i := range dst {
  510. if dst[i] == nil {
  511. return StreamWriteError{Err: ErrShardNoData, Stream: i}
  512. }
  513. }
  514. // Calculate number of bytes per shard.
  515. perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)
  516. // Pad data to r.Shards*perShard.
  517. padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
  518. data = io.MultiReader(data, bytes.NewBuffer(padding))
  519. // Split into equal-length shards and copy.
  520. for i := range dst {
  521. n, err := io.CopyN(dst[i], data, perShard)
  522. if err != io.EOF && err != nil {
  523. return err
  524. }
  525. if n != perShard {
  526. return ErrShortData
  527. }
  528. }
  529. return nil
  530. }