Skip to content

Commit

Permalink
feat: support UPv4/IPv6 data types
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Jun 4, 2024
1 parent 7da534a commit 756c312
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 1 deletion.
2 changes: 2 additions & 0 deletions model/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Metric interface {
GetObject(key string, nullable bool) (val interface{})
GetMap(key string, typeinfo *TypeInfo) (val interface{})
GetArray(key string, t int) (val interface{})
GetIPv4(key string, nullable bool) (val interface{})
GetIPv6(key string, nullable bool) (val interface{})
GetNewKeys(knownKeys, newKeys, warnKeys *sync.Map, white, black *regexp.Regexp, partition int, offset int64) bool
}

Expand Down
12 changes: 11 additions & 1 deletion model/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
String
Object
Map
IPv4
IPv6
)

type TypeInfo struct {
Expand Down Expand Up @@ -91,6 +93,10 @@ func GetTypeName(typ int) (name string) {
name = "Object('json')"
case Map:
name = "Map"
case IPv4:
name = "IPv4"
case IPv6:
name = "IPv6"
default:
name = "Unknown"
}
Expand Down Expand Up @@ -135,6 +141,10 @@ func GetValueByType(metric Metric, cwt *ColumnWithType) (val interface{}) {
val = metric.GetMap(name, cwt.Type)
case Object:
val = metric.GetObject(name, cwt.Type.Nullable)
case IPv4:
val = metric.GetIPv4(name, cwt.Type.Nullable)
case IPv6:
val = metric.GetIPv6(name, cwt.Type.Nullable)
default:
util.Logger.Fatal("LOGIC ERROR: reached switch default condition")
}
Expand Down Expand Up @@ -190,7 +200,7 @@ func WhichType(typ string) (ti *TypeInfo) {

func init() {
typeInfo = make(map[string]*TypeInfo)
for _, t := range []int{Bool, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, DateTime, String, Object} {
for _, t := range []int{Bool, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, DateTime, String, Object, IPv4, IPv6} {
tn := GetTypeName(t)
typeInfo[tn] = &TypeInfo{Type: t}
nullTn := fmt.Sprintf("Nullable(%s)", tn)
Expand Down
15 changes: 15 additions & 0 deletions parser/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/csv"
"fmt"
"math"
"net"
"regexp"
"strconv"
"sync"
Expand Down Expand Up @@ -154,6 +155,20 @@ func (c *CsvMetric) GetFloat64(key string, nullable bool) (val interface{}) {
return CsvGetFloat[float64](c, key, nullable, math.MaxFloat64)
}

func (c *CsvMetric) GetIPv4(key string, nullable bool) (val interface{}) {
return c.GetUint32(key, nullable)
}

func (c *CsvMetric) GetIPv6(key string, nullable bool) (val interface{}) {
s := c.GetString(key, nullable).(string)
if net.ParseIP(s) != nil {
val = s
} else {
val = net.IPv6zero.String()
}
return val
}

func CsvGetInt[T constraints.Signed](c *CsvMetric, key string, nullable bool, min, max int64) (val interface{}) {
var idx int
var ok bool
Expand Down
75 changes: 75 additions & 0 deletions parser/fastjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package parser
import (
"fmt"
"math"
"net"
"regexp"
"strconv"
"sync"
Expand Down Expand Up @@ -117,6 +118,14 @@ func (c *FastjsonMetric) GetFloat64(key string, nullable bool) (val interface{})
return FastjsonGetFloat[float64](c.value.Get(key), nullable, math.MaxFloat64)
}

func (c *FastjsonMetric) GetIPv4(key string, nullable bool) (val interface{}) {
return getIPv4(c.value.Get(key), nullable)
}

func (c *FastjsonMetric) GetIPv6(key string, nullable bool) (val interface{}) {
return getIPv6(c.value.Get(key), nullable)
}

func FastjsonGetInt[T constraints.Signed](v *fastjson.Value, nullable bool, min, max int64) (val interface{}) {
if !fjCompatibleInt(v) {
val = getDefaultInt[T](nullable)
Expand Down Expand Up @@ -271,6 +280,54 @@ func getBool(v *fastjson.Value, nullable bool) (val interface{}) {
return
}

func getIPv4(v *fastjson.Value, nullable bool) (val interface{}) {
if v == nil || v.Type() == fastjson.TypeNull {
if nullable {
return
}
val = ""
return
}
switch v.Type() {
case fastjson.TypeString:
b, _ := v.StringBytes()
s := string(b)
if net.ParseIP(s) != nil {
val = s
} else {
val = net.IPv4zero.String()
}
case fastjson.TypeNumber:
val = FastjsonGetUint[uint32](v, nullable, math.MaxUint32)
default:
val = net.IPv4zero.String()
}
return
}

func getIPv6(v *fastjson.Value, nullable bool) (val interface{}) {
if v == nil || v.Type() == fastjson.TypeNull {
if nullable {
return
}
val = ""
return
}
switch v.Type() {
case fastjson.TypeString:
b, _ := v.StringBytes()
s := string(b)
if net.ParseIP(s) != nil {
val = s
} else {
val = net.IPv6zero.String()
}
default:
val = net.IPv6zero.String()
}
return
}

func getDecimal(v *fastjson.Value, nullable bool) (val interface{}) {
if !fjCompatibleFloat(v) {
val = getDefaultDecimal(nullable)
Expand Down Expand Up @@ -404,6 +461,20 @@ func getArray(c *FastjsonMetric, sourcename string, v *fastjson.Value, typ int)
}
}
val = arr
case model.IPv4:
arr := make([]interface{}, 0)
for _, e := range array {
v := getIPv4(e, false)
arr = append(arr, v)
}
val = arr
case model.IPv6:
arr := make([]interface{}, 0)
for _, e := range array {
v := getIPv6(e, false)
arr = append(arr, v)
}
val = arr
default:
util.Logger.Fatal(fmt.Sprintf("LOGIC ERROR: unsupported array type %v", typ))
}
Expand Down Expand Up @@ -515,6 +586,10 @@ func (c *FastjsonMetric) castMapValueByType(sourcename string, value *fastjson.V
val = FastjsonGetUint[uint32](value, typeinfo.Nullable, math.MaxUint32)
case model.UInt64:
val = FastjsonGetUint[uint64](value, typeinfo.Nullable, math.MaxUint64)
case model.IPv4:
val = getIPv4(value, typeinfo.Nullable)
case model.IPv6:
val = getIPv6(value, typeinfo.Nullable)
case model.Float32:
val = FastjsonGetFloat[float32](value, typeinfo.Nullable, math.MaxFloat32)
case model.Float64:
Expand Down
77 changes: 77 additions & 0 deletions parser/gjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package parser
import (
"fmt"
"math"
"net"
"regexp"
"strconv"
"sync"
Expand Down Expand Up @@ -108,6 +109,14 @@ func (c *GjsonMetric) GetFloat64(key string, nullable bool) (val interface{}) {
return GjsonGetFloat[float64](c, c.getField(key), nullable, math.MaxFloat64)
}

func (c *GjsonMetric) GetIPv4(key string, nullable bool) (val interface{}) {
return getGJsonIPv4(c, c.getField(key), nullable)
}

func (c *GjsonMetric) GetIPv6(key string, nullable bool) (val interface{}) {
return getGJsonIPv6(c, c.getField(key), nullable)
}

func GjsonGetInt[T constraints.Signed](c *GjsonMetric, r gjson.Result, nullable bool, min, max int64) (val interface{}) {
if !gjCompatibleInt(r) {
val = getDefaultInt[T](nullable)
Expand Down Expand Up @@ -404,6 +413,10 @@ func (c *GjsonMetric) castResultByType(sourcename string, value gjson.Result, ty
val = GjsonGetUint[uint32](c, value, typeinfo.Nullable, math.MaxUint32)
case model.UInt64:
val = GjsonGetUint[uint64](c, value, typeinfo.Nullable, math.MaxUint64)
case model.IPv6:
val = getGJsonIPv6(c, value, typeinfo.Nullable)
case model.IPv4:
val = getGJsonIPv4(c, value, typeinfo.Nullable)
case model.Float32:
val = GjsonGetFloat[float32](c, value, typeinfo.Nullable, math.MaxFloat32)
case model.Float64:
Expand Down Expand Up @@ -465,6 +478,56 @@ func getGJsonString(r gjson.Result, nullable bool) (val interface{}) {
return
}

func getGJsonIPv4(c *GjsonMetric, r gjson.Result, nullable bool) (val interface{}) {
if !r.Exists() || r.Type == gjson.Null {
if nullable {
return
}
val = ""
return
}
switch r.Type {
case gjson.Null:
val = ""
case gjson.String:
s := r.Str
if net.ParseIP(s) != nil {
val = s
} else {
val = net.IPv4zero.String()
}
case gjson.Number:
val = GjsonGetUint[uint32](c, r, nullable, math.MaxUint32)
default:
val = net.IPv4zero.String()
}
return
}

func getGJsonIPv6(c *GjsonMetric, r gjson.Result, nullable bool) (val interface{}) {
if !r.Exists() || r.Type == gjson.Null {
if nullable {
return
}
val = ""
return
}
switch r.Type {
case gjson.Null:
val = ""
case gjson.String:
s := r.Str
if net.ParseIP(s) != nil {
val = s
} else {
val = net.IPv6zero.String()
}
default:
val = net.IPv6zero.String()
}
return
}

func getGJsonDateTime(c *GjsonMetric, key string, r gjson.Result, nullable bool) (val interface{}) {
if !gjCompatibleDateTime(r) {
val = getDefaultDateTime(nullable)
Expand Down Expand Up @@ -563,6 +626,20 @@ func getGJsonArray(c *GjsonMetric, key string, r gjson.Result, typ int) (val int
results = append(results, t)
}
val = results
case model.IPv4:
arr := make([]interface{}, 0)
for _, e := range array {
v := getGJsonIPv4(c, e, false)
arr = append(arr, v)
}
val = arr
case model.IPv6:
arr := make([]interface{}, 0)
for _, e := range array {
v := getGJsonIPv6(c, e, false)
arr = append(arr, v)
}
val = arr
default:
util.Logger.Fatal(fmt.Sprintf("LOGIC ERROR: unsupported array type %v", typ))
}
Expand Down

0 comments on commit 756c312

Please sign in to comment.