I'm developing a Flink application where checkpointing is enabled, but I have a special requirement: I need to ensure one of my keyed operators starts with empty state every time the job restarts, even when recovering from a checkpoint.
Current challenges:
For a standard ProcessFunction, state can be cleared using the clear() method
However, this doesn't work for keyed state, since clearing it needs a proper key context
I know broadcast operators can access/modify keyed state via applyToKeyedState in processBroadcastElement
What I need to know:
Are there any built-in configuration options or methods in Flink to automatically clear keyed operator state during job restart?
If no built-in solution exists, what's the recommended pattern for ensuring a keyed operator begins with fresh state in a checkpointed job?
Tags: java, kotlin, apache-flink
asked Jan 9 at 18:53
Andrei
2 Answers
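Given that requirement, why bother using Flink's managed keyed state at all? It seems like you could implement this yourself with a data structure that sits on the heap, since anything not registered with Flink's state backend is never checkpointed and therefore never restored.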
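Here is a minimal sketch of the heap-based idea. It's written as plain Java so it runs without Flink on the classpath; HeapKeyedCounter, open() and process() are hypothetical names. In a real job the map would be a transient field of a KeyedProcessFunction, created in open() and indexed with ctx.getCurrentKey(). Since the map is never checkpointed, a restarted task starts with an empty map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical example: per-key data kept on the heap instead of in Flink's
// managed keyed state. In a real job this would live inside a
// KeyedProcessFunction and be indexed with ctx.getCurrentKey().
class HeapKeyedCounter {
    // Not registered with any state backend, so never part of a checkpoint.
    private transient Map<String, Long> countsPerKey;

    // Mirrors RichFunction#open(): runs once each time the task (re)starts,
    // so every restart begins with an empty map.
    void open() {
        countsPerKey = new HashMap<>();
    }

    // Mirrors processElement(): update and return the count for the current key.
    long process(String currentKey) {
        return countsPerKey.merge(currentKey, 1L, Long::sum);
    }
}
```

One trade-off with this design: nothing expires entries for keys that go quiet, so you would have to bound or clean up the map yourself.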
I've never tried this, but there is the getRuntimeContext().getTaskInfo().getAttemptNumber() call. If that's higher than a value stored in some new currentAttempt state, your method could clear your special state and update currentAttempt state with the current attempt number.
This wouldn't clear all state up front (a key that never receives another element keeps its old value), but it would ensure that you don't use stale state following a restart.
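Below is an untested sketch of the attempt-number check, modeled in plain Java so it runs on its own. The two maps stand in for two keyed ValueStates (the special state and currentAttempt) that Flink would restore from a checkpoint. The attemptNumber field stands in for getRuntimeContext().getTaskInfo().getAttemptNumber(), and the class and method names are hypothetical. One assumption on my part: it compares with != rather than "higher than", so a value stored by an earlier job submission, whose attempt counter may have been larger, also triggers a reset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of lazily discarding keyed state written by an earlier attempt.
class AttemptAwareKeyedState {
    // Both maps survive "restarts", like keyed state restored from a checkpoint.
    private final Map<String, Long> specialState = new HashMap<>();
    private final Map<String, Integer> currentAttempt = new HashMap<>();

    // Stand-in for getRuntimeContext().getTaskInfo().getAttemptNumber().
    private int attemptNumber = 0;

    // Simulates a restart: the new task runs with a different attempt number.
    void restart(int newAttemptNumber) {
        this.attemptNumber = newAttemptNumber;
    }

    // Mirrors processElement() for the given key.
    long process(String key, long value) {
        Integer storedAttempt = currentAttempt.get(key);
        if (storedAttempt == null || storedAttempt != attemptNumber) {
            // Written by another attempt (or never written): treat as empty.
            // In Flink: specialState.clear(); currentAttempt.update(attemptNumber);
            specialState.remove(key);
            currentAttempt.put(key, attemptNumber);
        }
        return specialState.merge(key, value, Long::sum);
    }

    // Keys that never see another element keep their stale values.
    boolean hasStoredValue(String key) {
        return specialState.containsKey(key);
    }
}
```

In the usage below, key "a" is reset after the simulated restart while key "b", which receives no new element, still holds its old value.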