Skip to content

Commit

Permalink
fix: canSyncRun should consider header processing when handling buffe…
Browse files Browse the repository at this point in the history
…red data

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Nov 22, 2024
1 parent 0d60076 commit a068c06
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 16 deletions.
33 changes: 17 additions & 16 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,14 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) (streamF
fm.canSkipEncodeTrailers = fm.canSkipMethods["EncodeTrailers"] && fm.canSkipMethods["EncodeResponse"]
fm.canSkipOnLog = fm.canSkipMethods["OnLog"]

// Similar to the skip check
// Similar to the skip check, but the canSyncRun check is more granular as
// it will consider if the request/response is fully buffered.
fm.canSyncRunDecodeHeaders = fm.canSyncRunMethods["DecodeHeaders"] && fm.canSyncRunMethods["DecodeRequest"] && fm.config.initOnce == nil
fm.canSyncRunDecodeData = fm.canSyncRunMethods["DecodeData"] && fm.canSyncRunMethods["DecodeRequest"]
fm.canSyncRunDecodeTrailers = fm.canSyncRunMethods["DecodeTrailers"] && fm.canSyncRunMethods["DecodeRequest"]
fm.canSyncRunEncodeHeaders = fm.canSyncRunMethods["EncodeHeaders"]
fm.canSyncRunEncodeData = fm.canSyncRunMethods["EncodeData"] && fm.canSyncRunMethods["EncodeResponse"]
fm.canSyncRunEncodeTrailers = fm.canSyncRunMethods["EncodeTrailers"] && fm.canSyncRunMethods["EncodeResponse"]
fm.canSyncRunDecodeData = fm.canSyncRunMethods["DecodeData"]
fm.canSyncRunDecodeTrailers = fm.canSyncRunMethods["DecodeTrailers"]
fm.canSyncRunEncodeHeaders = fm.canSyncRunMethods["EncodeHeaders"] && fm.canSyncRunMethods["EncodeResponse"]
fm.canSyncRunEncodeData = fm.canSyncRunMethods["EncodeData"]
fm.canSyncRunEncodeTrailers = fm.canSyncRunMethods["EncodeTrailers"]

return wrapFilterManager(fm)
}
Expand Down Expand Up @@ -483,13 +484,13 @@ func (m *filterManager) decodeHeaders(headers capi.RequestHeaderMap, endStream b
m.canSkipEncodeTrailers = m.canSkipEncodeTrailers && canSkipMethods["EncodeTrailers"] && canSkipMethods["EncodeResponse"]
m.canSkipOnLog = m.canSkipOnLog && canSkipMethods["OnLog"]

// Similar to the skip check
canSyncRunMethods := c.CanSyncRunMethod
m.canSyncRunDecodeData = m.canSyncRunDecodeData && canSyncRunMethods["DecodeData"] && canSyncRunMethods["DecodeRequest"]
m.canSyncRunDecodeTrailers = m.canSyncRunDecodeTrailers && canSyncRunMethods["DecodeTrailers"] && canSyncRunMethods["DecodeRequest"]
m.canSyncRunEncodeHeaders = m.canSyncRunEncodeData && canSyncRunMethods["EncodeHeaders"]
m.canSyncRunEncodeData = m.canSyncRunEncodeData && canSyncRunMethods["EncodeData"] && canSyncRunMethods["EncodeResponse"]
m.canSyncRunEncodeTrailers = m.canSyncRunEncodeTrailers && canSyncRunMethods["EncodeTrailers"] && canSyncRunMethods["EncodeResponse"]
m.canSkipDecodeHeaders = m.canSkipDecodeHeaders && canSkipMethods["DecodeHeaders"] && canSkipMethods["DecodeRequest"]
m.canSyncRunDecodeData = m.canSyncRunDecodeData && canSyncRunMethods["DecodeData"]
m.canSyncRunDecodeTrailers = m.canSyncRunDecodeTrailers && canSyncRunMethods["DecodeTrailers"]
m.canSyncRunEncodeHeaders = m.canSyncRunEncodeHeaders && canSyncRunMethods["EncodeHeaders"] && canSyncRunMethods["EncodeResponse"]
m.canSyncRunEncodeData = m.canSyncRunEncodeData && canSyncRunMethods["EncodeData"]
m.canSyncRunEncodeTrailers = m.canSyncRunEncodeTrailers && canSyncRunMethods["EncodeTrailers"]

// TODO: add field to control if merging is allowed
i := 0
Expand Down Expand Up @@ -642,7 +643,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
return capi.Continue
}

