-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
qps/rate limit & reduce expire seek range & expire-key hash #204
base: master
Are you sure you want to change the base?
Changes from all commits
db84a39
3e4b89e
c89b27f
43bd6c3
95bd04b
63b131d
a9c28fe
e5ff501
fb11c67
586b30d
27f54e7
e5018e4
ecd78a2
0648ab8
f86245a
157d70a
fd0d8cc
756a0e6
46bb6da
a839995
815efa4
9612571
cc4255a
f00d8c5
da49bfa
30f7d2c
bcbd5a7
1a50695
a7753c4
fda1e66
a474656
8ed0698
6ae93be
70b3f15
a22ca0f
8403a21
2d59108
b034581
3b71e69
0fb99b7
37c1c68
9de2ae9
6c2328d
bbfce19
93cd7df
7e2521b
ad7cff1
aeca4b3
bb617b0
003e6b6
4423eaf
cc15799
d9a1baf
83a370d
d75dd34
3519ff9
c4eea83
bd43ec2
fe6e84b
977b457
9161d7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,28 +61,27 @@ func Integer(w io.Writer, v int64) OnCommit { | |
// BytesArray replies a [][]byte when commit | ||
func BytesArray(w io.Writer, a [][]byte) OnCommit { | ||
return func() { | ||
start := time.Now() | ||
resp.ReplyArray(w, len(a)) | ||
zap.L().Debug("reply array size", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000)) | ||
start = time.Now() | ||
if _, err := resp.ReplyArray(w, len(a)); err != nil { | ||
return | ||
} | ||
for i := range a { | ||
if a[i] == nil { | ||
resp.ReplyNullBulkString(w) | ||
if err := resp.ReplyNullBulkString(w); err != nil { | ||
return | ||
} | ||
continue | ||
} | ||
resp.ReplyBulkString(w, string(a[i])) | ||
if i % 10 == 9 { | ||
zap.L().Debug("reply 10 bulk string", zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000)) | ||
start = time.Now() | ||
if err := resp.ReplyBulkString(w, string(a[i])); err != nil { | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
func BytesArrayOnce(w io.Writer, a [][]byte) OnCommit { | ||
return func() { | ||
resp.ReplyStringArray(w, a) | ||
} | ||
return func() { | ||
resp.ReplyStringArray(w, a) | ||
} | ||
} | ||
|
||
// TxnCommand runs a command in transaction | ||
|
@@ -92,6 +91,10 @@ type TxnCommand func(ctx *Context, txn *db.Transaction) (OnCommit, error) | |
func Call(ctx *Context) { | ||
ctx.Name = strings.ToLower(ctx.Name) | ||
|
||
if _, ok := txnCommands[ctx.Name]; ok && ctx.Server.LimitersMgr != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
ctx.Server.LimitersMgr.CheckLimit(ctx.Client.Namespace, ctx.Name, ctx.Args) | ||
} | ||
|
||
if ctx.Name != "auth" && | ||
ctx.Server.RequirePass != "" && | ||
ctx.Client.Authenticated == false { | ||
|
@@ -182,18 +185,16 @@ func AutoCommit(cmd TxnCommand) Command { | |
return func(ctx *Context) { | ||
retry.Ensure(ctx, func() error { | ||
mt := metrics.GetMetrics() | ||
start := time.Now() | ||
start := time.Now() | ||
txn, err := ctx.Client.DB.Begin() | ||
key := "" | ||
if len(ctx.Args) > 0 { | ||
key = ctx.Args[0] | ||
if len(ctx.Args) > 1 { | ||
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(len(ctx.Args)-1)) | ||
} | ||
mt.CommandArgsNumHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(float64(len(ctx.Args))) | ||
} | ||
cost := time.Since(start).Seconds() | ||
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000))) | ||
mt.TxnBeginHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost) | ||
mt.TxnBeginHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost) | ||
zap.L().Debug("transation begin", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000))) | ||
if err != nil { | ||
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc() | ||
resp.ReplyError(ctx.Out, "ERR "+err.Error()) | ||
|
@@ -208,8 +209,8 @@ func AutoCommit(cmd TxnCommand) Command { | |
start = time.Now() | ||
onCommit, err := cmd(ctx, txn) | ||
cost = time.Since(start).Seconds() | ||
zap.L().Debug("command done", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000))) | ||
mt.CommandFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost) | ||
zap.L().Debug("command done", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000))) | ||
if err != nil { | ||
mt.TxnFailuresCounterVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Inc() | ||
resp.ReplyError(ctx.Out, err.Error()) | ||
|
@@ -257,8 +258,8 @@ func AutoCommit(cmd TxnCommand) Command { | |
onCommit() | ||
} | ||
cost = time.Since(start).Seconds() | ||
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000))) | ||
mt.ReplyFuncDoneHistogramVec.WithLabelValues(ctx.Client.Namespace, ctx.Name).Observe(cost) | ||
zap.L().Debug("onCommit ", zap.String("name", ctx.Name), zap.String("key", key), zap.Int64("cost(us)", int64(cost*1000000))) | ||
mtFunc() | ||
return nil | ||
}) | ||
|
@@ -277,9 +278,9 @@ func feedMonitors(ctx *Context) { | |
id := strconv.FormatInt(int64(ctx.Client.DB.ID), 10) | ||
|
||
line := ts + " [" + id + " " + ctx.Client.RemoteAddr + "]" + " " + ctx.Name + " " + strings.Join(ctx.Args, " ") | ||
start := time.Now() | ||
start := time.Now() | ||
err := resp.ReplySimpleString(mCtx.Out, line) | ||
zap.L().Debug("feedMonitors reply", zap.String("name", ctx.Name), zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000)) | ||
zap.L().Debug("feedMonitors reply", zap.String("name", ctx.Name), zap.Int64("cost(us)", time.Since(start).Nanoseconds()/1000)) | ||
if err != nil { | ||
ctx.Server.Monitors.Delete(k) | ||
} | ||
|
@@ -299,6 +300,12 @@ func NewExecutor() *Executor { | |
return &Executor{txnCommands: txnCommands, commands: commands} | ||
} | ||
|
||
func (e *Executor) CanExecute(cmd string) bool { | ||
lowerName := strings.ToLower(cmd) | ||
_, ok := commands[lowerName] | ||
return ok | ||
} | ||
|
||
// Execute a command | ||
func (e *Executor) Execute(ctx *Context) { | ||
start := time.Now() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,16 +28,16 @@ func init() { | |
"getset": GetSet, | ||
"getrange": GetRange, | ||
// "msetnx": MSetNx, | ||
"setnx": SetNx, | ||
"setex": SetEx, | ||
"psetex": PSetEx, | ||
"setrange": SetRange, | ||
"setbit": SetBit, | ||
"setnx": SetNx, | ||
"setex": SetEx, | ||
"psetex": PSetEx, | ||
//"setrange": SetRange, | ||
//"setbit": SetBit, | ||
// "bitop": BitOp, | ||
// "bitfield": BitField, | ||
"getbit": GetBit, | ||
"bitpos": BitPos, | ||
"bitcount": BitCount, | ||
//"getbit": GetBit, | ||
//"bitpos": BitPos, | ||
//"bitcount": BitCount, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 命令已经实现,可以开放使用 |
||
"incr": Incr, | ||
"incrby": IncrBy, | ||
"decr": Decr, | ||
|
@@ -117,6 +117,8 @@ func init() { | |
|
||
// transactions, exec and discard should called explicitly, so they are registered here | ||
"multi": Desc{Proc: Multi, Cons: Constraint{1, flags("sF"), 0, 0, 0}}, | ||
"exec": Desc{Proc: Exec, Cons: Constraint{1, flags("sF"), 0, 0, 0}}, | ||
"discard": Desc{Proc: Discard, Cons: Constraint{1, flags("sF"), 0, 0, 0}}, | ||
"watch": Desc{Proc: Watch, Cons: Constraint{-2, flags("sF"), 1, -1, 1}}, | ||
"unwatch": Desc{Proc: Unwatch, Cons: Constraint{1, flags("sF"), 0, 0, 0}}, | ||
|
||
|
@@ -133,29 +135,29 @@ func init() { | |
"rpushx": Desc{Proc: AutoCommit(RPushx), Cons: Constraint{-3, flags("wmF"), 1, 1, 1}}, | ||
|
||
// strings | ||
"get": Desc{Proc: AutoCommit(Get), Cons: Constraint{2, flags("rF"), 1, 1, 1}}, | ||
"set": Desc{Proc: AutoCommit(Set), Cons: Constraint{-3, flags("wm"), 1, 1, 1}}, | ||
"setnx": Desc{Proc: AutoCommit(SetNx), Cons: Constraint{3, flags("wmF"), 1, 1, 1}}, | ||
"setex": Desc{Proc: AutoCommit(SetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
"psetex": Desc{Proc: AutoCommit(PSetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
"mget": Desc{Proc: AutoCommit(MGet), Cons: Constraint{-2, flags("rF"), 1, -1, 1}}, | ||
"mset": Desc{Proc: AutoCommit(MSet), Cons: Constraint{-3, flags("wm"), 1, -1, 2}}, | ||
"msetnx": Desc{Proc: AutoCommit(MSetNx), Cons: Constraint{-3, flags("wm"), 1, -1, 2}}, | ||
"strlen": Desc{Proc: AutoCommit(Strlen), Cons: Constraint{2, flags("rF"), 1, 1, 1}}, | ||
"append": Desc{Proc: AutoCommit(Append), Cons: Constraint{3, flags("wm"), 1, 1, 1}}, | ||
"setrange": Desc{Proc: AutoCommit(SetRange), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
"get": Desc{Proc: AutoCommit(Get), Cons: Constraint{2, flags("rF"), 1, 1, 1}}, | ||
"set": Desc{Proc: AutoCommit(Set), Cons: Constraint{-3, flags("wm"), 1, 1, 1}}, | ||
"setnx": Desc{Proc: AutoCommit(SetNx), Cons: Constraint{3, flags("wmF"), 1, 1, 1}}, | ||
"setex": Desc{Proc: AutoCommit(SetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
"psetex": Desc{Proc: AutoCommit(PSetEx), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
"mget": Desc{Proc: AutoCommit(MGet), Cons: Constraint{-2, flags("rF"), 1, -1, 1}}, | ||
"mset": Desc{Proc: AutoCommit(MSet), Cons: Constraint{-3, flags("wm"), 1, -1, 2}}, | ||
//"msetnx": Desc{Proc: AutoCommit(MSetNx), Cons: Constraint{-3, flags("wm"), 1, -1, 2}}, //run test in tests/redis/unit/type/string failed | ||
"strlen": Desc{Proc: AutoCommit(Strlen), Cons: Constraint{2, flags("rF"), 1, 1, 1}}, | ||
"append": Desc{Proc: AutoCommit(Append), Cons: Constraint{3, flags("wm"), 1, 1, 1}}, | ||
//"setrange": Desc{Proc: AutoCommit(SetRange), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, //run test in tests/redis/unit/type/string failed | ||
"getrange": Desc{Proc: AutoCommit(GetRange), Cons: Constraint{4, flags("r"), 1, 1, 1}}, | ||
"incr": Desc{Proc: AutoCommit(Incr), Cons: Constraint{2, flags("wmF"), 1, 1, 1}}, | ||
"decr": Desc{Proc: AutoCommit(Decr), Cons: Constraint{2, flags("wmF"), 1, 1, 1}}, | ||
"incrby": Desc{Proc: AutoCommit(IncrBy), Cons: Constraint{3, flags("wmF"), 1, 1, 1}}, | ||
"decrby": Desc{Proc: AutoCommit(DecrBy), Cons: Constraint{3, flags("wmF"), 1, 1, 1}}, | ||
"incrbyfloat": Desc{Proc: AutoCommit(IncrByFloat), Cons: Constraint{3, flags("wmF"), 1, 1, 1}}, | ||
"setbit": Desc{Proc: AutoCommit(SetBit), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
//"setbit": Desc{Proc: AutoCommit(SetBit), Cons: Constraint{4, flags("wm"), 1, 1, 1}}, | ||
// "bitop": Desc{Proc: AutoCommit(BitOp), Cons: Constraint{-4, flags("wm"), 2, -1, 1}}, | ||
// "bitfield": Desc{Proc: AutoCommit(BitField), Cons: Constraint{-2, flags("wm"), 1, 1, 1}}, | ||
"getbit": Desc{Proc: AutoCommit(GetBit), Cons: Constraint{3, flags("r"), 1, 1, 1}}, | ||
"bitcount": Desc{Proc: AutoCommit(BitCount), Cons: Constraint{-2, flags("r"), 1, 1, 1}}, | ||
"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}}, | ||
//"getbit": Desc{Proc: AutoCommit(GetBit), Cons: Constraint{3, flags("r"), 1, 1, 1}}, | ||
//"bitcount": Desc{Proc: AutoCommit(BitCount), Cons: Constraint{-2, flags("r"), 1, 1, 1}}, | ||
//"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}}, | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 把实现的命令注释掉有什么考虑吗? |
||
// keys | ||
"type": Desc{Proc: AutoCommit(Type), Cons: Constraint{2, flags("rF"), 1, 1, 1}}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CanExecute 这个逻辑设计的很好,在一些case 下返回的数据是否有问题。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
multi
lpush key 1
xxx zz
exec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
可以理解这个是安全性方面的考虑?我又两个疑问: