Skip to content

Commit

Permalink
feat: DynamicSchema add opt NotNullable
Browse files Browse the repository at this point in the history
  • Loading branch information
exfly authored and yuzhichang committed Dec 10, 2023
1 parent bbc8b5b commit e21fa86
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ type TaskConfig struct {
} `json:"dims"`
// DynamicSchema will add columns present in message to clickhouse. Requires AutoSchema be true.
DynamicSchema struct {
Enable bool
MaxDims int // the upper limit of dynamic columns number, <=0 means math.MaxInt16. protecting dirty data attack
Enable bool
NotNullable bool
MaxDims int // the upper limit of dynamic columns number, <=0 means math.MaxInt16. protecting dirty data attack
// A column is added for new key K if all following conditions are true:
// - K isn't in ExcludeColumns
// - number of existing columns doesn't reach MaxDims-1
Expand Down
15 changes: 10 additions & 5 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,21 +512,26 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) {
var strVal string
switch intVal {
case model.Bool:
strVal = "Nullable(Bool)"
strVal = "Bool"
case model.Int64:
strVal = "Nullable(Int64)"
strVal = "Int64"
case model.Float64:
strVal = "Nullable(Float64)"
strVal = "Float64"
case model.String:
strVal = "Nullable(String)"
strVal = "String"
case model.DateTime:
strVal = "Nullable(DateTime64(3))"
strVal = "DateTime64(3)"
case model.Object:
strVal = model.GetTypeName(intVal)
default:
err = errors.Newf("%s: BUG: unsupported column type %s", taskCfg.Name, model.GetTypeName(intVal))
return false
}

if !taskCfg.DynamicSchema.NotNullable {
strVal = fmt.Sprintf("Nullable(%v)", strVal)
}

if c.taskCfg.PrometheusSchema && intVal == model.String {
alterSeries = append(alterSeries, fmt.Sprintf("ADD COLUMN IF NOT EXISTS `%s` %s", strKey, strVal))
} else {
Expand Down

0 comments on commit e21fa86

Please sign in to comment.