2020-11-06 13:41:42 -05:00
|
|
|
package lz4
|
|
|
|
|
|
|
|
import (
|
|
|
|
"io"
|
|
|
|
|
|
|
|
"github.com/pierrec/lz4/v4/internal/lz4block"
|
|
|
|
"github.com/pierrec/lz4/v4/internal/lz4errors"
|
|
|
|
"github.com/pierrec/lz4/v4/internal/lz4stream"
|
|
|
|
)
|
|
|
|
|
|
|
|
var readerStates = []aState{
|
|
|
|
noState: newState,
|
|
|
|
errorState: newState,
|
|
|
|
newState: readState,
|
|
|
|
readState: closedState,
|
|
|
|
closedState: newState,
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewReader returns a new LZ4 frame decoder.
|
|
|
|
func NewReader(r io.Reader) *Reader {
|
|
|
|
return newReader(r, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newReader(r io.Reader, legacy bool) *Reader {
|
|
|
|
zr := &Reader{frame: lz4stream.NewFrame()}
|
|
|
|
zr.state.init(readerStates)
|
|
|
|
_ = zr.Apply(DefaultConcurrency, defaultOnBlockDone)
|
|
|
|
zr.Reset(r)
|
|
|
|
return zr
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reader allows reading an LZ4 stream.
|
|
|
|
type Reader struct {
|
|
|
|
state _State
|
|
|
|
src io.Reader // source reader
|
|
|
|
num int // concurrency level
|
|
|
|
frame *lz4stream.Frame // frame being read
|
|
|
|
data []byte // block buffer allocated in non concurrent mode
|
|
|
|
reads chan []byte // pending data
|
|
|
|
idx int // size of pending data
|
|
|
|
handler func(int)
|
|
|
|
cum uint32
|
2021-06-10 10:44:25 -04:00
|
|
|
dict []byte
|
2020-11-06 13:41:42 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
func (*Reader) private() {}
|
|
|
|
|
|
|
|
func (r *Reader) Apply(options ...Option) (err error) {
|
|
|
|
defer r.state.check(&err)
|
|
|
|
switch r.state.state {
|
|
|
|
case newState:
|
|
|
|
case errorState:
|
|
|
|
return r.state.err
|
|
|
|
default:
|
|
|
|
return lz4errors.ErrOptionClosedOrError
|
|
|
|
}
|
|
|
|
for _, o := range options {
|
|
|
|
if err = o(r); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Size returns the size of the underlying uncompressed data, if set in the stream.
|
|
|
|
func (r *Reader) Size() int {
|
|
|
|
switch r.state.state {
|
|
|
|
case readState, closedState:
|
|
|
|
if r.frame.Descriptor.Flags.Size() {
|
|
|
|
return int(r.frame.Descriptor.ContentSize)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Reader) isNotConcurrent() bool {
|
|
|
|
return r.num == 1
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Reader) init() error {
|
2021-06-10 10:44:25 -04:00
|
|
|
err := r.frame.ParseHeaders(r.src)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if !r.frame.Descriptor.Flags.BlockIndependence() {
|
|
|
|
// We can't decompress dependent blocks concurrently.
|
|
|
|
// Instead of throwing an error to the user, silently drop concurrency
|
|
|
|
r.num = 1
|
|
|
|
}
|
2020-11-06 13:41:42 -05:00
|
|
|
data, err := r.frame.InitR(r.src, r.num)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.reads = data
|
|
|
|
r.idx = 0
|
|
|
|
size := r.frame.Descriptor.Flags.BlockSizeIndex()
|
|
|
|
r.data = size.Get()
|
|
|
|
r.cum = 0
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Reader) Read(buf []byte) (n int, err error) {
|
|
|
|
defer r.state.check(&err)
|
|
|
|
switch r.state.state {
|
|
|
|
case readState:
|
|
|
|
case closedState, errorState:
|
|
|
|
return 0, r.state.err
|
|
|
|
case newState:
|
|
|
|
// First initialization.
|
|
|
|
if err = r.init(); r.state.next(err) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return 0, r.state.fail()
|
|
|
|
}
|
|
|
|
for len(buf) > 0 {
|
|
|
|
var bn int
|
|
|
|
if r.idx == 0 {
|
|
|
|
if r.isNotConcurrent() {
|
|
|
|
bn, err = r.read(buf)
|
|
|
|
} else {
|
|
|
|
lz4block.Put(r.data)
|
|
|
|
r.data = <-r.reads
|
|
|
|
if len(r.data) == 0 {
|
|
|
|
// No uncompressed data: something went wrong or we are done.
|
|
|
|
err = r.frame.Blocks.ErrorR()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
switch err {
|
|
|
|
case nil:
|
|
|
|
case io.EOF:
|
|
|
|
if er := r.frame.CloseR(r.src); er != nil {
|
|
|
|
err = er
|
|
|
|
}
|
|
|
|
lz4block.Put(r.data)
|
|
|
|
r.data = nil
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if bn == 0 {
|
|
|
|
// Fill buf with buffered data.
|
|
|
|
bn = copy(buf, r.data[r.idx:])
|
|
|
|
r.idx += bn
|
|
|
|
if r.idx == len(r.data) {
|
|
|
|
// All data read, get ready for the next Read.
|
|
|
|
r.idx = 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
buf = buf[bn:]
|
|
|
|
n += bn
|
|
|
|
r.handler(bn)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// read uncompresses the next block as follow:
|
|
|
|
// - if buf has enough room, the block is uncompressed into it directly
|
|
|
|
// and the lenght of used space is returned
|
|
|
|
// - else, the uncompress data is stored in r.data and 0 is returned
|
|
|
|
func (r *Reader) read(buf []byte) (int, error) {
|
|
|
|
block := r.frame.Blocks.Block
|
|
|
|
_, err := block.Read(r.frame, r.src, r.cum)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
var direct bool
|
|
|
|
dst := r.data[:cap(r.data)]
|
|
|
|
if len(buf) >= len(dst) {
|
|
|
|
// Uncompress directly into buf.
|
|
|
|
direct = true
|
|
|
|
dst = buf
|
|
|
|
}
|
2021-06-10 10:44:25 -04:00
|
|
|
dst, err = block.Uncompress(r.frame, dst, r.dict, true)
|
2020-11-06 13:41:42 -05:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2021-06-10 10:44:25 -04:00
|
|
|
if !r.frame.Descriptor.Flags.BlockIndependence() {
|
|
|
|
if len(r.dict)+len(dst) > 128*1024 {
|
|
|
|
preserveSize := 64*1024 - len(dst)
|
|
|
|
if preserveSize < 0 {
|
|
|
|
preserveSize = 0
|
|
|
|
}
|
|
|
|
r.dict = r.dict[len(r.dict)-preserveSize:]
|
|
|
|
}
|
|
|
|
r.dict = append(r.dict, dst...)
|
|
|
|
}
|
2020-11-06 13:41:42 -05:00
|
|
|
r.cum += uint32(len(dst))
|
|
|
|
if direct {
|
|
|
|
return len(dst), nil
|
|
|
|
}
|
|
|
|
r.data = dst
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reset clears the state of the Reader r such that it is equivalent to its
|
|
|
|
// initial state from NewReader, but instead writing to writer.
|
|
|
|
// No access to reader is performed.
|
|
|
|
//
|
|
|
|
// w.Close must be called before Reset.
|
|
|
|
func (r *Reader) Reset(reader io.Reader) {
|
|
|
|
if r.data != nil {
|
|
|
|
lz4block.Put(r.data)
|
|
|
|
r.data = nil
|
|
|
|
}
|
|
|
|
r.frame.Reset(r.num)
|
|
|
|
r.state.reset()
|
|
|
|
r.src = reader
|
|
|
|
r.reads = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// WriteTo efficiently uncompresses the data from the Reader underlying source to w.
|
|
|
|
func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
|
|
|
|
switch r.state.state {
|
|
|
|
case closedState, errorState:
|
|
|
|
return 0, r.state.err
|
|
|
|
case newState:
|
|
|
|
if err = r.init(); r.state.next(err) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return 0, r.state.fail()
|
|
|
|
}
|
|
|
|
defer r.state.nextd(&err)
|
|
|
|
|
|
|
|
var data []byte
|
|
|
|
if r.isNotConcurrent() {
|
|
|
|
size := r.frame.Descriptor.Flags.BlockSizeIndex()
|
|
|
|
data = size.Get()
|
|
|
|
defer lz4block.Put(data)
|
|
|
|
}
|
|
|
|
for {
|
|
|
|
var bn int
|
|
|
|
var dst []byte
|
|
|
|
if r.isNotConcurrent() {
|
|
|
|
bn, err = r.read(data)
|
|
|
|
dst = data[:bn]
|
|
|
|
} else {
|
|
|
|
lz4block.Put(dst)
|
|
|
|
dst = <-r.reads
|
|
|
|
bn = len(dst)
|
|
|
|
if bn == 0 {
|
|
|
|
// No uncompressed data: something went wrong or we are done.
|
|
|
|
err = r.frame.Blocks.ErrorR()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
switch err {
|
|
|
|
case nil:
|
|
|
|
case io.EOF:
|
|
|
|
err = r.frame.CloseR(r.src)
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r.handler(bn)
|
|
|
|
bn, err = w.Write(dst)
|
|
|
|
n += int64(bn)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|