Skip to content

Commit 7341b6e

Browse files
authored
[FLINK-36693][state/forst] Implement checkpoint/restore for ForStSyncKeyedStateBackend (apache#25643)
1 parent f415348 commit 7341b6e

17 files changed

+780
-303
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
409409
}
410410
}
411411

412+
@TestTemplate
412413
void testValueStateWorkWithTtl() throws Exception {
413414
TestAsyncFrameworkExceptionHandler testExceptionHandler =
414415
new TestAsyncFrameworkExceptionHandler();

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.flink.runtime.state.KeyedStateHandle;
4040
import org.apache.flink.runtime.state.PriorityComparable;
4141
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
42-
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
4342
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
4443
import org.apache.flink.runtime.state.SnapshotResult;
4544
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
@@ -157,7 +156,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
157156
* retrieve the column family that is used for a state and also for sanity checks when
158157
* restoring.
159158
*/
160-
private final LinkedHashMap<String, ForStKvStateInfo> kvStateInformation;
159+
private final LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;
161160

162161
/** Lock guarding the {@code managedStateExecutors} and {@code disposed}. */
163162
private final Object lock = new Object();
@@ -181,7 +180,7 @@ public ForStKeyedStateBackend(
181180
Supplier<DataOutputSerializer> valueSerializerView,
182181
Supplier<DataInputDeserializer> valueDeserializerView,
183182
RocksDB db,
184-
LinkedHashMap<String, ForStKvStateInfo> kvStateInformation,
183+
LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
185184
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
186185
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
187186
ColumnFamilyHandle defaultColumnFamilyHandle,
@@ -328,11 +327,12 @@ public <N, S extends InternalKeyedState, SV> S createStateInternal(
328327
StateDescriptor<SV> stateDesc, TypeSerializer<N> namespaceSerializer)
329328
throws Exception {
330329

331-
ForStKvStateInfo oldStateInfo = kvStateInformation.get(stateDesc.getStateId());
330+
ForStOperationUtils.ForStKvStateInfo oldStateInfo =
331+
kvStateInformation.get(stateDesc.getStateId());
332332

333333
TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
334334

335-
ForStKvStateInfo newStateInfo;
335+
ForStOperationUtils.ForStKvStateInfo newStateInfo;
336336
RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo;
337337
if (oldStateInfo != null) {
338338
@SuppressWarnings("unchecked")
@@ -346,7 +346,9 @@ public <N, S extends InternalKeyedState, SV> S createStateInternal(
346346
stateSerializer,
347347
namespaceSerializer);
348348

349-
newStateInfo = new ForStKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo);
349+
newStateInfo =
350+
new ForStOperationUtils.ForStKvStateInfo(
351+
oldStateInfo.columnFamilyHandle, newMetaInfo);
350352
kvStateInformation.put(stateDesc.getStateId(), newStateInfo);
351353
} else {
352354
newMetaInfo =
@@ -358,12 +360,13 @@ public <N, S extends InternalKeyedState, SV> S createStateInternal(
358360
StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform());
359361

360362
newStateInfo =
361-
ForStOperationUtils.createAsyncStateInfo(
363+
ForStOperationUtils.createStateInfo(
362364
newMetaInfo,
363365
db,
364366
columnFamilyOptionsFactory,
365367
ttlCompactFiltersManager,
366-
optionsContainer.getWriteBufferManagerCapacity());
368+
optionsContainer.getWriteBufferManagerCapacity(),
369+
null);
367370
ForStOperationUtils.registerKvStateInformation(
368371
this.kvStateInformation,
369372
this.nativeMetricMonitor,
@@ -558,21 +561,4 @@ KeyGroupedInternalPriorityQueue<T> create(
558561
stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
559562
}
560563
}
561-
562-
/** ForSt specific information about the k/v states. */
563-
public static class ForStKvStateInfo implements AutoCloseable {
564-
public final ColumnFamilyHandle columnFamilyHandle;
565-
public final RegisteredStateMetaInfoBase metaInfo;
566-
567-
public ForStKvStateInfo(
568-
ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo) {
569-
this.columnFamilyHandle = columnFamilyHandle;
570-
this.metaInfo = metaInfo;
571-
}
572-
573-
@Override
574-
public void close() throws Exception {
575-
this.columnFamilyHandle.close();
576-
}
577-
}
578564
}

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
4848
import org.apache.flink.state.forst.restore.ForStRestoreResult;
4949
import org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy;
50+
import org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy;
5051
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
5152
import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
5253
import org.apache.flink.util.CollectionUtil;
@@ -171,7 +172,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
171172

172173
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
173174

174-
LinkedHashMap<String, ForStKeyedStateBackend.ForStKvStateInfo> kvStateInformation =
175+
LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation =
175176
new LinkedHashMap<>();
176177
LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
177178
new LinkedHashMap<>();
@@ -293,7 +294,7 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
293294
}
294295

295296
private ForStRestoreOperation getForStRestoreOperation(
296-
LinkedHashMap<String, ForStKeyedStateBackend.ForStKvStateInfo> kvStateInformation,
297+
LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
297298
LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
298299
// Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will
299300
// concatenates the dfs directory with the local directory as working dir when using flink
@@ -363,9 +364,7 @@ private ForStRestoreOperation getForStRestoreOperation(
363364
@Nonnull RocksDB db,
364365
@Nonnull ResourceGuard forstResourceGuard,
365366
@Nonnull TypeSerializer<K> keySerializer,
366-
@Nonnull
367-
LinkedHashMap<String, ForStKeyedStateBackend.ForStKvStateInfo>
368-
kvStateInformation,
367+
@Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
369368
@Nonnull KeyGroupRange keyGroupRange,
370369
@Nonnegative int keyGroupPrefixBytes,
371370
@Nonnull UUID backendUID,
@@ -375,31 +374,36 @@ private ForStRestoreOperation getForStRestoreOperation(
375374
long lastCompletedCheckpointId)
376375
throws IOException {
377376

378-
ForStSnapshotStrategyBase<K, ?> snapshotStrategy;
379-
380377
ForStFlinkFileSystem forStFs = optionsContainer.getFileSystem();
381378
ForStStateDataTransfer stateTransfer =
382379
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs);
383380

384381
if (enableIncrementalCheckpointing) {
385-
snapshotStrategy =
386-
new ForStIncrementalSnapshotStrategy<>(
387-
db,
388-
forstResourceGuard,
389-
optionsContainer,
390-
keySerializer,
391-
kvStateInformation,
392-
keyGroupRange,
393-
keyGroupPrefixBytes,
394-
backendUID,
395-
uploadedStateHandles,
396-
stateTransfer,
397-
lastCompletedCheckpointId);
382+
return new ForStIncrementalSnapshotStrategy<>(
383+
db,
384+
forstResourceGuard,
385+
optionsContainer,
386+
keySerializer,
387+
kvStateInformation,
388+
keyGroupRange,
389+
keyGroupPrefixBytes,
390+
backendUID,
391+
uploadedStateHandles,
392+
stateTransfer,
393+
lastCompletedCheckpointId);
398394

399395
} else {
400-
throw new UnsupportedOperationException("Not implemented yet for ForStStateBackend");
396+
return new ForStNativeFullSnapshotStrategy<>(
397+
db,
398+
forstResourceGuard,
399+
optionsContainer,
400+
keySerializer,
401+
kvStateInformation,
402+
keyGroupRange,
403+
keyGroupPrefixBytes,
404+
backendUID,
405+
stateTransfer);
401406
}
402-
return snapshotStrategy;
403407
}
404408

405409
private HeapPriorityQueueSetFactory createHeapQueueFactory() {

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
2424
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
2525
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
26-
import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo;
2726
import org.apache.flink.state.forst.sync.ForStIteratorWrapper;
28-
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
2927
import org.apache.flink.util.FlinkRuntimeException;
3028
import org.apache.flink.util.IOUtils;
3129
import org.apache.flink.util.OperatingSystem;
@@ -209,7 +207,7 @@ public static void addColumnFamilyOptionsToCloseLater(
209207
* @param importFilesMetaData if not empty, we import the files specified in the metadata to the
210208
* column family.
211209
*/
212-
public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
210+
public static ForStKvStateInfo createStateInfo(
213211
RegisteredStateMetaInfoBase metaInfoBase,
214212
RocksDB db,
215213
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@@ -227,8 +225,7 @@ public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
227225

228226
try {
229227
ColumnFamilyHandle columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db);
230-
return new ForStSyncKeyedStateBackend.ForStDbKvStateInfo(
231-
columnFamilyHandle, metaInfoBase);
228+
return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
232229
} catch (Exception ex) {
233230
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
234231
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
@@ -241,7 +238,7 @@ public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
241238
* @param cancelStreamRegistryForRestore {@link ICloseableRegistry#close closing} it interrupts
242239
* KV state creation
243240
*/
244-
public static ForStSyncKeyedStateBackend.ForStDbKvStateInfo createStateInfo(
241+
public static ForStKvStateInfo createStateInfo(
245242
RegisteredStateMetaInfoBase metaInfoBase,
246243
RocksDB db,
247244
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@@ -279,7 +276,7 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
279276

280277
if (ttlCompactFiltersManager != null) {
281278
if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
282-
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2(
279+
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(
283280
metaInfoBase, options);
284281
} else {
285282
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2(
@@ -340,19 +337,6 @@ static boolean sanityCheckArenaBlockSize(
340337
}
341338
}
342339

343-
public static void registerKvStateInformation(
344-
Map<String, ForStSyncKeyedStateBackend.ForStDbKvStateInfo> kvStateInformation,
345-
ForStNativeMetricMonitor nativeMetricMonitor,
346-
String columnFamilyName,
347-
ForStSyncKeyedStateBackend.ForStDbKvStateInfo registeredColumn) {
348-
349-
kvStateInformation.put(columnFamilyName, registeredColumn);
350-
if (nativeMetricMonitor != null) {
351-
nativeMetricMonitor.registerColumnFamily(
352-
columnFamilyName, registeredColumn.columnFamilyHandle);
353-
}
354-
}
355-
356340
public static void registerKvStateInformation(
357341
Map<String, ForStKvStateInfo> kvStateInformation,
358342
ForStNativeMetricMonitor nativeMetricMonitor,
@@ -370,7 +354,6 @@ public static ForStKvStateInfo createStateInfo(
370354
RegisteredStateMetaInfoBase metaInfoBase,
371355
RocksDB db,
372356
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
373-
374357
ColumnFamilyDescriptor columnFamilyDescriptor =
375358
createColumnFamilyDescriptor(metaInfoBase.getName(), columnFamilyOptionsFactory);
376359

@@ -385,28 +368,6 @@ public static ForStKvStateInfo createStateInfo(
385368
return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
386369
}
387370

388-
public static ForStKvStateInfo createAsyncStateInfo(
389-
RegisteredStateMetaInfoBase metaInfoBase,
390-
RocksDB db,
391-
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
392-
@Nullable ForStDBTtlCompactFiltersManager ttlCompactFiltersManager,
393-
@Nullable Long writeBufferManagerCapacity) {
394-
395-
ColumnFamilyDescriptor columnFamilyDescriptor =
396-
createColumnFamilyDescriptor(
397-
metaInfoBase,
398-
columnFamilyOptionsFactory,
399-
ttlCompactFiltersManager,
400-
writeBufferManagerCapacity);
401-
try {
402-
ColumnFamilyHandle columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db);
403-
return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase);
404-
} catch (Exception ex) {
405-
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
406-
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
407-
}
408-
}
409-
410371
private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exception cause)
411372
throws IOException {
412373
// max directory path length on Windows is 247.
@@ -422,4 +383,21 @@ private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exc
422383
cause);
423384
}
424385
}
386+
387+
/** ForSt specific information about the k/v states. */
388+
public static class ForStKvStateInfo implements AutoCloseable {
389+
public final ColumnFamilyHandle columnFamilyHandle;
390+
public final RegisteredStateMetaInfoBase metaInfo;
391+
392+
public ForStKvStateInfo(
393+
ColumnFamilyHandle columnFamilyHandle, RegisteredStateMetaInfoBase metaInfo) {
394+
this.columnFamilyHandle = columnFamilyHandle;
395+
this.metaInfo = metaInfo;
396+
}
397+
398+
@Override
399+
public void close() throws Exception {
400+
this.columnFamilyHandle.close();
401+
}
402+
}
425403
}

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,11 @@ private ForStOptionsFactory configureOptionsFactory(
520520
return optionsFactory;
521521
}
522522

523+
/** Both ForStSyncKeyedStateBackend and ForStKeyedStateBackend support no claim mode. */
524+
public boolean supportsNoClaimRestoreMode() {
525+
return true;
526+
}
527+
523528
// ------------------------------------------------------------------------
524529
// Parameters
525530
// ------------------------------------------------------------------------

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.metrics.MetricGroup;
2323
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
2424
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
25-
import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo;
2625
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
2726
import org.apache.flink.state.forst.ForStNativeMetricOptions;
2827
import org.apache.flink.state.forst.ForStOperationUtils;
@@ -49,7 +48,7 @@ class ForStHandle implements AutoCloseable {
4948

5049
private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
5150
private final DBOptions dbOptions;
52-
private final Map<String, ForStKvStateInfo> kvStateInformation;
51+
private final Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;
5352
private final String dbPath;
5453
private List<ColumnFamilyHandle> columnFamilyHandles;
5554
private List<ColumnFamilyDescriptor> columnFamilyDescriptors;
@@ -61,7 +60,7 @@ class ForStHandle implements AutoCloseable {
6160
@Nullable private ForStNativeMetricMonitor nativeMetricMonitor;
6261

6362
protected ForStHandle(
64-
Map<String, ForStKvStateInfo> kvStateInformation,
63+
Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
6564
Path instanceRocksDBPath,
6665
DBOptions dbOptions,
6766
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@@ -114,10 +113,10 @@ private void loadDb() throws IOException {
114113
: null;
115114
}
116115

117-
ForStKvStateInfo getOrRegisterStateColumnFamilyHandle(
116+
ForStOperationUtils.ForStKvStateInfo getOrRegisterStateColumnFamilyHandle(
118117
ColumnFamilyHandle columnFamilyHandle, StateMetaInfoSnapshot stateMetaInfoSnapshot) {
119118

120-
ForStKvStateInfo registeredStateMetaInfoEntry =
119+
ForStOperationUtils.ForStKvStateInfo registeredStateMetaInfoEntry =
121120
kvStateInformation.get(stateMetaInfoSnapshot.getName());
122121

123122
if (null == registeredStateMetaInfoEntry) {
@@ -131,7 +130,7 @@ ForStKvStateInfo getOrRegisterStateColumnFamilyHandle(
131130
stateMetaInfo, db, columnFamilyOptionsFactory);
132131
} else {
133132
registeredStateMetaInfoEntry =
134-
new ForStKvStateInfo(columnFamilyHandle, stateMetaInfo);
133+
new ForStOperationUtils.ForStKvStateInfo(columnFamilyHandle, stateMetaInfo);
135134
}
136135

137136
ForStOperationUtils.registerKvStateInformation(

0 commit comments

Comments
 (0)