forked from go-avro/avro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schema_prepared.go
145 lines (129 loc) · 3.5 KB
/
schema_prepared.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package avro
import (
"fmt"
"reflect"
"sync"
)
// Prepare returns a version of schema optimized for decoding/encoding.
//
// The schema is walked recursively with a fresh conversion cache, and the
// result of preparing the root schema is returned. Schema kinds that the
// preparation step does not rewrite are returned as-is.
func Prepare(schema Schema) Schema {
	job := &prepareJob{seen: map[Schema]Schema{}}
	prepared := job.prepare(schema)
	return prepared
}
// prepareJob carries the state of a single Prepare call.
type prepareJob struct {
	// seen maps an input schema to its prepared output. It caches conversions
	// so that repeated and recursive references do not cause infinite recursion.
	seen map[Schema]Schema
}
// prepare returns the prepared counterpart of schema and records it in
// job.seen.
//
// Record, union, array and map schemas are rebuilt with their nested schemas
// prepared recursively. A RecursiveSchema resolves to the prepared form of the
// schema it points at (schema.Actual). Any other schema is returned unchanged
// and is not recorded.
func (job *prepareJob) prepare(schema Schema) Schema {
	var output Schema
	switch schema := schema.(type) {
	case *RecordSchema:
		// Reuse an earlier conversion; prepareRecordSchema registers its
		// output before descending, so this also hits in-progress records.
		output = job.seen[schema]
		if output == nil {
			output = job.prepareRecordSchema(schema)
		}
	case *RecursiveSchema:
		if seen := job.seen[schema.Actual]; seen != nil {
			return seen
		}
		return job.prepare(schema.Actual)
	case *UnionSchema:
		output = job.prepareUnionSchema(schema)
	case *ArraySchema:
		output = job.prepareArraySchema(schema)
	case *MapSchema:
		// Map values can contain records just like array items can, so they
		// must be prepared too.
		output = job.prepareMapSchema(schema)
	default:
		return schema
	}
	// Outside the switch, schema is the original interface value again.
	job.seen[schema] = output
	return output
}
// prepareUnionSchema builds a new UnionSchema whose member types are the
// prepared forms of input's member types, in the same order.
func (job *prepareJob) prepareUnionSchema(input *UnionSchema) Schema {
	branches := make([]Schema, 0, len(input.Types))
	for _, branch := range input.Types {
		branches = append(branches, job.prepare(branch))
	}
	return &UnionSchema{Types: branches}
}
// prepareArraySchema builds a new ArraySchema that keeps input's Properties
// and whose Items schema is the prepared form of input.Items.
func (job *prepareJob) prepareArraySchema(input *ArraySchema) Schema {
	prepared := new(ArraySchema)
	prepared.Properties = input.Properties
	prepared.Items = job.prepare(input.Items)
	return prepared
}
// prepareMapSchema builds a new MapSchema that keeps input's Properties and
// whose Values schema is the prepared form of input.Values.
func (job *prepareJob) prepareMapSchema(input *MapSchema) Schema {
	prepared := new(MapSchema)
	prepared.Properties = input.Properties
	prepared.Values = job.prepare(input.Values)
	return prepared
}
// prepareRecordSchema wraps a copy of input in a preparedRecordSchema and
// rebuilds its field list with every field type prepared.
//
// The new record is stored in job.seen before any field is visited, so that
// self-recursive and co-recursive references resolve to this same
// (still in-progress) record instead of recursing forever.
func (job *prepareJob) prepareRecordSchema(input *RecordSchema) *preparedRecordSchema {
	prepared := new(preparedRecordSchema)
	prepared.RecordSchema = *input
	prepared.pool = sync.Pool{New: func() interface{} { return make(map[reflect.Type]*recordPlan) }}
	// Drop the field slice copied from input; it is rebuilt below.
	prepared.Fields = nil

	job.seen[input] = prepared

	fields := input.Fields
	for i := range fields {
		src := fields[i]
		fieldType := job.prepare(src.Type)
		prepared.Fields = append(prepared.Fields, &SchemaField{
			Name:    src.Name,
			Doc:     src.Doc,
			Default: src.Default,
			Type:    fieldType,
		})
	}
	return prepared
}
// preparedRecordSchema is the record schema produced by prepareRecordSchema:
// an embedded copy of a RecordSchema plus a pool of record plan caches.
type preparedRecordSchema struct {
	RecordSchema
	// pool hands out map[reflect.Type]*recordPlan caches; getPlan borrows one
	// per call and puts it back when done.
	pool sync.Pool
}
// getPlan returns the record plan for decoding this schema into Go type t.
//
// A cache (map from reflect.Type to plan) is borrowed from rs.pool for the
// duration of the call. If it already holds a plan for t, that plan is
// returned. Otherwise a plan with one entry per schema field is built, stored
// in the cache, and returned.
//
// If t has no field matching one of the schema's field names, getPlan returns
// a nil plan and an error naming the first such field. Nothing is cached in
// that case, so a later call for the same type reports the error again rather
// than handing back a half-built plan.
func (rs *preparedRecordSchema) getPlan(t reflect.Type) (plan *recordPlan, err error) {
	cache := rs.pool.Get().(map[reflect.Type]*recordPlan)
	defer rs.pool.Put(cache)

	if cached := cache[t]; cached != nil {
		return cached, nil
	}

	// Use the reflectmap to get field info.
	ri := reflectEnsureRi(t)
	decodePlan := make([]structFieldPlan, len(rs.Fields))
	for i, schemafield := range rs.Fields {
		index, ok := ri.names[schemafield.Name]
		if !ok {
			return nil, fmt.Errorf("Type %v does not have field %s required for decoding schema", t, schemafield.Name)
		}
		entry := &decodePlan[i]
		entry.schema = schemafield.Type
		entry.name = schemafield.Name
		entry.index = index
		// The decoder is chosen last, once the rest of the entry is filled in.
		entry.dec = specificDecoder(entry)
	}

	plan = &recordPlan{
		// Over time, we will create decode/encode plans for more things.
		decodePlan: decodePlan,
	}
	cache[t] = plan
	return plan, nil
}
// sdr is a package-level sDatumReader value.
// NOTE(review): the previous comment here was cut off ("This is used") and
// nothing in this part of the file references sdr — confirm where it is used.
var sdr sDatumReader
// recordPlan is the cached plan for handling one Go type against a prepared
// record schema.
type recordPlan struct {
	// decodePlan has one entry per schema field, in schema field order.
	decodePlan []structFieldPlan
}
// assertRecordSchema returns the *RecordSchema behind s.
//
// Until more optimizations are implemented, many call sites still want a
// plain *RecordSchema; this helper accepts either a *RecordSchema or a
// *preparedRecordSchema (returning its embedded RecordSchema). Any other
// schema type makes the type assertion panic.
func assertRecordSchema(s Schema) *RecordSchema {
	if plain, isPlain := s.(*RecordSchema); isPlain {
		return plain
	}
	return &s.(*preparedRecordSchema).RecordSchema
}