Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23429 Support cancellation tokens in IgniteCompute #4738

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.ignite.compute;

import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;

/**
* Job execution options.
*/
Expand All @@ -31,15 +34,20 @@ public class JobExecutionOptions {

private final int maxRetries;

@Nullable private final CancellationToken cancellationToken;

/**
* Constructor.
*
* @param priority Job execution priority.
* @param maxRetries Number of times to retry job execution in case of failure, 0 to not retry.
* @param cancellationToken Cancellation token or {@code null}.
*
*/
private JobExecutionOptions(int priority, int maxRetries) {
private JobExecutionOptions(int priority, int maxRetries, @Nullable CancellationToken cancellationToken) {
this.priority = priority;
this.maxRetries = maxRetries;
this.cancellationToken = cancellationToken;
}

public static Builder builder() {
Expand All @@ -54,24 +62,41 @@ public int maxRetries() {
return maxRetries;
}

public @Nullable CancellationToken cancellationToken() {
return cancellationToken;
}

/** JobExecutionOptions builder. */
public static class Builder {
private int priority;

private int maxRetries;

private CancellationToken cancellationToken;

public Builder priority(int priority) {
this.priority = priority;
return this;
}

/**
* Bind {@link CancellationToken} with current job execution.
*
* @param cancellationToken Cancellation token or {@code null}.
* @return {@code this} for chaining.
*/
public Builder cancellationToken(CancellationToken cancellationToken) {
this.cancellationToken = cancellationToken;
return this;
}

public Builder maxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}

public JobExecutionOptions build() {
return new JobExecutionOptions(priority, maxRetries);
return new JobExecutionOptions(priority, maxRetries, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.Objects;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;

/**
* Compute task descriptor.
Expand All @@ -37,18 +39,22 @@ public class TaskDescriptor<T, R> {

private final Class<R> reduceJobResultClass;

@Nullable private final CancellationToken cancellationToken;

private TaskDescriptor(
String taskClassName,
List<DeploymentUnit> units,
Marshaller<T, byte[]> splitJobArgumentMarshaller,
Marshaller<R, byte[]> reduceJobResultMarshaller,
Class<R> reduceJobResultClass
Class<R> reduceJobResultClass,
@Nullable CancellationToken cancellationToken
) {
this.taskClassName = taskClassName;
this.units = units;
this.splitJobArgumentMarshaller = splitJobArgumentMarshaller;
this.reduceJobResultMarshaller = reduceJobResultMarshaller;
this.reduceJobResultClass = reduceJobResultClass;
this.cancellationToken = cancellationToken;
}

/**
Expand All @@ -69,6 +75,15 @@ public List<DeploymentUnit> units() {
return units;
}

/**
* Cancellation token.
*
* @return Cancellation token if defined or {@code null}.
*/
@Nullable public CancellationToken cancellationToken() {
return cancellationToken;
}

/**
* Marshaller for split job argument.
*
Expand Down Expand Up @@ -127,6 +142,7 @@ public static class Builder<T, R> {
private Marshaller<T, byte[]> splitJobArgumentMarshaller;
private Marshaller<R, byte[]> reduceJobResultMarshaller;
private Class<R> reduceJobResultClass;
@Nullable private CancellationToken cancellationToken;

private Builder(String taskClassName) {
Objects.requireNonNull(taskClassName);
Expand Down Expand Up @@ -156,6 +172,17 @@ public Builder<T, R> units(DeploymentUnit... units) {
return this;
}

/**
* Bind {@link CancellationToken} with current job execution.
*
* @param cancellationToken Cancellation token or {@code null}.
* @return {@code this} for chaining.
*/
public Builder<T, R> cancellationToken(@Nullable CancellationToken cancellationToken) {
this.cancellationToken = cancellationToken;
return this;
}

/**
* Sets the marshaller for split job argument.
*
Expand Down Expand Up @@ -201,7 +228,8 @@ public TaskDescriptor<T, R> build() {
units == null ? List.of() : units,
splitJobArgumentMarshaller,
reduceJobResultMarshaller,
reduceJobResultClass
reduceJobResultClass,
cancellationToken
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static CompletableFuture<Void> process(
Object arg = unpackJobArgumentWithoutMarshaller(in);

JobExecution<Object> execution = compute.executeAsyncWithFailover(
candidates, deploymentUnits, jobClassName, options, arg
candidates, deploymentUnits, jobClassName, options, arg
);
sendResultAndState(execution, notificationSender);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.lang.CancelHandleHelper;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
Expand Down Expand Up @@ -95,7 +97,16 @@ public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> descr
Objects.requireNonNull(target);
Objects.requireNonNull(descriptor);

return new ClientJobExecution<>(ch, submit0(target, descriptor, arg), descriptor.resultMarshaller(), descriptor.resultClass());
ClientJobExecution<R> execution = new ClientJobExecution<>(ch, submit0(target, descriptor, arg), descriptor.resultMarshaller(),
descriptor.resultClass());

CancellationToken cancellationToken = descriptor.options().cancellationToken();

if (cancellationToken != null) {
CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
}

return execution;
}

private <T, R> CompletableFuture<SubmitResult> submit0(JobTarget target, JobDescriptor<T, R> descriptor, T arg) {
Expand Down Expand Up @@ -177,6 +188,12 @@ public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(Set<ClusterNode>
if (map.put(node, execution) != null) {
throw new IllegalStateException("Node can't be specified more than once: " + node);
}

CancellationToken cancellationToken = descriptor.options().cancellationToken();

if (cancellationToken != null) {
CancelHandleHelper.addCancelAction(cancellationToken, execution::cancelAsync, execution.resultAsync());
}
}

return map;
Expand All @@ -186,11 +203,19 @@ public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(Set<ClusterNode>
public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
Objects.requireNonNull(taskDescriptor);

return new ClientTaskExecution<>(ch,
ClientTaskExecution<R> clientExecution = new ClientTaskExecution<>(ch,
doExecuteMapReduceAsync(taskDescriptor, arg),
taskDescriptor.reduceJobResultMarshaller(),
taskDescriptor.reduceJobResultClass()
);

CancellationToken cancellationToken = taskDescriptor.cancellationToken();

if (cancellationToken != null) {
CancelHandleHelper.addCancelAction(cancellationToken, clientExecution::cancelAsync, clientExecution.resultAsync());
}

return clientExecution;
}

@Override
Expand Down
Loading