Tacnode is a high-performance, distributed database compatible with the PostgreSQL protocol. It supports standard SQL syntax and the full PostgreSQL tool ecosystem. This guide details efficient data ingestion into Tacnode via Apache Flink, covering both the DataStream API and Flink SQL approaches.
Flink version: 1.14+ (1.16+ recommended)
JDBC driver: PostgreSQL JDBC driver (42.5+ recommended)
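Flink JDBC connector: org.apache.flink:flink-connector-jdbc matching your Flink release (the exact artifact name varies slightly between Flink versions); it provides the JdbcSink and the 'jdbc' table connector used in the examples below.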
Add the PostgreSQL JDBC driver dependency to your Flink project:
<!-- Maven configuration -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.5.0</version>
</dependency>
Flink SQL Type | Tacnode Type
---------------|-------------
BOOLEAN | BOOLEAN
TINYINT | SMALLINT
SMALLINT | SMALLINT
INT | INTEGER
BIGINT | BIGINT
FLOAT | REAL
DOUBLE | DOUBLE PRECISION
DECIMAL(p,s) | NUMERIC(p,s)
VARCHAR(n) | VARCHAR(n)
CHAR(n) | CHAR(n)
DATE | DATE
TIME | TIME
TIMESTAMP | TIMESTAMP
ARRAY | ARRAY
MAP | JSONB
ROW | JSONB
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Define the data stream
DataStream<User> userStream = env.addSource(...);

// Configure JDBC connection options
JdbcConnectionOptions jdbcOpts = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://tacnode-host:5432/dbname")
    .withDriverName("org.postgresql.Driver")
    .withUsername("your-username")
    .withPassword("your-password")
    .build();

// Create the JDBC sink
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    jdbcOpts));

env.execute("Tacnode Sink Job");
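The snippets above and below assume a simple User POJO whose fields match the columns of the INSERT statement. The class below is a minimal sketch for illustration; its name and fields are assumptions, not part of any Flink or Tacnode API:

public class User {
    private int id;
    private String name;
    private int age;

    // No-argument constructor and getters/setters let Flink treat this class as a POJO type
    public User() {}

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}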
JdbcExecutionOptions execOpts = JdbcExecutionOptions.builder()
    .withBatchSize(1000)       // Records per batch
    .withBatchIntervalMs(200)  // Batch interval (ms)
    .withMaxRetries(3)         // Max retry attempts
    .build();

userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));
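Because Tacnode is PostgreSQL-compatible, a retried batch can be made idempotent by writing with PostgreSQL's INSERT ... ON CONFLICT upsert syntax instead of a plain INSERT. A minimal sketch, assuming id is the primary key of the users table:

// Upsert variant: replayed batches update existing rows instead of inserting duplicates
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?) " +
    "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));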
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the Tacnode catalog
tableEnv.executeSql(
    "CREATE CATALOG Tacnode WITH (" +
    "  'type'='jdbc'," +
    "  'default-database'='dbname'," +
    "  'username'='your-username'," +
    "  'password'='your-password'," +
    "  'base-url'='jdbc:postgresql://tacnode-host:5432'" +
    ")");

// Set the active catalog
tableEnv.useCatalog("Tacnode");
// Register the Flink source table as a temporary table, so it is not persisted
// into the JDBC catalog (which does not support creating tables)
tableEnv.executeSql(
    "CREATE TEMPORARY TABLE source_table (" +
    "  id INT," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_topic'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format' = 'json'" +
    ")");
// Perform the insertion. executeSql() submits the INSERT job on its own, so no
// separate env.execute() call is needed for this part; call .await() on the
// returned TableResult to block until the job finishes.
tableEnv.executeSql(
    "INSERT INTO users " +
    "SELECT id, name, age FROM source_table");
Alternatively, submit via the Flink SQL client or web UI:
CREATE TEMPORARY TABLE users_sink (
    id INTEGER,
    name STRING,
    age INTEGER
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://tacnode-host:5432/dbname',
    'table-name' = 'users',
    'username' = 'your-username',
    'password' = 'your-password',
    'driver' = 'org.postgresql.Driver',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s',
    'sink.max-retries' = '3'
);

INSERT INTO users_sink
SELECT * FROM source_table;
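If the target table should be written with upsert rather than append semantics, also declare a primary key in the sink DDL (for example, PRIMARY KEY (id) NOT ENFORCED); the Flink JDBC connector then generates upsert statements keyed on those columns instead of plain inserts.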
Parameter Name | Description | Recommended Value
---------------|-------------|------------------
sink.buffer-flush.max-rows | Max number of rows per batch | 1000-5000
sink.buffer-flush.interval | Batch flush interval | 1s
sink.max-retries | Max retry attempts | 3
sink.parallelism | Sink parallelism | Number of Tacnode nodes
connection.max-retry-timeout | Connection timeout | 30s
For more configuration options, see the Apache Flink JDBC SQL Connector documentation.
Set via SQL:
SET 'parallelism.default' = '8';

CREATE TABLE Tacnode_sink (
    ...
) WITH (
    ...
    'sink.buffer-flush.interval' = '1s',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.parallelism' = '8'
);
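For the DataStream API job shown earlier, the equivalent tuning is to set the parallelism on the sink operator itself. A minimal sketch, reusing the execOpts and jdbcOpts defined above:

userStream
    .addSink(JdbcSink.sink(
        "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
        (ps, user) -> {
            ps.setInt(1, user.getId());
            ps.setString(2, user.getName());
            ps.setInt(3, user.getAge());
        },
        execOpts,
        jdbcOpts))
    .setParallelism(8);  // e.g. match the number of Tacnode nodes, per the table above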
Monitor the following with the Flink Web UI:
Write throughput (records/s)
Operator backpressure status
Checkpoint state
# log4j.properties
log4j.logger.org.apache.flink.table.runtime = INFO
log4j.logger.org.postgresql = INFO
Symptom: Write failures caused by field type mismatches.
Resolution:
Explicitly specify the type mapping in the DDL:
CREATE TABLE Tacnode_sink (
    id INT,
    name VARCHAR(100),        -- Set the length explicitly
    create_time TIMESTAMP(3)
) WITH (...);
Use CAST to convert types:
INSERT INTO Tacnode_sink
SELECT id, name, CAST(create_time AS TIMESTAMP(3)) FROM source_table;
Symptom: Low SQL job throughput.
Resolution:
Increase the sink parallelism
Tune the batch parameters (sink.buffer-flush.max-rows, sink.buffer-flush.interval)
Check the Tacnode cluster load
Optimize the SQL query logic