Apache Flink

Tacnode is a high-performance, distributed database compatible with the PostgreSQL protocol. It supports standard SQL syntax and the full PostgreSQL tool ecosystem. This guide shows how to ingest data into Tacnode efficiently with Apache Flink, covering both the DataStream API and Flink SQL.

Environment Preparation

Version Compatibility

  • Flink version: 1.14+ (1.16+ recommended)
  • JDBC driver: PostgreSQL JDBC driver (42.5+ recommended)

Dependency Configuration

Add the Flink JDBC connector and the PostgreSQL JDBC driver to your Flink project:

<!-- Maven configuration -->
<!-- Use the flink-connector-jdbc version that matches your Flink release -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>

Type Mapping

Flink SQL Type      Tacnode Type
BOOLEAN             BOOLEAN
TINYINT             SMALLINT
SMALLINT            SMALLINT
INT                 INTEGER
BIGINT              BIGINT
FLOAT               REAL
DOUBLE              DOUBLE PRECISION
DECIMAL(p,s)        NUMERIC(p,s)
VARCHAR(n)          VARCHAR(n)
CHAR(n)             CHAR(n)
DATE                DATE
TIME                TIME
TIMESTAMP           TIMESTAMP
ARRAY               ARRAY
MAP                 JSONB
ROW                 JSONB
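
Per the table above, MAP and ROW values land in JSONB columns. When hand-writing JDBC statements (as in the DataStream examples below), one common approach is to serialize the value to a JSON string and bind it with java.sql.Types.OTHER, which the PostgreSQL driver passes through to a JSONB column. A minimal sketch; the Event type and its getAttributesJson() accessor are hypothetical:

import java.sql.Types;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

// Hypothetical Event carrying an int id and a pre-serialized JSON string.
// Binding the string as Types.OTHER lets the driver write it into a JSONB column.
JdbcStatementBuilder<Event> jsonBuilder = (ps, event) -> {
    ps.setInt(1, event.getId());
    ps.setObject(2, event.getAttributesJson(), Types.OTHER);
};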

Writing with the DataStream API

Using the JDBC Sink

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Define the data stream (source elided)
DataStream<User> userStream = env.addSource(...);

// Configure the JDBC connection options
JdbcConnectionOptions jdbcOpts = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://tacnode-host:5432/dbname")
    .withDriverName("org.postgresql.Driver")
    .withUsername("your-username")
    .withPassword("your-password")
    .build();

// Create the JDBC sink
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    jdbcOpts));

env.execute("Tacnode Sink Job");
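
The snippets in this section assume a simple User POJO; a minimal sketch with the fields implied by the getters used above:

// Plain POJO; Flink serializes it as a POJO type thanks to the
// public no-arg constructor and matching getters/setters.
public class User {
    private int id;
    private String name;
    private int age;

    public User() {}

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}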

Batch Write Optimization

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

JdbcExecutionOptions execOpts = JdbcExecutionOptions.builder()
    .withBatchSize(1000)              // Records per batch
    .withBatchIntervalMs(200)         // Flush interval (ms)
    .withMaxRetries(3)                // Max retry attempts
    .build();
 
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));
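
With retries enabled, a failed batch may be written again, so plain INSERTs can produce duplicates. Because Tacnode is PostgreSQL-compatible, one way to keep replays idempotent is an upsert; a sketch assuming id is the primary key of the users table:

// Same sink as above, but idempotent under retries via ON CONFLICT
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?) " +
    "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));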

Writing with Flink SQL

Registering the Tacnode Catalog

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the Tacnode catalog (Flink's JDBC catalog speaks the PostgreSQL protocol)
tableEnv.executeSql(
    "CREATE CATALOG Tacnode WITH (" +
    "  'type'='jdbc'," +
    "  'default-database'='dbname'," +
    "  'username'='your-username'," +
    "  'password'='your-password'," +
    "  'base-url'='jdbc:postgresql://tacnode-host:5432'" +
    ")");

// Set the active catalog
tableEnv.useCatalog("Tacnode");

// Register the Flink source table; TEMPORARY keeps it in the session
// instead of trying to create it inside the read-only JDBC catalog
tableEnv.executeSql(
    "CREATE TEMPORARY TABLE source_table (" +
    "  id INT," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_topic'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format' = 'json'" +
    ")");

// Insert into the existing Tacnode table "users", resolved through the catalog
tableEnv.executeSql(
    "INSERT INTO users " +
    "SELECT id, name, age FROM source_table");

Alternatively, submit via the Flink SQL client or web UI:

CREATE TEMPORARY TABLE users_sink (
 id INTEGER,
 name STRING,
 age INTEGER
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://tacnode-host:5432/dbname',
 'table-name' = 'users',
 'username' = 'your-username',
 'password' = 'your-password',
 'driver' = 'org.postgresql.Driver',
 'sink.buffer-flush.max-rows' = '1000',
 'sink.buffer-flush.interval' = '1s',
 'sink.max-retries' = '3'
);
 
INSERT INTO users_sink
SELECT * FROM source_table;
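
The sink above appends rows. If the sink table declares a primary key, the JDBC connector switches to upsert writes, which also keeps retried batches idempotent. A sketch of the same sink in upsert mode, assuming id is the primary key of the users table in Tacnode (shown with the Table API; the equivalent DDL also works in the SQL client):

tableEnv.executeSql(
    "CREATE TEMPORARY TABLE users_upsert_sink (" +
    "  id INT," +
    "  name STRING," +
    "  age INT," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:postgresql://tacnode-host:5432/dbname'," +
    "  'table-name' = 'users'," +
    "  'username' = 'your-username'," +
    "  'password' = 'your-password'" +
    ")");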

Common Configuration Parameters

Parameter Name                  Description                     Recommended Value
sink.buffer-flush.max-rows      Max number of rows per batch    1000-5000
sink.buffer-flush.interval      Batch flush interval            1s
sink.max-retries                Max retry attempts              3
sink.parallelism                Sink parallelism                Number of Tacnode nodes
connection.max-retry-timeout    Connection timeout              30s

For more configuration options, see the Apache Flink JDBC SQL Connector documentation.

Performance Tuning

SQL Job Parallelism

Set via SQL:

SET 'parallelism.default' = '8';

Batch Insert Parameters

CREATE TABLE Tacnode_sink (
  ...
) WITH (
  ...
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.parallelism' = '8'
);

Monitoring and Operations

Monitoring SQL Jobs

Monitor jobs with the Flink Web UI:

  • Write throughput (records/s)
  • Operator backpressure status
  • Checkpoint state

Logging Configuration

# log4j.properties
log4j.logger.org.apache.flink.table.runtime=INFO
log4j.logger.org.postgresql=INFO

Troubleshooting

Type Mapping Issues

Symptom: Writes fail due to field type mismatches.

Resolution:

  1. Explicitly specify the type mapping in the DDL:

CREATE TABLE Tacnode_sink (
  id INT,
  name VARCHAR(100),        -- Set length explicitly
  create_time TIMESTAMP(3)
) WITH (...);

  2. Use CAST to convert types:

INSERT INTO Tacnode_sink
SELECT id, name, CAST(create_time AS TIMESTAMP(3)) FROM source_table;

Performance Bottlenecks

Symptom: Low SQL job throughput.

Resolution:

  1. Increase parallelism
  2. Tune batch parameters
  3. Check Tacnode cluster load
  4. Optimize SQL query logic