Skip to content

Commit

Permalink
avro: support logical types in union interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
jags9415 authored Dec 24, 2020
1 parent 06bedbc commit 9220149
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
45 changes: 45 additions & 0 deletions decoder_union_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package avro_test

import (
"bytes"
"math/big"
"testing"
"time"

"github.com/hamba/avro"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -406,6 +408,49 @@ func TestDecoder_UnionInterfaceUnresolvableType(t *testing.T) {
assert.Equal(t, "foo", m["test"].(map[string]interface{})["b"])
}

func TestDecoder_UnionInterfaceWithTime(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02, 0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05}
schema := `{"type": "record", "name": "test", "fields" : [{"name": "a", "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}]}`
dec, _ := avro.NewDecoder(schema, bytes.NewReader(data))

var got map[string]interface{}
err := dec.Decode(&got)

assert.NoError(t, err)
assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC), got["a"])
}

func TestDecoder_UnionInterfaceWithDuration(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02, 0xAA, 0xB4, 0xDE, 0x75}
schema := `{"type": "record", "name": "test", "fields" : [{"name": "a", "type": ["null", {"type": "int", "logicalType": "time-millis"}]}]}`
dec, _ := avro.NewDecoder(schema, bytes.NewReader(data))

var got map[string]interface{}
err := dec.Decode(&got)

assert.NoError(t, err)
assert.Equal(t, 123456789*time.Millisecond, got["a"])
}

func TestDecoder_UnionInterfaceWithDecimal(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02, 0x6, 0x00, 0x87, 0x78}
schema := `{"type": "record", "name": "test", "fields" : [{"name": "a", "type": ["null", {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}]}]}`
dec, _ := avro.NewDecoder(schema, bytes.NewReader(data))

var got map[string]interface{}
err := dec.Decode(&got)
expected := big.NewRat(1734, 5)

assert.NoError(t, err)
assert.Equal(t, *expected, got["a"])
}

func TestDecoder_UnionInterfaceUnresolvableTypeWithError(t *testing.T) {
defer ConfigTeardown()

Expand Down
10 changes: 10 additions & 0 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package avro

import (
"fmt"
"math/big"
"time"

"github.com/modern-go/concurrent"
"github.com/modern-go/reflect2"
Expand Down Expand Up @@ -34,6 +36,14 @@ func NewTypeResolver() *TypeResolver {
r.Register(string(Bytes), []byte{})
r.Register(string(Boolean), bool(true))

// Register logical types
r.Register(string(Int)+"."+string(Date), time.Time{})
r.Register(string(Int)+"."+string(TimeMillis), time.Duration(0))
r.Register(string(Long)+"."+string(TimestampMillis), time.Time{})
r.Register(string(Long)+"."+string(TimestampMicros), time.Time{})
r.Register(string(Long)+"."+string(TimeMicros), time.Duration(0))
r.Register(string(Bytes)+"."+string(Decimal), big.Rat{})

return r
}

Expand Down
7 changes: 6 additions & 1 deletion schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,5 +1061,10 @@ func schemaTypeName(schema Schema) string {
return n.FullName()
}

return string(schema.Type())
name := string(schema.Type())
if lt := getLogicalType(schema); lt != "" {
name += "." + string(lt)
}

return name
}

0 comments on commit 9220149

Please sign in to comment.