1
0

streaming_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. /**
  2. * Unit tests for ReedSolomon Streaming API
  3. *
  4. * Copyright 2015, Klaus Post
  5. */
  6. package reedsolomon
  7. import (
  8. "bytes"
  9. "io"
  10. "io/ioutil"
  11. "math/rand"
  12. "testing"
  13. )
  14. func TestStreamEncoding(t *testing.T) {
  15. perShard := 10 << 20
  16. if testing.Short() {
  17. perShard = 50000
  18. }
  19. r, err := NewStream(10, 3)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. rand.Seed(0)
  24. input := randomBytes(10, perShard)
  25. data := toBuffers(input)
  26. par := emptyBuffers(3)
  27. err = r.Encode(toReaders(data), toWriters(par))
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. // Reset Data
  32. data = toBuffers(input)
  33. all := append(toReaders(data), toReaders(par)...)
  34. ok, err := r.Verify(all)
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. if !ok {
  39. t.Fatal("Verification failed")
  40. }
  41. err = r.Encode(toReaders(emptyBuffers(1)), toWriters(emptyBuffers(1)))
  42. if err != ErrTooFewShards {
  43. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  44. }
  45. err = r.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(1)))
  46. if err != ErrTooFewShards {
  47. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  48. }
  49. err = r.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(3)))
  50. if err != ErrShardNoData {
  51. t.Errorf("expected %v, got %v", ErrShardNoData, err)
  52. }
  53. badShards := emptyBuffers(10)
  54. badShards[0] = randomBuffer(123)
  55. err = r.Encode(toReaders(badShards), toWriters(emptyBuffers(3)))
  56. if err != ErrShardSize {
  57. t.Errorf("expected %v, got %v", ErrShardSize, err)
  58. }
  59. }
  60. func TestStreamEncodingConcurrent(t *testing.T) {
  61. perShard := 10 << 20
  62. if testing.Short() {
  63. perShard = 50000
  64. }
  65. r, err := NewStreamC(10, 3, true, true)
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. rand.Seed(0)
  70. input := randomBytes(10, perShard)
  71. data := toBuffers(input)
  72. par := emptyBuffers(3)
  73. err = r.Encode(toReaders(data), toWriters(par))
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. // Reset Data
  78. data = toBuffers(input)
  79. all := append(toReaders(data), toReaders(par)...)
  80. ok, err := r.Verify(all)
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. if !ok {
  85. t.Fatal("Verification failed")
  86. }
  87. err = r.Encode(toReaders(emptyBuffers(1)), toWriters(emptyBuffers(1)))
  88. if err != ErrTooFewShards {
  89. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  90. }
  91. err = r.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(1)))
  92. if err != ErrTooFewShards {
  93. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  94. }
  95. err = r.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(3)))
  96. if err != ErrShardNoData {
  97. t.Errorf("expected %v, got %v", ErrShardNoData, err)
  98. }
  99. badShards := emptyBuffers(10)
  100. badShards[0] = randomBuffer(123)
  101. badShards[1] = randomBuffer(123)
  102. err = r.Encode(toReaders(badShards), toWriters(emptyBuffers(3)))
  103. if err != ErrShardSize {
  104. t.Errorf("expected %v, got %v", ErrShardSize, err)
  105. }
  106. }
  107. func randomBuffer(length int) *bytes.Buffer {
  108. b := make([]byte, length)
  109. fillRandom(b)
  110. return bytes.NewBuffer(b)
  111. }
  112. func randomBytes(n, length int) [][]byte {
  113. bufs := make([][]byte, n)
  114. for j := range bufs {
  115. bufs[j] = make([]byte, length)
  116. fillRandom(bufs[j])
  117. }
  118. return bufs
  119. }
  // toBuffers wraps each byte slice in its own bytes.Buffer. The buffers take
// ownership of the slices, so the slices must not be modified while the
// buffers are in use.
func toBuffers(in [][]byte) []*bytes.Buffer {
	bufs := make([]*bytes.Buffer, 0, len(in))
	for _, b := range in {
		bufs = append(bufs, bytes.NewBuffer(b))
	}
	return bufs
}
  // toReaders returns the given buffers as io.Readers, in the same order.
