forked from fraugster/parquet-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile_reader.go
212 lines (180 loc) · 6.77 KB
/
file_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package goparquet
import (
"context"
"fmt"
"io"
"strings"
"github.com/fraugster/parquet-go/parquet"
"github.com/pkg/errors"
)
// FileReader is used to read data from a parquet file. Always use NewFileReader (or one of
// its variants NewFileReaderWithContext / NewFileReaderWithMetaData) to create such an object.
type FileReader struct {
	// meta is the file meta data; its RowGroups list is what row group iteration walks over.
	meta *parquet.FileMetaData
	// SchemaReader (embedded) receives each loaded row group (see readRowGroup) and is
	// queried for the current group's record count and for the next row's data.
	SchemaReader

	// reader is the underlying file, handed to readRowGroup when a row group is loaded.
	reader io.ReadSeeker
	// rowGroupPosition is the 1-based number of the most recently loaded row group
	// (it is loaded from meta.RowGroups[rowGroupPosition-1]); 0 means none loaded yet.
	rowGroupPosition int
	// currentRecord counts the records already handed out from the current row group;
	// it is reset to 0 whenever a new row group is loaded.
	currentRecord int64
	// skipRowGroup forces the next advance to load the following row group. It is set by
	// SkipRowGroup and also after a failed advance.
	skipRowGroup bool
	// ctx is the default context used by the methods that do not take a context argument.
	ctx context.Context
}
// NewFileReader creates a new FileReader reading from r, with context.Background()
// as its default context. Column names given in dotted notation restrict which
// columns are read; when no columns are given, every column is read.
func NewFileReader(r io.ReadSeeker, columns ...string) (*FileReader, error) {
	defaultCtx := context.Background()
	return NewFileReaderWithContext(defaultCtx, r, columns...)
}
// NewFileReaderWithContext creates a new FileReader that reads from r and uses ctx
// as the default context for every *FileReader method that does not take an
// explicit context. Column names given in dotted notation restrict which columns
// are read; when no columns are given, every column is read.
//
// The file meta data is read from r first; afterwards r is positioned at offset 4.
func NewFileReaderWithContext(ctx context.Context, r io.ReadSeeker, columns ...string) (*FileReader, error) {
	meta, metaErr := ReadFileMetaData(r, true)
	if metaErr != nil {
		return nil, errors.Wrap(metaErr, "reading file meta data failed")
	}

	schema, schemaErr := makeSchema(meta)
	if schemaErr != nil {
		return nil, errors.Wrap(schemaErr, "creating schema failed")
	}
	schema.SetSelectedColumns(columns...)

	// A Parquet file starts with a 4-byte magic number, so rewind to just past
	// it (offset 4) rather than to offset 0.
	if _, seekErr := r.Seek(4, io.SeekStart); seekErr != nil {
		return nil, seekErr
	}

	fr := &FileReader{SchemaReader: schema}
	fr.meta = meta
	fr.reader = r
	fr.ctx = ctx
	return fr, nil
}
// NewFileReaderWithMetaData creates a new FileReader that uses the supplied file
// meta data instead of reading it from r; if meta is nil, the meta data is read
// from r. You can limit the columns that are read by providing the names of the
// specific columns to read using dotted notation. If no columns are provided,
// then all columns are read.
//
// The returned reader uses context.Background() as the default context for the
// methods that do not take an explicit context, the same default NewFileReader uses.
func NewFileReaderWithMetaData(r io.ReadSeeker, meta *parquet.FileMetaData, columns ...string) (*FileReader, error) {
	if meta == nil {
		var err error
		meta, err = ReadFileMetaData(r, true)
		if err != nil {
			return nil, errors.Wrap(err, "reading file meta data failed")
		}
	}

	schema, err := makeSchema(meta)
	if err != nil {
		return nil, errors.Wrap(err, "creating schema failed")
	}
	schema.SetSelectedColumns(columns...)

	// A Parquet file starts with a 4-byte magic number; position r just past it.
	if _, err := r.Seek(4, io.SeekStart); err != nil {
		return nil, err
	}

	return &FileReader{
		meta:         meta,
		SchemaReader: schema,
		reader:       r,
		// Without an explicit default, NextRow, SeekToRowGroup, PreLoad and
		// RowGroupNumRows would pass a nil context.Context down the read path.
		ctx: context.Background(),
	}, nil
}
// SeekToRowGroup seeks to a particular row group, identified by its position,
// using the reader's default context. The position is interpreted exactly as
// by SeekToRowGroupWithContext.
func (f *FileReader) SeekToRowGroup(rowGroupPosition int) error {
	ctx := f.ctx
	return f.SeekToRowGroupWithContext(ctx, rowGroupPosition)
}
// SeekToRowGroupWithContext seeks to a particular row group and loads it, using ctx
// for the read.
//
// Positions are 1-based: a rowGroupPosition of 1 loads the first row group
// (meta.RowGroups[0]). A position below 1 returns an error. A position beyond
// RowGroupCount() returns io.EOF. After a successful seek, reading resumes at the
// first record of the loaded row group.
func (f *FileReader) SeekToRowGroupWithContext(ctx context.Context, rowGroupPosition int) error {
	// Without this guard, readRowGroup would index RowGroups[-1] (or lower) and panic.
	if rowGroupPosition < 1 {
		return fmt.Errorf("invalid row group position %d: positions start at 1", rowGroupPosition)
	}

	f.rowGroupPosition = rowGroupPosition - 1
	f.currentRecord = 0
	if err := f.readRowGroup(ctx); err != nil {
		return err
	}

	// Clear any pending skip request. One may have been set by SkipRowGroup or
	// by an earlier failed advance such as io.EOF. Left set, it would make the
	// next NextRow discard the row group that was just loaded.
	f.skipRowGroup = false
	return nil
}
// readRowGroup loads the row group following the current position into the
// embedded SchemaReader and advances rowGroupPosition by one. It returns io.EOF
// when every row group has already been loaded.
func (f *FileReader) readRowGroup(ctx context.Context) error {
	next := f.rowGroupPosition
	if next >= len(f.meta.RowGroups) {
		return io.EOF
	}
	f.rowGroupPosition = next + 1
	return readRowGroup(ctx, f.reader, f.SchemaReader, f.meta.RowGroups[next])
}
// CurrentRowGroup returns information about the current row group, i.e. the row
// group loaded most recently. It returns nil if f or its meta data is nil, if no
// row group has been loaded yet, or if the recorded position lies outside the
// file's row groups.
func (f *FileReader) CurrentRowGroup() *parquet.RowGroup {
	if f == nil || f.meta == nil {
		return nil
	}
	// rowGroupPosition is 1-based; 0 means nothing has been loaded yet, which
	// previously slipped past the bounds check and indexed RowGroups[-1].
	idx := f.rowGroupPosition - 1
	if idx < 0 || idx >= len(f.meta.RowGroups) {
		return nil
	}
	return f.meta.RowGroups[idx]
}
// RowGroupCount reports how many row groups the file's meta data lists.
func (f *FileReader) RowGroupCount() int {
	groups := f.meta.RowGroups
	return len(groups)
}
// NumRows reports the total number of rows in the parquet file, as recorded in
// the file's meta data (no data is read to compute it).
func (f *FileReader) NumRows() int64 {
	total := f.meta.NumRows
	return total
}
// advanceIfNeeded makes sure a row group with unread records is loaded.
//
// It loads the next row group in three cases: none has been loaded yet, every
// record of the current one has been consumed, or a skip was requested. On
// success the record counter and the skip flag are reset. On failure the skip
// flag is set, so the next call tries to advance again, and the error from
// readRowGroup is returned as-is (e.g. io.EOF after the last row group).
func (f *FileReader) advanceIfNeeded(ctx context.Context) error {
	mustLoad := f.rowGroupPosition == 0 ||
		f.currentRecord >= f.SchemaReader.rowGroupNumRecords() ||
		f.skipRowGroup
	if !mustLoad {
		return nil
	}

	err := f.readRowGroup(ctx)
	if err != nil {
		f.skipRowGroup = true
		return err
	}

	f.skipRowGroup = false
	f.currentRecord = 0
	return nil
}
// RowGroupNumRows returns the number of rows in the current row group, using the
// reader's default context. If no row group is loaded, or the current one is
// exhausted or marked for skipping, the next row group is loaded first.
func (f *FileReader) RowGroupNumRows() (int64, error) {
	ctx := f.ctx
	return f.RowGroupNumRowsWithContext(ctx)
}
// RowGroupNumRowsWithContext returns the number of rows in the current row group.
// If no row group is loaded, or the current one is exhausted or marked for
// skipping, the next row group is loaded first using ctx; a failure to load
// (including io.EOF) is returned together with a count of 0.
func (f *FileReader) RowGroupNumRowsWithContext(ctx context.Context) (int64, error) {
	err := f.advanceIfNeeded(ctx)
	if err != nil {
		return 0, err
	}
	n := f.SchemaReader.rowGroupNumRecords()
	return n, nil
}
// NextRow returns the next row of the parquet file, using the reader's default
// context. When the current row group is used up, the next one is loaded first.
func (f *FileReader) NextRow() (map[string]interface{}, error) {
	ctx := f.ctx
	return f.NextRowWithContext(ctx)
}
// NextRowWithContext returns the next row of the parquet file. When the current
// row group is used up (or a skip was requested), the next row group is loaded
// with ctx first. io.EOF is returned once all row groups have been read.
func (f *FileReader) NextRowWithContext(ctx context.Context) (map[string]interface{}, error) {
	err := f.advanceIfNeeded(ctx)
	if err != nil {
		return nil, err
	}
	// The record is counted before getData runs, so it counts as consumed even
	// if getData returns an error.
	f.currentRecord++
	row, err := f.SchemaReader.getData()
	return row, err
}
// SkipRowGroup marks the currently loaded row group as finished. The next call
// to NextRow, RowGroupNumRows or PreLoad (or their WithContext variants) loads
// the following row group instead of continuing in the current one.
func (f *FileReader) SkipRowGroup() {
	f.skipRowGroup = true
}
// PreLoad loads the next row group if one is needed, using the reader's default
// context. If the current row group still has unread records, it does nothing.
func (f *FileReader) PreLoad() error {
	ctx := f.ctx
	return f.PreLoadWithContext(ctx)
}
// PreLoadWithContext loads the next row group with ctx if one is needed. If the
// current row group still has unread records and no skip was requested, it
// does nothing.
func (f *FileReader) PreLoadWithContext(ctx context.Context) error {
	err := f.advanceIfNeeded(ctx)
	return err
}
// MetaData returns the file-level key-value metadata of the parquet file as a
// map. Entries without a value are left out.
func (f *FileReader) MetaData() map[string]string {
	pairs := f.meta.KeyValueMetadata
	return keyValueMetaDataToMap(pairs)
}
// ColumnMetaData returns a map of metadata key-value pairs for the provided column
// in the current row group. The column name has to be provided in its dotted
// notation (path elements joined with ".").
//
// An error is returned if no row group is currently loaded or if no column chunk
// of the current row group has a matching path. Column chunks that carry no
// column metadata are skipped.
func (f *FileReader) ColumnMetaData(colName string) (map[string]string, error) {
	rg := f.CurrentRowGroup()
	if rg == nil {
		// Dereferencing rg.Columns here would panic.
		return nil, fmt.Errorf("column %q not found: no row group loaded", colName)
	}

	for _, col := range rg.Columns {
		// meta_data is optional on a Parquet ColumnChunk, so guard before use.
		if col == nil || col.MetaData == nil {
			continue
		}
		if colName == strings.Join(col.MetaData.PathInSchema, ".") {
			return keyValueMetaDataToMap(col.MetaData.KeyValueMetadata), nil
		}
	}
	return nil, fmt.Errorf("column %q not found", colName)
}
// keyValueMetaDataToMap converts a list of parquet key-value pairs into a map.
// Pairs whose Value is nil are left out. For duplicate keys, the last non-nil
// value wins. The returned map is never nil, even for empty input.
func keyValueMetaDataToMap(kvMetaData []*parquet.KeyValue) map[string]string {
	result := map[string]string{}
	for i := range kvMetaData {
		pair := kvMetaData[i]
		if pair.Value == nil {
			continue
		}
		result[pair.Key] = *pair.Value
	}
	return result
}