Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ public final class WorkflowConstants {
public static final String CONFIG_SCHEDULE_AFTER = "schedule." + CONFIG_AFTER;
public static final String CONFIG_BATCH_SIZE = "batch-size";
public static final String CONFIG_SCHEDULE_BATCH_SIZE = "schedule." + CONFIG_BATCH_SIZE;
public static final String CONFIG_LAST_SCHEDULE_RUN = "schedule.lastRun";
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.ValueIdentityBiFunction;
import org.keycloak.models.workflow.WorkflowScheduleClusterEvent;
import org.keycloak.sessions.CommonClientSessionModel;
import org.keycloak.storage.UserStorageProviderClusterEvent;
import org.keycloak.storage.UserStorageProviderModel;
Expand Down Expand Up @@ -234,6 +235,9 @@
ValueIdentityBiFunction.class,
LoginFailuresLifespanUpdate.class,

// workflow package
WorkflowScheduleClusterEvent.class,

// infinispan.module.certificates
ReloadCertificateFunction.class,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ private Marshalling() {
public static final int VALUE_IDENTITY_BI_FUNCTION = 65619;
public static final int LOGIN_FAILURES_LIFESPAN_UPDATE = 65620;

/** see {@link org.keycloak.models.workflow.WorkflowScheduleClusterEvent} */
public static final int WORKFLOW_SCHEDULE_CLUSTER_EVENT = 65621;

public static void configure(GlobalConfigurationBuilder builder) {
getSchemas().forEach(builder.serialization()::addContextInitializer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.keycloak.models.workflow;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -15,6 +14,7 @@

import org.keycloak.common.util.DurationConverter;
import org.keycloak.common.util.MultivaluedHashMap;
import org.keycloak.common.util.Time;
import org.keycloak.component.ComponentFactory;
import org.keycloak.component.ComponentModel;
import org.keycloak.models.KeycloakSession;
Expand All @@ -26,7 +26,6 @@
import org.keycloak.representations.workflows.WorkflowConstants;
import org.keycloak.representations.workflows.WorkflowRepresentation;
import org.keycloak.representations.workflows.WorkflowStepRepresentation;
import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
import org.keycloak.timer.TimerProvider;

import org.jboss.logging.Logger;
Expand All @@ -36,6 +35,7 @@
public class DefaultWorkflowProvider implements WorkflowProvider {

private static final Logger log = Logger.getLogger(DefaultWorkflowProvider.class);
private static final Logger scheduleLog = Logger.getLogger("org.keycloak.workflow.schedule");

private final KeycloakSession session;
private final WorkflowStateProvider stateProvider;
Expand Down Expand Up @@ -108,10 +108,11 @@ public void updateWorkflow(Workflow workflow, WorkflowRepresentation representat

// finally, update the workflow's config along with the steps' configs
workflow.updateConfig(representation.getConfig(), newSteps);
}

cancelScheduledWorkflow(workflow);
scheduleWorkflow(workflow);
cancelScheduledWorkflow(workflow);
scheduleWorkflow(workflow);
notifyScheduleChange(workflow, false);
}
}

@Override
Expand All @@ -122,6 +123,7 @@ public void removeWorkflow(Workflow workflow) {
realm.removeComponent(component);
stateProvider.removeByWorkflow(workflow.getId());
cancelScheduledWorkflow(workflow);
notifyScheduleChange(workflow, true);
}

@Override
Expand Down Expand Up @@ -472,27 +474,61 @@ private Workflow addWorkflow(Workflow workflow) {
workflow = new Workflow(session, realm.addComponentModel(model));

scheduleWorkflow(workflow);
notifyScheduleChange(workflow, false);

return workflow;
}

private void scheduleWorkflow(Workflow workflow) {
// only start the task if the workflow is enabled and has a schedule configured
String scheduled = workflow.getConfig().getFirst(WorkflowConstants.CONFIG_SCHEDULE_AFTER);

if (workflow.isEnabled() && scheduled != null) {
Duration duration = DurationConverter.parseDuration(scheduled);
initLastScheduleRun(workflow);
int intervalSecs = (int) DurationConverter.parseDuration(scheduled).toSeconds();
int initialDelaySecs = ScheduledWorkflowRunner.computeInitialDelay(workflow, intervalSecs);
TimerProvider timer = session.getProvider(TimerProvider.class);
timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ScheduledWorkflowRunner(workflow.getId(), realm.getId()), duration.toMillis()), duration.toMillis());
ScheduledWorkflowRunner runner = new ScheduledWorkflowRunner(workflow.getId(), realm.getId(), intervalSecs);
timer.scheduleTask(runner, initialDelaySecs * 1000L, intervalSecs * 1000L);
scheduleLog.debugf("Scheduled workflow '%s' with interval %d s, initial delay %d s", workflow.getName(), intervalSecs, initialDelaySecs);
}
}

private void initLastScheduleRun(Workflow workflow) {
if (ScheduledWorkflowRunner.getLastScheduleRun(workflow) <= 0) {
ComponentModel component = realm.getComponent(workflow.getId());
component.put(WorkflowConstants.CONFIG_LAST_SCHEDULE_RUN, String.valueOf(Time.currentTime()));
realm.updateComponent(component);
}
}

void cancelScheduledWorkflow(Workflow workflow) {
session.getProvider(TimerProvider.class).cancelTask(new ScheduledWorkflowRunner(workflow.getId(), realm.getId()).getTaskName());
session.getProvider(TimerProvider.class).cancelTask(ScheduledWorkflowRunner.taskName(workflow.getId()));
}

void rescheduleWorkflow(Workflow workflow) {
cancelScheduledWorkflow(workflow);
scheduleWorkflow(workflow);
}

private void notifyScheduleChange(Workflow workflow, boolean removed) {
DefaultWorkflowProviderFactory factory = (DefaultWorkflowProviderFactory) sessionFactory
.getProviderFactory(WorkflowProvider.class, DefaultWorkflowProviderFactory.ID);
WorkflowScheduleEventListener listener = factory.getScheduleEventListener();

if (listener != null) {
int intervalSecs = 0;
int lastScheduleRun = 0;

if (!removed) {
String scheduled = workflow.getConfig().getFirst(WorkflowConstants.CONFIG_SCHEDULE_AFTER);

if (workflow.isEnabled() && scheduled != null) {
intervalSecs = (int) DurationConverter.parseDuration(scheduled).toSeconds();
lastScheduleRun = ScheduledWorkflowRunner.getLastScheduleRun(workflow);
}
}

listener.notifyCluster(session, realm.getId(), workflow.getId(), removed, intervalSecs, lastScheduleRun);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class DefaultWorkflowProviderFactory implements WorkflowProviderFactory<D
private static final long DEFAULT_EXECUTOR_TASK_TIMEOUT = 5000L;

private WorkflowExecutor executor;
private WorkflowScheduleEventListener scheduleEventListener;
private boolean blocking;
private long taskTimeout;

Expand Down Expand Up @@ -51,7 +52,9 @@ public void init(Scope config) {
@Override
public void postInit(KeycloakSessionFactory factory) {
this.executor = new WorkflowExecutor(getTaskExecutor(factory), blocking, taskTimeout);
this.scheduleEventListener = new WorkflowScheduleEventListener(factory);
factory.register(this);
factory.register(scheduleEventListener);
}

@Override
Expand Down Expand Up @@ -82,6 +85,10 @@ public void onEvent(ProviderEvent event) {
}


WorkflowScheduleEventListener getScheduleEventListener() {
return scheduleEventListener;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ public ScheduledStep getScheduledStep(String workflowId, String resourceId) {
return entity != null ? toScheduledStep(entity) : null;
}

private static final Duration DEFAULT_STEP_DURATION = Duration.ofMinutes(1);

@Override
public ScheduleResult scheduleStep(Workflow workflow, WorkflowStep step, String resourceId, String executionId) {
WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, executionId);
Duration duration = DurationConverter.parseDuration(step.getAfter());
if (duration == null) {
// shouldn't happen as the step duration should have been validated before
throw new IllegalArgumentException("Invalid duration (%s) found when scheduling step %s in workflow %s"
.formatted(step.getAfter(), step.getProviderId(), workflow.getName()));
duration = DEFAULT_STEP_DURATION;
}

if (entity == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import java.util.concurrent.TimeoutException;

import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.DurationConverter;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.utils.KeycloakModelUtils;
Expand All @@ -22,27 +24,50 @@ class RunWorkflowTask extends WorkflowTransactionalTask {
this.context = context;
}

private static final int EXECUTION_LOCK_TIMEOUT_SECS = 300;

@Override
public void run(KeycloakSession session) {

DefaultWorkflowExecutionContext context = new DefaultWorkflowExecutionContext(session, this.context);
WorkflowStep nextStep = runCurrentStep(context);
String executionId = context.getExecutionId();

while (nextStep != null) {
WorkflowStateProvider.ScheduleResult result = scheduleStep(session, context, nextStep);
if (result == WorkflowStateProvider.ScheduleResult.CREATED && context.getEvent() != null) {
fireWorkflowActivated(session, context);
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
ExecutionResult<Void> result = clusterProvider.executeIfNotExecuted("wf-exec::" + executionId, EXECUTION_LOCK_TIMEOUT_SECS, () -> {
if (context.getStep() != null) {
WorkflowStateProvider stateProvider = session.getProvider(WorkflowStateProvider.class);
WorkflowStateProvider.ScheduledStep scheduledStep = stateProvider.getScheduledStep(
context.getWorkflow().getId(), context.getResourceId());
if (scheduledStep == null || !scheduledStep.stepId().equals(context.getStep().getId())) {
log.debugf("Execution %s for resource %s: DB state has changed (expected step %s), skipping",
executionId, context.getResourceId(), context.getStep().getProviderId());
return null;
}
}
boolean isNextStepScheduled = DurationConverter.isPositiveDuration(nextStep.getAfter());
if (isNextStepScheduled) {
fireWorkflowStepScheduled(session, context, nextStep);
return;

WorkflowStep nextStep = runCurrentStep(context);

while (nextStep != null) {
WorkflowStateProvider.ScheduleResult scheduleResult = scheduleStep(session, context, nextStep);
if (scheduleResult == WorkflowStateProvider.ScheduleResult.CREATED && context.getEvent() != null) {
fireWorkflowActivated(session, context);
}
boolean isNextStepScheduled = DurationConverter.isPositiveDuration(nextStep.getAfter());
if (isNextStepScheduled) {
fireWorkflowStepScheduled(session, context, nextStep);
return null;
}
nextStep = runWorkflowStep(context, nextStep);
}
nextStep = runWorkflowStep(context, nextStep);
}

// no more steps to run, complete the workflow execution
completeWorkflowExecution(session, context);
completeWorkflowExecution(session, context);
return null;
});

if (!result.isExecuted()) {
log.debugf("Execution %s for resource %s already in progress on another node, skipping",
executionId, context.getResourceId());
}
}

protected WorkflowStep runCurrentStep(DefaultWorkflowExecutionContext context) {
Expand Down Expand Up @@ -99,22 +124,10 @@ private WorkflowStateProvider.ScheduleResult scheduleStep(KeycloakSession sessio
Workflow workflow = context.getWorkflow();
String resourceId = context.getResourceId();
String executionId = context.getExecutionId();
boolean isImmediateStep = !DurationConverter.isPositiveDuration(nextStep.getAfter());

// we always persist the step state in the database, even if the step doesn't have a time defined, to make sure that the workflow execution
// can be resumed from this step in case of server failure
return KeycloakModelUtils.runJobInTransactionWithResult(session.getKeycloakSessionFactory(), session.getContext(), s -> {
WorkflowStateProvider stateProvider = s.getProvider(WorkflowStateProvider.class);
// if the step is an immediate step, we set after to a short period to prevent it from being picked up by the workflow execution task
// while we run it
try {
if (isImmediateStep)
nextStep.setAfter("1m");
return stateProvider.scheduleStep(workflow, nextStep, resourceId, executionId);
} finally {
if (isImmediateStep)
nextStep.setAfter(null);
}
return stateProvider.scheduleStep(workflow, nextStep, resourceId, executionId);
}, "Workflow step scheduling task");
}

Expand Down
Loading
Loading