-
Notifications
You must be signed in to change notification settings - Fork 850
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Propagate Nexus Operation-Timeout header #6676
base: main
Are you sure you want to change the base?
Propagate Nexus Operation-Timeout header #6676
Conversation
@@ -110,6 +111,14 @@ func (ch *commandHandler) HandleScheduleCommand( | |||
} | |||
} | |||
|
|||
if err := timestamp.ValidateProtoDuration(attrs.ScheduleToCloseTimeout); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not exactly related to this PR but I realized I missed this validation in #6631
// The following logic handles the operation ScheduleToCloseTimout parameter and Operation-Timeout header. | ||
// The minimum of the resolved operation timeout and the configured request timeout is used for the context timeout | ||
// when making the StartOperation request, according to the following logic: | ||
// (ScheduleToClose set, Operation-Timeout set) -> no changes, use ScheduleToClose |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// (ScheduleToClose set, Operation-Timeout set) -> no changes, use ScheduleToClose | |
// (ScheduleToClose set, Operation-Timeout set) -> Use Operation-Timeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we discussed that ScheduleToClose
should always be used if it is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean just the Operation-Timeout
header, not the context deadline.
// (ScheduleToClose unset, Operation-Timeout set) -> no changes, use Operation-Timeout | ||
header := nexus.Header(args.header) | ||
opTimeout := args.scheduleToCloseTimeout | ||
opTimeoutHeader, set := header["Operation-Timeout"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use .Get()
which is case insensitive.
if header == nil { | ||
header = make(nexus.Header, 1) | ||
} | ||
header["Operation-Timeout"] = args.scheduleToCloseTimeout.String() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to export this from the Nexus SDK and use it here and for Request-Timeout
: https://github.com/nexus-rpc/sdk-go/pull/20/files#diff-98737c3c4b252474548805ea1be712acf49b5514a96e5ffdbc642c366482f282R363
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also please use time left, not time since the operation was originally scheduled by the workflow. If this is the 10th attempt and an hour has already gone by, it should be reflected in the header.
header = make(nexus.Header, 1) | ||
} | ||
header["Operation-Timeout"] = args.scheduleToCloseTimeout.String() | ||
} else if set && args.scheduleToCloseTimeout == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just ignore the user header and not enforce this. It's not technically supported and doesn't affect what the caller machinery scheduleToClose timeout.
@@ -423,7 +423,18 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask( | |||
request *matchingservice.DispatchNexusTaskRequest, | |||
) (*matchingservice.DispatchNexusTaskResponse, error) { | |||
deadline, _ := ctx.Deadline() // If not set by user, our client will set a default. | |||
task := newInternalNexusTask(taskId, deadline, request) | |||
var opDeadline time.Time | |||
opTimeoutHeader, set := request.GetRequest().GetHeader()["Operation-Timeout"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if header is nil
here? What if the header is user provided (e.g. via curl) and is in lower case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also note that this is only applicable for StartOperation
requests but I don't think it hurts to manipulate on the other requests.
opTimeout, err := time.ParseDuration(opTimeoutHeader) | ||
if err != nil { | ||
// Operation-Timeout header is not required so don't fail request on parsing errors. | ||
c.logger.Warn(fmt.Sprintf("unable to parse Operation-Timeout header: %v", opTimeoutHeader), tag.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe log the request ID here if one is present?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need another short round here but this is very close.
if header == nil { | ||
header = make(nexus.Header, 1) | ||
} | ||
header[nexus.HeaderOperationTimeout] = opTimeout.String() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized we need a Set
method on nexus.Header and a doc clarification that headers set directly should be in lower case. Not blocking this PR though.
header := nexus.Header(args.header) | ||
callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName) | ||
if args.scheduleToCloseTimeout > 0 { | ||
opTimeout := max(time.Duration(0), args.scheduleToCloseTimeout-time.Since(args.scheduledTime)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a dynamic config to control the minimum time allowed for a single operation call. The default will be 0 and instead of doing this max
here, you'd basically fail the request with a non-retryable timeout and avoid sending the RPC.
Also, for the cancel request, set the context deadline to the minimum and fail the cancelation with a non-retryable timeout and avoid sending the RPC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And add test coverage for this please.
name: "sync start", | ||
requestTimeout: time.Hour, | ||
schedToCloseTimeout: time.Hour, | ||
headers: nexus.Header{nexus.HeaderOperationTimeout: time.Microsecond.String()}, // to test this value is ignored when ScheduleToCloseTimeout is set |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to test this IMHO, but you can check that we don't override this user header if they set it.
@@ -105,11 +105,13 @@ func TestProcessInvocationTask(t *testing.T) { | |||
endpointNotFound bool | |||
eventHasNoEndpointID bool | |||
operationIsCanceled bool | |||
headers nexus.Header |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
headers nexus.Header | |
header nexus.Header |
checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { | ||
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) | ||
require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo()) | ||
require.Regexp(t, "Post \"http://localhost:\\d+/service/operation\\?callback=http%3A%2F%2Flocalhost%2Fcallback\": context deadline exceeded", op.LastAttemptFailure.Message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... we should probably not show the HTTP endpoint in the error message. But that's out of scope for this PR.
@@ -423,7 +424,19 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask( | |||
request *matchingservice.DispatchNexusTaskRequest, | |||
) (*matchingservice.DispatchNexusTaskResponse, error) { | |||
deadline, _ := ctx.Deadline() // If not set by user, our client will set a default. | |||
task := newInternalNexusTask(taskId, deadline, request) | |||
var opDeadline time.Time | |||
if headers := nexus.Header(request.GetRequest().GetHeader()); headers != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: call this header
as we do in the rest of the codebase.
What changed?
Added support to propagate the new
Operation-Timeout
header. If this timeout is shorter than the context timeout, the context timeout will be reduced.Why?
How did you test it?
New tests