Stream API (SourceFunction, AbstractSourceFunction and RichSourceFunction) in Flink
In Flink's streaming API, user functions share a base interface named Function (org.apache.flink.api.common.functions.Function), which extends java.io.Serializable:
public interface Function extends Serializable {
}
Nearly all user-function interfaces and classes in the streaming API, such as MapFunction, SourceFunction and RichSourceFunction, extend or implement this Function interface, so they are serializable. Every custom function must therefore be serializable as well: each of its fields must either be serializable itself or be marked transient. For instance, consider a source class named CustomDataSource that contains a non-serializable field:
private DataSource dataSource; // javax.sql.DataSource, typically not serializable
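When the job is run, Flink then rejects the function with an error such as "XXX is not serializable. The object probably contains or references non-serializable fields." The fix is to mark such fields transient and create them after deserialization, as described next.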
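Underneath, this is standard Java serialization rejecting the field, so the failure can be reproduced without Flink. The following is a minimal JDK-only sketch. ConnectionPool, BrokenFunction, FixedFunction and SerializationCheck are hypothetical names, and ConnectionPool stands in for a non-serializable resource such as a DataSource:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable resource such as javax.sql.DataSource.
class ConnectionPool {
}

// Serializable, like every Flink Function, but it holds a non-serializable field.
class BrokenFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ConnectionPool pool = new ConnectionPool();
}

// Same function with the field marked transient: Java serialization skips it.
class FixedFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient ConnectionPool pool = new ConnectionPool();
}

final class SerializationCheck {
    private SerializationCheck() {
    }

    /** Returns true if Java serialization accepts the object, false on NotSerializableException. */
    static boolean isSerializable(Object function) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(function);
            return true;
        } catch (NotSerializableException e) {
            return false; // the exception message names the non-serializable class
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the field marked transient, serialization succeeds. However, the field is null in the deserialized copy, which is why it has to be created again on the worker.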
Furthermore, such fields must not be initialized in the constructor; initialize them in the open() method instead. Flink is a distributed framework. The function object is constructed on the client, serialized, sent to the worker machines, and deserialized there once per parallel instance. The constructor therefore runs only once, on the client, and transient fields set there are null after deserialization. By contrast, open() is called on each deserialized instance before it starts processing data.
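The round trip can be simulated with plain Java serialization to show why a constructor-initialized transient field is lost. This is a rough model, not Flink's actual deployment code. Pool, PoolingFunction and ShipToWorker are hypothetical, and open() here is an ordinary method standing in for RichFunction.open():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable resource, e.g. a JDBC connection pool.
class Pool {
}

// Mimics a Flink function: built on the client, shipped to a worker, then open() is called there.
class PoolingFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    transient Pool fromConstructor; // created in the constructor (client side)
    transient Pool fromOpen;        // created in open() (worker side)

    PoolingFunction() {
        fromConstructor = new Pool();
    }

    void open() {
        fromOpen = new Pool();
    }
}

final class ShipToWorker {
    private ShipToWorker() {
    }

    /** Serializes and deserializes an object: a rough model of deploying a function to a task. */
    static <T extends Serializable> T roundTrip(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                // Deserialization does not run PoolingFunction's constructor
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After roundTrip, fromConstructor is null on the "worker" copy. Only the resource created in open() is available there.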
How to load data periodically from a database or other source?
Flink provides a built-in timer service, but it is only available inside process functions such as ProcessFunction and KeyedProcessFunction, and registering timers also requires a keyed stream. A SourceFunction has no access to it. If you need periodic loading in a SourceFunction, you have to implement the timing yourself, and there are alternatives to calling Thread.sleep() directly.
- Suboptimal Example:
Using Thread.sleep() for periodic loading is not recommended. In the loop below, the actual period is 3 seconds plus however long the query takes. In addition, the thread is blocked for the whole sleep and cannot react to cancel() in the meantime. Discussions on StackOverflow (Post1 and Post2) explain why Thread.sleep() is discouraged where precise timing and resource efficiency matter.
package blog;

import com.alibaba.fastjson.JSON;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import entity.StockSql;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

@Slf4j
public class StockSqlReading extends RichSourceFunction<StockSql> {
    // Not serializable: transient, and created in open() on the task
    private transient HikariDataSource dataSource;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private final String sql = "SELECT stock,price,ttime FROM flink_tab WHERE ttime > CURRENT_TIMESTAMP - INTERVAL 30 SECOND";

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/worddb?serverTimezone=GMT%2B8");
        config.setUsername("root");
        config.setPassword("0902");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        dataSource = new HikariDataSource(config);
    }

    private void fetchAndEmit(SourceContext<StockSql> ctx) {
        // try-with-resources returns the connection to the pool and closes the statement and result set
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (isRunning && rs.next()) {
                StockSql stockSql = new StockSql();
                stockSql.setStock(rs.getString("stock"));
                stockSql.setPrice(rs.getFloat("price"));
                stockSql.setTtime(rs.getString("ttime"));
                log.info("collect: {}", JSON.toJSONString(stockSql));
                ctx.collect(stockSql);
            }
        } catch (Exception e) {
            log.error("Reading error", e);
        }
    }

    @Override
    public void run(SourceContext<StockSql> ctx) throws Exception {
        // Query every 3 seconds until cancel() clears isRunning
        while (isRunning) {
            log.info("being schedule");
            fetchAndEmit(ctx);
            Thread.sleep(3000);
        }
    }

    @Override
    public void cancel() {
        // Only signal run() to stop; the pool is closed in close()
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        isRunning = false;
        if (dataSource != null && !dataSource.isClosed()) {
            dataSource.close();
        }
    }
}
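Whatever timing mechanism is used, run() only stops if its loop re-checks a flag that cancel() clears. That flag must be volatile because cancel() is called from a different thread than run(). Here is a JDK-only sketch of that interaction, assuming a hypothetical PollingLoop class with a List in place of SourceContext:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model of a polling source: run() loops on a task thread, cancel() comes from another thread.
class PollingLoop {
    // volatile so that the write in cancel() is seen by the thread executing run()
    private volatile boolean isRunning = true;
    // Stands in for SourceContext.collect()
    final List<Integer> emitted = new CopyOnWriteArrayList<>();

    void run(long periodMillis) {
        int batch = 0;
        while (isRunning) {          // re-checked after every sleep
            emitted.add(batch++);    // stands in for fetchAndEmit(ctx)
            try {
                Thread.sleep(periodMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop
                return;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    /** Runs the loop on a separate thread, cancels it after the delay, and waits for it to exit. */
    static PollingLoop runAndCancel(long periodMillis, long cancelAfterMillis) {
        PollingLoop loop = new PollingLoop();
        Thread task = new Thread(() -> loop.run(periodMillis));
        task.start();
        try {
            Thread.sleep(cancelAfterMillis);
            loop.cancel();
            // The loop sees the flag at the latest when its current sleep ends
            task.join(periodMillis + 5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (task.isAlive()) {
            throw new IllegalStateException("run() did not return after cancel()");
        }
        return loop;
    }
}
```

For example, runAndCancel(50, 200) lets the loop emit a few batches and returns once the loop has exited.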
- Alternative Approach:
Instead of Thread.sleep(), a ScheduledThreadPoolExecutor can be used to load data periodically from run(). With scheduleAtFixedRate(), each execution is scheduled relative to the initial start time rather than to the end of the previous query, so query time does not stretch the period. The timing is also handled by Java's concurrency utilities instead of a hand-written sleep loop.
package blog;

import com.alibaba.fastjson.JSON;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import entity.StockSql;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class StockSqlReadingV2 extends RichSourceFunction<StockSql> {
    private transient HikariDataSource dataSource;
    // volatile: cancel() runs on a different thread than the scheduled fetch
    private volatile boolean isRunning = true;
    private final String sql = "SELECT stock,price,ttime FROM flink_tab WHERE ttime > CURRENT_TIMESTAMP - INTERVAL 30 SECOND";

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/worddb?serverTimezone=GMT%2B8");
        config.setUsername("root");
        config.setPassword("0902");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        dataSource = new HikariDataSource(config);
    }

    private void fetchAndEmit(SourceFunction.SourceContext<StockSql> ctx, DataSource dataSource) {
        // try-with-resources returns the connection to the pool and closes the statement and result set
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (isRunning && rs.next()) {
                StockSql stockSql = new StockSql();
                stockSql.setStock(rs.getString("stock"));
                stockSql.setPrice(rs.getFloat("price"));
                stockSql.setTtime(rs.getString("ttime"));
                log.info("collect: {}", JSON.toJSONString(stockSql));
                ctx.collect(stockSql);
            }
        } catch (Exception e) {
            log.error("Reading error", e);
        }
    }

    @Override
    public void run(SourceFunction.SourceContext<StockSql> ctx) throws Exception {
        // Schedule the query every 3 seconds on a separate thread pool.
        // Note: run() returns right after scheduling; this is the flaw discussed below.
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.warn("being schedule");
            this.fetchAndEmit(ctx, dataSource);
        }, 0, 3, TimeUnit.SECONDS);
    }

    @Override
    public void cancel() {
        log.warn("being cancel");
        isRunning = false;
        if (dataSource != null) {
            dataSource.close();
        }
    }

    @Override
    public void close() throws Exception {
        log.warn("being close");
        cancel(); // also closes the DataSource
    }
}
Although a ScheduledThreadPoolExecutor looks like a viable solution, it introduces a critical issue. scheduleAtFixedRate() only hands the task to the executor's own thread and returns at once, so run() returns immediately as well. Flink then treats the source as finished and calls close(), which here also calls cancel(). As a result, the DataSource is closed while the scheduled task is still reading from it. The executor is also never shut down, so its thread keeps running after the source has finished.
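The premature return can be reproduced with the JDK alone. scheduleAtFixedRate() registers the task and returns at once, so the calling method finishes while the executor thread keeps working. Here is a minimal sketch, using the hypothetical class ScheduleReturnsImmediately with a counter standing in for the database query:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: a method that only schedules work returns before that work has run.
final class ScheduleReturnsImmediately {
    private ScheduleReturnsImmediately() {
    }

    /** Returns {fetches done when "run()" returned, fetches done a little later}. */
    static int[] fetchCounts() {
        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
        AtomicInteger fetches = new AtomicInteger();
        CountDownLatch threeFetches = new CountDownLatch(3);
        try {
            // The essence of StockSqlReadingV2.run(): schedule the task and fall through.
            executor.scheduleAtFixedRate(() -> {
                fetches.incrementAndGet(); // stands in for fetchAndEmit(ctx, dataSource)
                threeFetches.countDown();
            }, 100, 100, TimeUnit.MILLISECONDS);
            int whenRunReturns = fetches.get(); // control is back with the caller immediately

            // Meanwhile the executor thread keeps fetching on its own.
            threeFetches.await(5, TimeUnit.SECONDS);
            return new int[] {whenRunReturns, fetches.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        } finally {
            executor.shutdownNow(); // V2 never does this, so its thread outlives the source
        }
    }
}
```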
- Effective Method:
A more refined approach keeps run() from returning prematurely. After scheduling the task, run() blocks until the source is cancelled. One effective strategy is to synchronize on a lock object and call wait() on it. This keeps run() active, and the scheduled data loading continues until the job is explicitly cancelled.
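// Assumed transient fields, created in open(): ScheduledExecutorService executor, Object waitLock,
// and SourceFunction.SourceContext<StockSql> ctx for a no-argument fetchAndEmit() (which should emit
// inside synchronized (ctx.getCheckpointLock())). close() should call executor.shutdownNow().
@Override
public void run(SourceFunction.SourceContext<StockSql> ctx) throws Exception {
    this.ctx = ctx;
    executor.scheduleAtFixedRate(this::fetchAndEmit, 0, 5, TimeUnit.SECONDS);
    // Block until cancel() clears isRunning. waitLock is a field, not a local,
    // so that cancel() can notify it; the loop guards against spurious wakeups.
    synchronized (waitLock) {
        while (isRunning) {
            waitLock.wait();
        }
    }
}
@Override
public void cancel() {
    isRunning = false;
    if (waitLock != null) {
        synchronized (waitLock) {
            waitLock.notifyAll(); // wake run() so that it can return
        }
    }
}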
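The blocking pattern can be exercised end to end without Flink. In this sketch, BlockingScheduledSource is a hypothetical stand-in for the source and a List replaces SourceContext. The static helper plays the role of the task thread calling run() and of Flink calling cancel() and close():

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only model of the blocking source. In a real RichSourceFunction the executor
// and lock would be transient fields created in open(), and emitted would be the SourceContext.
class BlockingScheduledSource {
    private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
    private final Object waitLock = new Object(); // a field, so cancel() can notify it
    private final AtomicInteger nextBatch = new AtomicInteger();
    private volatile boolean isRunning = true;
    final List<Integer> emitted = new CopyOnWriteArrayList<>();

    void run(long periodMillis) throws InterruptedException {
        executor.scheduleAtFixedRate(() -> emitted.add(nextBatch.getAndIncrement()),
                0, periodMillis, TimeUnit.MILLISECONDS);
        synchronized (waitLock) {
            while (isRunning) { // loop guards against spurious wakeups
                waitLock.wait();
            }
        }
    }

    void cancel() {
        isRunning = false;
        synchronized (waitLock) {
            waitLock.notifyAll(); // wake run() so that it returns
        }
    }

    void close() {
        cancel();
        executor.shutdownNow(); // stop the fetch thread together with the source
    }

    /** Starts run() on a "task" thread and cancels it after the delay; true if run() blocked until then and returned. */
    static boolean runThenCancel(BlockingScheduledSource source, long periodMillis, long cancelAfterMillis) {
        Thread task = new Thread(() -> {
            try {
                source.run(periodMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        try {
            Thread.sleep(cancelAfterMillis);
            boolean blockedUntilCancel = task.isAlive(); // run() has not returned on its own
            source.cancel();
            task.join(5_000);
            return blockedUntilCancel && !task.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            source.close();
        }
    }
}
```

Because the lock is a field, cancel() can wake the blocked run() directly. Shutting down the executor in close() ensures that no fetch keeps running after the source has finished.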