if m.canSyncRunDecodeData {
if m.canSyncRunDecodeData && (m.decodeIdx == -1 || (m.canSyncRunDecodeHeaders && m.canSyncRunDecodeTrailers)) {
return m.decodeData(buf, endStream)
}

Expand Down Expand Up @@ -706,7 +707,7 @@ func (m *filterManager) DecodeTrailers(trailers capi.RequestTrailerMap) capi.Sta
return capi.Continue
}

if m.canSyncRunDecodeTrailers {
if m.canSyncRunDecodeTrailers && (m.decodeIdx == -1 || (m.canSyncRunDecodeHeaders && m.canSyncRunDecodeTrailers)) {
return m.decodeTrailers(trailers)
}

Expand Down Expand Up @@ -894,7 +895,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi
return capi.Continue
}

if m.canSyncRunEncodeData {
if m.canSyncRunEncodeData && (m.encodeIdx == -1 || (m.canSyncRunEncodeHeaders && m.canSyncRunEncodeTrailers)) {
return m.encodeData(buf, endStream)
}

Expand Down Expand Up @@ -944,7 +945,7 @@ func (m *filterManager) EncodeTrailers(trailers capi.ResponseTrailerMap) capi.St
return capi.Continue
}

if m.canSyncRunEncodeTrailers {
if m.canSyncRunEncodeTrailers && (m.encodeIdx == -1 || (m.canSyncRunEncodeHeaders && m.canSyncRunEncodeData)) {
return m.encodeTrailers(trailers)
}

Expand Down
59 changes: 59 additions & 0 deletions api/pkg/filtermanager/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"github.com/agiledragon/gomonkey/v2"
capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/stretchr/testify/assert"

internalConsumer "mosn.io/htnn/api/internal/consumer"
Expand Down Expand Up @@ -512,6 +513,7 @@ func TestFiltersFromConsumer(t *testing.T) {
assert.Equal(t, 3, len(m.filters))
}
assert.Equal(t, true, m.canSyncRunEncodeHeaders)
assert.Equal(t, false, m.canSyncRunDecodeTrailers)

_, ok := hdr.Get("x-htnn-route")
assert.False(t, ok)
Expand Down Expand Up @@ -727,3 +729,60 @@ func TestSyncRunWhenThereAreMultiFilters(t *testing.T) {
assert.Equal(t, true, m.canSyncRunEncodeTrailers)
}
}

func TestSyncRunWhenProcessingBufferedData(t *testing.T) {
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.parsed = []*model.ParsedFilterConfig{
{
Name: "add_req",
Factory: addReqFactory,
ParsedConfig: addReqConf{
hdrName: "x-htnn-route",
},
SyncRunPhases: api.PhaseDecodeTrailers,
},
{
Name: "access_field_on_log",
Factory: accessFieldOnLogFactory,
SyncRunPhases: api.AllPhases,
},
}

m := unwrapFilterManager(FilterManagerFactory(config, cb))
assert.Equal(t, false, m.canSyncRunDecodeHeaders)
assert.Equal(t, true, m.canSyncRunDecodeData)
assert.Equal(t, true, m.canSyncRunDecodeTrailers)
assert.Equal(t, true, m.canSyncRunEncodeHeaders)
assert.Equal(t, true, m.canSyncRunEncodeData)
assert.Equal(t, true, m.canSyncRunEncodeTrailers)

m.decodeIdx = -1 // simulate processing request streamingly
buf := envoy.NewBufferInstance([]byte{})
res := m.DecodeData(buf, false)
assert.Equal(t, capi.Continue, res)
reqTrailer := envoy.NewRequestTrailerMap(http.Header{})
res = m.DecodeTrailers(reqTrailer)
assert.Equal(t, capi.Continue, res)

m.encodeIdx = -1 // simulate processing response streamingly
res = m.EncodeData(buf, false)
assert.Equal(t, capi.Continue, res)
rspTrailer := envoy.NewResponseTrailerMap(http.Header{})
res = m.EncodeTrailers(rspTrailer)
assert.Equal(t, capi.Continue, res)

m.decodeIdx = 0 // simulate processing buffered request
res = m.DecodeData(buf, false)
assert.Equal(t, capi.Running, res)
cb.WaitContinued()
res = m.DecodeTrailers(reqTrailer)
assert.Equal(t, capi.Running, res)
cb.WaitContinued()

m.encodeIdx = 0 // simulate processing buffered response
res = m.EncodeData(buf, false)
assert.Equal(t, capi.Continue, res)
res = m.EncodeTrailers(rspTrailer)
assert.Equal(t, capi.Continue, res)
}

0 comments on commit a068c06

Please sign in to comment.