func toReaders(in []*bytes.Buffer) []io.Reader {
	readers := make([]io.Reader, 0, len(in))
	for _, buf := range in {
		readers = append(readers, buf)
	}
	return readers
}
  // toWriters returns the given buffers as io.Writers, in the same order.
func toWriters(in []*bytes.Buffer) []io.Writer {
	writers := make([]io.Writer, 0, len(in))
	for _, buf := range in {
		writers = append(writers, buf)
	}
	return writers
}
  // nilWriters returns a slice of n nil io.Writers.
func nilWriters(n int) []io.Writer {
	// make zero-fills the slice, so every element is already a nil interface;
	// no explicit assignment loop is needed.
	return make([]io.Writer, n)
}
  // emptyBuffers returns n distinct, empty bytes.Buffers.
func emptyBuffers(n int) []*bytes.Buffer {
	bufs := make([]*bytes.Buffer, 0, n)
	for len(bufs) < n {
		bufs = append(bufs, new(bytes.Buffer))
	}
	return bufs
}
  // toBytes returns the unread contents of each buffer. Each slice comes from
// bytes.Buffer.Bytes, so it aliases the buffer's storage until the buffer is
// next modified.
func toBytes(in []*bytes.Buffer) [][]byte {
	out := make([][]byte, 0, len(in))
	for _, buf := range in {
		out = append(out, buf.Bytes())
	}
	return out
}
  162. func TestStreamReconstruct(t *testing.T) {
  163. perShard := 10 << 20
  164. if testing.Short() {
  165. perShard = 50000
  166. }
  167. r, err := NewStream(10, 3)
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. rand.Seed(0)
  172. shards := randomBytes(10, perShard)
  173. parb := emptyBuffers(3)
  174. err = r.Encode(toReaders(toBuffers(shards)), toWriters(parb))
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. parity := toBytes(parb)
  179. all := append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  180. fill := make([]io.Writer, 13)
  181. // Reconstruct with all shards present, all fill nil
  182. err = r.Reconstruct(all, fill)
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  187. // Reconstruct with 10 shards present
  188. all[0] = nil
  189. fill[0] = emptyBuffers(1)[0]
  190. all[7] = nil
  191. fill[7] = emptyBuffers(1)[0]
  192. all[11] = nil
  193. fill[11] = emptyBuffers(1)[0]
  194. err = r.Reconstruct(all, fill)
  195. if err != nil {
  196. t.Fatal(err)
  197. }
  198. shards[0] = fill[0].(*bytes.Buffer).Bytes()
  199. shards[7] = fill[7].(*bytes.Buffer).Bytes()
  200. parity[1] = fill[11].(*bytes.Buffer).Bytes()
  201. all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  202. ok, err := r.Verify(all)
  203. if err != nil {
  204. t.Fatal(err)
  205. }
  206. if !ok {
  207. t.Fatal("Verification failed")
  208. }
  209. all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  210. // Reconstruct with 9 shards present (should fail)
  211. all[0] = nil
  212. fill[0] = emptyBuffers(1)[0]
  213. all[4] = nil
  214. fill[4] = emptyBuffers(1)[0]
  215. all[7] = nil
  216. fill[7] = emptyBuffers(1)[0]
  217. all[11] = nil
  218. fill[11] = emptyBuffers(1)[0]
  219. err = r.Reconstruct(all, fill)
  220. if err != ErrTooFewShards {
  221. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  222. }
  223. err = r.Reconstruct(toReaders(emptyBuffers(3)), toWriters(emptyBuffers(3)))
  224. if err != ErrTooFewShards {
  225. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  226. }
  227. err = r.Reconstruct(toReaders(emptyBuffers(13)), toWriters(emptyBuffers(3)))
  228. if err != ErrTooFewShards {
  229. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  230. }
  231. err = r.Reconstruct(toReaders(emptyBuffers(13)), toWriters(emptyBuffers(13)))
  232. if err != ErrReconstructMismatch {
  233. t.Errorf("expected %v, got %v", ErrReconstructMismatch, err)
  234. }
  235. err = r.Reconstruct(toReaders(emptyBuffers(13)), nilWriters(13))
  236. if err != ErrShardNoData {
  237. t.Errorf("expected %v, got %v", ErrShardNoData, err)
  238. }
  239. }
  240. func TestStreamVerify(t *testing.T) {
  241. perShard := 10 << 20
  242. if testing.Short() {
  243. perShard = 50000
  244. }
  245. r, err := NewStream(10, 4)
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. shards := randomBytes(10, perShard)
  250. parb := emptyBuffers(4)
  251. err = r.Encode(toReaders(toBuffers(shards)), toWriters(parb))
  252. if err != nil {
  253. t.Fatal(err)
  254. }
  255. parity := toBytes(parb)
  256. all := append(toReaders(toBuffers(shards)), toReaders(parb)...)
  257. ok, err := r.Verify(all)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. if !ok {
  262. t.Fatal("Verification failed")
  263. }
  264. // Flip bits in a random byte
  265. parity[0][len(parity[0])-20000] = parity[0][len(parity[0])-20000] ^ 0xff
  266. all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  267. ok, err = r.Verify(all)
  268. if err != nil {
  269. t.Fatal(err)
  270. }
  271. if ok {
  272. t.Fatal("Verification did not fail")
  273. }
  274. // Re-encode
  275. err = r.Encode(toReaders(toBuffers(shards)), toWriters(parb))
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. // Fill a data segment with random data
  280. shards[0][len(shards[0])-30000] = shards[0][len(shards[0])-30000] ^ 0xff
  281. all = append(toReaders(toBuffers(shards)), toReaders(parb)...)
  282. ok, err = r.Verify(all)
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. if ok {
  287. t.Fatal("Verification did not fail")
  288. }
  289. _, err = r.Verify(toReaders(emptyBuffers(10)))
  290. if err != ErrTooFewShards {
  291. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  292. }
  293. _, err = r.Verify(toReaders(emptyBuffers(14)))
  294. if err != ErrShardNoData {
  295. t.Errorf("expected %v, got %v", ErrShardNoData, err)
  296. }
  297. }
  298. func TestStreamOneEncode(t *testing.T) {
  299. codec, err := NewStream(5, 5)
  300. if err != nil {
  301. t.Fatal(err)
  302. }
  303. shards := [][]byte{
  304. {0, 1},
  305. {4, 5},
  306. {2, 3},
  307. {6, 7},
  308. {8, 9},
  309. }
  310. parb := emptyBuffers(5)
  311. codec.Encode(toReaders(toBuffers(shards)), toWriters(parb))
  312. parity := toBytes(parb)
  313. if parity[0][0] != 12 || parity[0][1] != 13 {
  314. t.Fatal("shard 5 mismatch")
  315. }
  316. if parity[1][0] != 10 || parity[1][1] != 11 {
  317. t.Fatal("shard 6 mismatch")
  318. }
  319. if parity[2][0] != 14 || parity[2][1] != 15 {
  320. t.Fatal("shard 7 mismatch")
  321. }
  322. if parity[3][0] != 90 || parity[3][1] != 91 {
  323. t.Fatal("shard 8 mismatch")
  324. }
  325. if parity[4][0] != 94 || parity[4][1] != 95 {
  326. t.Fatal("shard 9 mismatch")
  327. }
  328. all := append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  329. ok, err := codec.Verify(all)
  330. if err != nil {
  331. t.Fatal(err)
  332. }
  333. if !ok {
  334. t.Fatal("did not verify")
  335. }
  336. shards[3][0]++
  337. all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
  338. ok, err = codec.Verify(all)
  339. if err != nil {
  340. t.Fatal(err)
  341. }
  342. if ok {
  343. t.Fatal("verify did not fail as expected")
  344. }
  345. }
  346. func benchmarkStreamEncode(b *testing.B, dataShards, parityShards, shardSize int) {
  347. r, err := NewStream(dataShards, parityShards)
  348. if err != nil {
  349. b.Fatal(err)
  350. }
  351. shards := make([][]byte, dataShards)
  352. for s := range shards {
  353. shards[s] = make([]byte, shardSize)
  354. }
  355. rand.Seed(0)
  356. for s := 0; s < dataShards; s++ {
  357. fillRandom(shards[s])
  358. }
  359. b.SetBytes(int64(shardSize * dataShards))
  360. b.ResetTimer()
  361. out := make([]io.Writer, parityShards)
  362. for i := range out {
  363. out[i] = ioutil.Discard
  364. }
  365. for i := 0; i < b.N; i++ {
  366. err = r.Encode(toReaders(toBuffers(shards)), out)
  367. if err != nil {
  368. b.Fatal(err)
  369. }
  370. }
  371. }
  372. func BenchmarkStreamEncode10x2x10000(b *testing.B) {
  373. benchmarkStreamEncode(b, 10, 2, 10000)
  374. }
  375. func BenchmarkStreamEncode100x20x10000(b *testing.B) {
  376. benchmarkStreamEncode(b, 100, 20, 10000)
  377. }
  378. func BenchmarkStreamEncode17x3x1M(b *testing.B) {
  379. benchmarkStreamEncode(b, 17, 3, 1024*1024)
  380. }
  381. // Benchmark 10 data shards and 4 parity shards with 16MB each.
  382. func BenchmarkStreamEncode10x4x16M(b *testing.B) {
  383. benchmarkStreamEncode(b, 10, 4, 16*1024*1024)
  384. }
  385. // Benchmark 5 data shards and 2 parity shards with 1MB each.
  386. func BenchmarkStreamEncode5x2x1M(b *testing.B) {
  387. benchmarkStreamEncode(b, 5, 2, 1024*1024)
  388. }
  389. // Benchmark 1 data shards and 2 parity shards with 1MB each.
  390. func BenchmarkStreamEncode10x2x1M(b *testing.B) {
  391. benchmarkStreamEncode(b, 10, 2, 1024*1024)
  392. }
  393. // Benchmark 10 data shards and 4 parity shards with 1MB each.
  394. func BenchmarkStreamEncode10x4x1M(b *testing.B) {
  395. benchmarkStreamEncode(b, 10, 4, 1024*1024)
  396. }
  397. // Benchmark 50 data shards and 20 parity shards with 1MB each.
  398. func BenchmarkStreamEncode50x20x1M(b *testing.B) {
  399. benchmarkStreamEncode(b, 50, 20, 1024*1024)
  400. }
  401. // Benchmark 17 data shards and 3 parity shards with 16MB each.
  402. func BenchmarkStreamEncode17x3x16M(b *testing.B) {
  403. benchmarkStreamEncode(b, 17, 3, 16*1024*1024)
  404. }
  405. func benchmarkStreamVerify(b *testing.B, dataShards, parityShards, shardSize int) {
  406. r, err := NewStream(dataShards, parityShards)
  407. if err != nil {
  408. b.Fatal(err)
  409. }
  410. shards := make([][]byte, parityShards+dataShards)
  411. for s := range shards {
  412. shards[s] = make([]byte, shardSize)
  413. }
  414. rand.Seed(0)
  415. for s := 0; s < dataShards; s++ {
  416. fillRandom(shards[s])
  417. }
  418. err = r.Encode(toReaders(toBuffers(shards[:dataShards])), toWriters(toBuffers(shards[dataShards:])))
  419. if err != nil {
  420. b.Fatal(err)
  421. }
  422. b.SetBytes(int64(shardSize * dataShards))
  423. b.ResetTimer()
  424. for i := 0; i < b.N; i++ {
  425. _, err = r.Verify(toReaders(toBuffers(shards)))
  426. if err != nil {
  427. b.Fatal(err)
  428. }
  429. }
  430. }
  431. // Benchmark 10 data slices with 2 parity slices holding 10000 bytes each
  432. func BenchmarkStreamVerify10x2x10000(b *testing.B) {
  433. benchmarkStreamVerify(b, 10, 2, 10000)
  434. }
  435. // Benchmark 50 data slices with 5 parity slices holding 100000 bytes each
  436. func BenchmarkStreamVerify50x5x50000(b *testing.B) {
  437. benchmarkStreamVerify(b, 50, 5, 100000)
  438. }
  439. // Benchmark 10 data slices with 2 parity slices holding 1MB bytes each
  440. func BenchmarkStreamVerify10x2x1M(b *testing.B) {
  441. benchmarkStreamVerify(b, 10, 2, 1024*1024)
  442. }
  443. // Benchmark 5 data slices with 2 parity slices holding 1MB bytes each
  444. func BenchmarkStreamVerify5x2x1M(b *testing.B) {
  445. benchmarkStreamVerify(b, 5, 2, 1024*1024)
  446. }
  447. // Benchmark 10 data slices with 4 parity slices holding 1MB bytes each
  448. func BenchmarkStreamVerify10x4x1M(b *testing.B) {
  449. benchmarkStreamVerify(b, 10, 4, 1024*1024)
  450. }
  451. // Benchmark 5 data slices with 2 parity slices holding 1MB bytes each
  452. func BenchmarkStreamVerify50x20x1M(b *testing.B) {
  453. benchmarkStreamVerify(b, 50, 20, 1024*1024)
  454. }
  455. // Benchmark 10 data slices with 4 parity slices holding 16MB bytes each
  456. func BenchmarkStreamVerify10x4x16M(b *testing.B) {
  457. benchmarkStreamVerify(b, 10, 4, 16*1024*1024)
  458. }
  459. func TestStreamSplitJoin(t *testing.T) {
  460. var data = make([]byte, 250000)
  461. rand.Seed(0)
  462. fillRandom(data)
  463. enc, _ := NewStream(5, 3)
  464. split := emptyBuffers(5)
  465. err := enc.Split(bytes.NewBuffer(data), toWriters(split), int64(len(data)))
  466. if err != nil {
  467. t.Fatal(err)
  468. }
  469. splits := toBytes(split)
  470. expect := len(data) / 5
  471. // Beware, if changing data size
  472. if split[0].Len() != expect {
  473. t.Errorf("unexpected size. expected %d, got %d", expect, split[0].Len())
  474. }
  475. err = enc.Split(bytes.NewBuffer([]byte{}), toWriters(emptyBuffers(3)), 0)
  476. if err != ErrShortData {
  477. t.Errorf("expected %v, got %v", ErrShortData, err)
  478. }
  479. buf := new(bytes.Buffer)
  480. err = enc.Join(buf, toReaders(toBuffers(splits)), int64(len(data)))
  481. if err != nil {
  482. t.Fatal(err)
  483. }
  484. joined := buf.Bytes()
  485. if !bytes.Equal(joined, data) {
  486. t.Fatal("recovered data does match original", joined[:8], data[:8], "... lengths:", len(joined), len(data))
  487. }
  488. err = enc.Join(buf, toReaders(emptyBuffers(2)), 0)
  489. if err != ErrTooFewShards {
  490. t.Errorf("expected %v, got %v", ErrTooFewShards, err)
  491. }
  492. bufs := toReaders(emptyBuffers(5))
  493. bufs[2] = nil
  494. err = enc.Join(buf, bufs, 0)
  495. if se, ok := err.(StreamReadError); ok {
  496. if se.Err != ErrShardNoData {
  497. t.Errorf("expected %v, got %v", ErrShardNoData, se.Err)
  498. }
  499. if se.Stream != 2 {
  500. t.Errorf("Expected error on stream 2, got %d", se.Stream)
  501. }
  502. } else {
  503. t.Errorf("expected error type %T, got %T", StreamReadError{}, err)
  504. }
  505. err = enc.Join(buf, toReaders(toBuffers(splits)), int64(len(data)+1))
  506. if err != ErrShortData {
  507. t.Errorf("expected %v, got %v", ErrShortData, err)
  508. }
  509. }
  510. func TestNewStream(t *testing.T) {
  511. tests := []struct {
  512. data, parity int
  513. err error
  514. }{
  515. {127, 127, nil},
  516. {256, 256, ErrMaxShardNum},
  517. {0, 1, ErrInvShardNum},
  518. {1, 0, ErrInvShardNum},
  519. {257, 1, ErrMaxShardNum},
  520. // overflow causes r.Shards to be negative
  521. {256, int(^uint(0) >> 1), errInvalidRowSize},
  522. }
  523. for _, test := range tests {
  524. _, err := NewStream(test.data, test.parity)
  525. if err != test.err {
  526. t.Errorf("New(%v, %v): expected %v, got %v", test.data, test.parity, test.err, err)
  527. }
  528. }
  529. }