@@ -23,6 +23,7 @@ Licensed to the Apache Software Foundation (ASF) under one
23
23
import org .apache .flink .metrics .groups .OperatorCoordinatorMetricGroup ;
24
24
import org .apache .flink .runtime .checkpoint .CheckpointCoordinator ;
25
25
import org .apache .flink .runtime .jobgraph .OperatorID ;
26
+ import org .apache .flink .util .MdcUtils ;
26
27
import org .apache .flink .util .Preconditions ;
27
28
import org .apache .flink .util .function .ThrowingConsumer ;
28
29
import org .apache .flink .util .function .ThrowingRunnable ;
@@ -33,6 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
33
34
import javax .annotation .Nullable ;
34
35
35
36
import java .time .Duration ;
37
+ import java .util .Map ;
36
38
import java .util .concurrent .BlockingQueue ;
37
39
import java .util .concurrent .CompletableFuture ;
38
40
import java .util .concurrent .LinkedBlockingQueue ;
@@ -58,7 +60,7 @@ private RecreateOnResetOperatorCoordinator(
58
60
throws Exception {
59
61
this .context = context ;
60
62
this .provider = provider ;
61
- this .coordinator = new DeferrableCoordinator (context .getOperatorId ());
63
+ this .coordinator = new DeferrableCoordinator (context .getOperatorId (), context . getJobID () );
62
64
this .coordinator .createNewInternalCoordinator (context , provider );
63
65
this .coordinator .processPendingCalls ();
64
66
this .closingTimeoutMs = closingTimeoutMs ;
@@ -132,7 +134,7 @@ public void resetToCheckpoint(final long checkpointId, @Nullable final byte[] ch
132
134
// After this point all the subsequent calls will be made to the new coordinator.
133
135
final DeferrableCoordinator oldCoordinator = coordinator ;
134
136
final DeferrableCoordinator newCoordinator =
135
- new DeferrableCoordinator (context .getOperatorId ());
137
+ new DeferrableCoordinator (context .getOperatorId (), context . getJobID () );
136
138
coordinator = newCoordinator ;
137
139
// Close the old coordinator asynchronously in a separate closing thread.
138
140
// The future will be completed when the old coordinator closes.
@@ -327,26 +329,30 @@ private OperatorCoordinator.Context getContext() {
327
329
private static class DeferrableCoordinator {
328
330
private final OperatorID operatorId ;
329
331
private final BlockingQueue <NamedCall > pendingCalls ;
332
+ private final Map <String , String > mdc ;
330
333
private QuiesceableContext internalQuiesceableContext ;
331
334
private OperatorCoordinator internalCoordinator ;
332
335
private boolean hasCaughtUp ;
333
336
private boolean closed ;
334
337
private volatile boolean failed ;
335
338
336
- private DeferrableCoordinator (OperatorID operatorId ) {
339
+ private DeferrableCoordinator (OperatorID operatorId , JobID jobID ) {
337
340
this .operatorId = operatorId ;
338
341
this .pendingCalls = new LinkedBlockingQueue <>();
339
342
this .hasCaughtUp = false ;
340
343
this .closed = false ;
341
344
this .failed = false ;
345
+ this .mdc = MdcUtils .asContextData (jobID );
342
346
}
343
347
344
348
synchronized <T extends Exception > void applyCall (
345
349
String name , ThrowingConsumer <OperatorCoordinator , T > call ) throws T {
346
350
synchronized (this ) {
347
351
if (hasCaughtUp ) {
348
352
// The new coordinator has caught up.
349
- call .accept (internalCoordinator );
353
+ try (MdcUtils .MdcCloseable ignored = MdcUtils .withContext (mdc )) {
354
+ call .accept (internalCoordinator );
355
+ }
350
356
} else {
351
357
pendingCalls .add (new NamedCall (name , call ));
352
358
}
0 commit comments