Timer on ProcessFunction and KeyedProcessFunction
Flink distinguishes between two notions of time: processing time and event time. Processing time is the system clock time of the machine that processes an event, whereas event time is the time at which the event actually occurred, usually taken from a timestamp carried in the event itself. Timers let a function schedule a callback for a specific point in either time domain. For instance, a job may need to send a notification five minutes after a particular event occurs. Flink's timers are designed for exactly this kind of delayed action.
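The trigger time for such a notification is plain arithmetic on epoch milliseconds: the anchor time plus the delay. The sketch below is a hypothetical, stdlib-only helper (NotificationTimer is not a Flink class). In a Flink function, the anchor would be the element's timestamp for an event-time timer, or timerService.currentProcessingTime() for a processing-time timer, and the result would be passed to registerEventTimeTimer() or registerProcessingTimeTimer() respectively.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: computes when a delayed notification is due.
final class NotificationTimer {
    static final Duration DELAY = Duration.ofMinutes(5);

    // anchorMillis: the event timestamp (event time) or the current processing time,
    // both expressed as epoch milliseconds.
    static long triggerAt(long anchorMillis) {
        return anchorMillis + DELAY.toMillis();
    }

    public static void main(String[] args) {
        long eventTimestamp = 1_716_906_984_000L; // example value
        System.out.println(triggerAt(eventTimestamp)); // 1716907284000
        System.out.println(triggerAt(System.currentTimeMillis())); // wall clock + 5 minutes
    }
}
```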
Register Timer on ProcessFunction
In Flink, timers (and therefore the onTimer() callback) are only available on keyed streams: in a KeyedProcessFunction, or in a ProcessFunction applied after a keyBy() operation. In both cases a timer is scoped to the current key and its trigger timestamp; it is not bound to the thread that registered it. The TimerService keeps at most one timer per key and timestamp, so registering the same timestamp again for the same key has no effect, while the same timestamp registered under a different key creates a separate timer. Once registered, a timer fires only once.
Let's start with a simple demonstration. First, apply the timer-registering ProcessFunction (TimerServiceProcessFunction, shown below) to a stream that has not been keyed:
env.addSource(new StockSqlReadingV3())
        .map(new MapFunction<StockSql, StockSql>() {
            @Override
            public StockSql map(StockSql stock) throws Exception {
                stock.setPrice(stock.getPrice() * 7);
                return stock;
            }
        })
        // No keyBy() before process(): registering a timer inside it fails at runtime
        .process(new TimerServiceProcessFunction())
        .sinkTo(new PrintSink<>());
Running this job fails with the following exception:
Caused by: java.lang.UnsupportedOperationException: Setting timers is only supported on a keyed streams.
Now, let's call keyBy() before process() and add a simple onTimer() implementation. Note that processElement() registers a timer with the same trigger time twice.
import entity.StockSql;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
public class TimerServiceProcessFunction extends ProcessFunction<StockSql, StockSql> {
    // SimpleDateFormat is not thread-safe, so each parallel instance creates its own in open()
    private transient SimpleDateFormat sdf;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

    @Override
    public void processElement(StockSql stockSql, ProcessFunction<StockSql, StockSql>.Context context,
                               Collector<StockSql> collector) throws Exception {
        val timerService = context.timerService();
        // Trigger time: the record's Ttime plus 3 seconds, in epoch milliseconds.
        // If that moment is already in the past, the processing-time timer fires almost immediately.
        Date date = this.sdf.parse(stockSql.getTtime());
        // Register the same timer twice; the second call is deduplicated (same key and timestamp)
        timerService.registerProcessingTimeTimer(date.getTime() + 3000);
        timerService.registerProcessingTimeTimer(date.getTime() + 3000);
        log.info("Thread: {}, register timer twice at processElement at {}", Thread.currentThread().getName(),
                date.getTime() + 3000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<StockSql> out) throws Exception {
        // Called once per (key, timestamp) timer
        log.info("Thread: {}, timer being triggered at timestamp: {}", Thread.currentThread().getName(), timestamp);
    }
}
After running the job, the console shows output such as:
[Process -> Sink: Writer (7/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (7/8)#0, register timer twice at processElement at 1716906987000
[Process -> Sink: Writer (4/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (4/8)#0, register timer twice at processElement at 1716906987000
[Process -> Sink: Writer (7/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (7/8)#0, timer being triggered at timestamp: 1716906987000
[Process -> Sink: Writer (4/8)#0] INFO blog.TimerServiceProcessFunction - Thread: Process -> Sink: Writer (4/8)#0, timer being triggered at timestamp: 1716906987000
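The timestamps in these logs are epoch milliseconds. The sketch below decodes the logged value with java.time and repeats the parse-plus-3-seconds step of processElement() with an explicit time zone. This matters because SimpleDateFormat parses in the JVM's default time zone, so the same Ttime string yields different timer timestamps on machines with different zone settings. The class name and the sample Ttime value are hypothetical, and UTC is an arbitrary choice for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: same pattern as the sdf field, but with an explicit time zone.
final class TimerTimestamps {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parse a Ttime-style string in the given zone and add the 3-second offset.
    static long triggerMillis(String ttime, ZoneId zone) {
        long eventMillis = LocalDateTime.parse(ttime, FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
        return eventMillis + 3000;
    }

    public static void main(String[] args) {
        // Decode the value seen in the log lines
        System.out.println(Instant.ofEpochMilli(1716906987000L)); // 2024-05-28T14:36:27Z
        // A sample Ttime that, read as UTC, produces that trigger time
        System.out.println(triggerMillis("2024-05-28 14:36:24", ZoneId.of("UTC"))); // 1716906987000
    }
}
```

Reading the same string in a different zone (for example Asia/Shanghai) shifts the result by that zone's offset, which is why pinning the zone makes timer timestamps reproducible.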
How the Timer Works in the Source Code
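As the logs show, each timer fires only once even though the same timestamp was registered twice. The two pairs of log lines come from different parallel subtasks (7/8 and 4/8); since keyBy() routes each key to exactly one subtask, they belong to different keys, and each key gets its own timer. Stepping through registerProcessingTimeTimer() in a debugger shows why duplicates disappear: a timer is bound to the current (grouped) key, its timestamp, and a namespace, which is VoidNamespace (the same for every timer) when the timer is registered through the TimerService of a ProcessFunction. The crucial registration call is processingTimeTimersQueue.add(new TimerHeapInternalTimer(time, this.keyContext.getCurrentKey(), namespace)). Because TimerHeapInternalTimer instances compare equal on timestamp, key, and namespace, and the queue does not accept an element equal to one it already holds, the second registration is simply ignored.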
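To make that mechanism concrete, here is a simplified, single-threaded model: a priority queue ordered by timestamp for firing, plus a hash set for deduplication, with timers compared on (timestamp, key, namespace) like TimerHeapInternalTimer. It is a hypothetical sketch, not Flink's code: SimpleTimerQueue and advanceTo() are made-up names, the real queue is partitioned by key groups, and VoidNamespace is replaced here by a constant string.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Simplified model of a deduplicating timer queue (not Flink's actual classes).
final class SimpleTimerQueue {
    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;

        Timer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        // Identity is (timestamp, key, namespace), as described above
        @Override public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key) && namespace.equals(t.namespace);
        }
        @Override public int hashCode() { return Objects.hash(timestamp, key, namespace); }
        @Override public String toString() { return key + "@" + timestamp; }
    }

    // Stand-in for VoidNamespace: every timer shares the same namespace
    static final String VOID_NAMESPACE = "void";

    private final PriorityQueue<Timer> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<Timer> index = new HashSet<>();

    // Returns false when an equal timer is already queued, so nothing new is scheduled
    boolean add(String currentKey, long time) {
        Timer timer = new Timer(time, currentKey, VOID_NAMESPACE);
        if (!index.add(timer)) {
            return false;
        }
        heap.add(timer);
        return true;
    }

    // Fires (removes and returns) every timer with timestamp <= now, earliest first
    List<Timer> advanceTo(long now) {
        List<Timer> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().timestamp <= now) {
            Timer timer = heap.poll();
            index.remove(timer);
            fired.add(timer);
        }
        return fired;
    }

    public static void main(String[] args) {
        SimpleTimerQueue queue = new SimpleTimerQueue();
        queue.add("AAPL", 1716906987000L);
        queue.add("AAPL", 1716906987000L); // duplicate, ignored
        System.out.println(queue.advanceTo(1716906987000L)); // [AAPL@1716906987000]
        System.out.println(queue.advanceTo(Long.MAX_VALUE)); // []
    }
}
```

Because a fired timer is removed from both structures, it cannot fire a second time, which matches the "fires only once" behavior in the logs.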
Using Timer on KeyedProcessFunction
KeyedProcessFunction gives direct access to the key being processed through Context.getCurrentKey() (and OnTimerContext.getCurrentKey() inside onTimer()), which makes it natural to combine timers with keyed state such as MapState<UK, UV>. Keyed state is maintained on a per-key basis: each key sees only its own entries. Note that a ProcessFunction applied after keyBy() can also use keyed state; what it lacks is the getCurrentKey() accessor. Here is an example that uses keyed state in a KeyedProcessFunction:
import entity.StockSql;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;

@Slf4j
public class TimerServiceKeyedProcessFunction extends KeyedProcessFunction<String, StockSql, StockSql> {
    private transient SimpleDateFormat sdf;
    // Keyed state: each key only sees its own map (here a single entry, key -> count)
    private transient MapState<String, Integer> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("map state",
                String.class,
                Integer.class));
        log.info("map state: {}", mapState);
    }

    @Override
    public void processElement(StockSql stockSql, KeyedProcessFunction<String, StockSql, StockSql>.Context context,
                               Collector<StockSql> collector) throws Exception {
        val timerService = context.timerService();
        // Register one processing-time timer per element, 3 seconds after the record's Ttime
        Date date = this.sdf.parse(stockSql.getTtime());
        timerService.registerProcessingTimeTimer(date.getTime() + 3000);
        // Count how many registrations this key has made so far
        Integer cnt = this.mapState.get(context.getCurrentKey());
        if (Objects.isNull(cnt)) {
            cnt = 0;
        }
        log.info("Thread: {}, key = {}, register timer {} times at processElement at {}",
                Thread.currentThread().getName(),
                context.getCurrentKey(), cnt + 1,
                date.getTime() + 3000);
        this.mapState.put(context.getCurrentKey(), cnt + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<StockSql> out) throws Exception {
        // ctx.getCurrentKey() returns the key this timer was registered for
        log.info("Thread: {}, key = {}, timer being triggered at timestamp: {}", Thread.currentThread().getName(),
                ctx.getCurrentKey(),
                timestamp);
    }
}
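Because keyed state is already scoped to the current key, the MapState in TimerServiceKeyedProcessFunction only ever holds one entry per key (the key mapped to its own count), so a ValueState<Integer> would carry the same information. The following stdlib-only model, with hypothetical names (KeyedStateModel, entriesFor), imitates that scoping: the backend keeps a separate map per key, and the counting logic mirrors processElement().

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of keyed MapState: the backend holds one private map per key.
final class KeyedStateModel {
    private final Map<String, Map<String, Integer>> backend = new HashMap<>();
    private String currentKey;

    // Flink sets the current key before each processElement()/onTimer() call
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // What the function sees through its MapState: only the current key's map
    private Map<String, Integer> mapState() {
        return backend.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    // Same counting logic as processElement(); returns the new count
    int processElement() {
        Map<String, Integer> state = mapState();
        int cnt = state.getOrDefault(currentKey, 0);
        state.put(currentKey, cnt + 1);
        return cnt + 1;
    }

    // Number of entries in the given key's map (always 1 with this access pattern)
    int entriesFor(String key) {
        return backend.getOrDefault(key, Collections.emptyMap()).size();
    }

    public static void main(String[] args) {
        KeyedStateModel model = new KeyedStateModel();
        model.setCurrentKey("AAPL");
        model.processElement();
        System.out.println(model.processElement()); // 2
        model.setCurrentKey("MSFT");
        System.out.println(model.processElement()); // 1, MSFT cannot see AAPL's count
        System.out.println(model.entriesFor("AAPL")); // 1
    }
}
```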