java – Debezium notifying method never executes

I have a Maven project and want to build a CDC (change data capture) application with Debezium and PostgreSQL, but it is not working. I'm using Java 1.8 and Maven with the dependencies below. There appear to be no errors, but the notifying method never executes.

I just created the Maven project, configured the connection as shown below, and ran the program. The replication slot was created automatically by the Java program, by the way.

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>1.9.4.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>1.9.4.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>1.9.4.Final</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.0-alpha7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.0-alpha7</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

My app code:

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class App
{
    public static void main( String[] args ) throws IOException, InterruptedException
    {
        /**DatabaseChangeEventListener d = new DatabaseChangeEventListener();
        System.out.println("Starting execution");
        d.startEmbeddedEngine();*/
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        Configuration config = Configuration.create()
            .with("name", "customer-postgresql-connector")
            .with("connector.class", PostgresConnector.class)
            .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
            .with("offset.flush.interval.ms", "60000")
            .with("database.hostname", "localhost")
            .with("database.port", 5432)
            .with("database.user", "postgres")
            .with("database.password", "admin")
            .with("database.dbname", "anotherdb")
            .with("database.include.list", "anotherdb")
            .with("include.schema.changes", "false")
            .with("database.allowPublicKeyRetrieval", "true")
            .with("plugin.name", "wal2json")
            .with("schemas.enable", false)
            .with("database.server.id", "10181")
            .with("database.server.name", "localhost-postgresql")
            .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
            .build();

        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(config.asProperties())
                .notifying(record -> {
                    System.out.println("Record: "+record);
                })
                .build()) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            engine.close();
            executor.shutdown();
            while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("Waiting another 5 seconds for the embedded engine to shut down");
            }
        }
    }
}
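
One detail in the code above: `engine.close()` is called immediately after `executor.execute(engine)`, and the log below shows `Stopping the embedded engine` from the `main` thread before the worker thread starts. Whether that is the cause here is not established, but the usual lifecycle is to let the engine run and close it only on shutdown. The sketch below illustrates that ordering with a hypothetical `StandInEngine` class (not a Debezium class) so that it runs with the JDK alone; `runFor` and the 10 ms tick are assumptions made for illustration, and the stand-in does not model Debezium's internal behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class EngineLifecycleSketch {

    /** Hypothetical stand-in for an engine: "delivers" records until close() is called. */
    static class StandInEngine implements Runnable, AutoCloseable {
        private final CountDownLatch stop = new CountDownLatch(1);
        final AtomicInteger delivered = new AtomicInteger();

        @Override
        public void run() {
            try {
                // Count one delivered "record" every 10 ms until asked to stop.
                while (!stop.await(10, TimeUnit.MILLISECONDS)) {
                    delivered.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            stop.countDown();
        }
    }

    /** Starts the stand-in, keeps it alive for runMillis, then closes it and waits for the worker. */
    static int runFor(long runMillis) throws InterruptedException {
        StandInEngine engine = new StandInEngine();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        Thread.sleep(runMillis);   // keep the engine alive before closing it
        engine.close();            // close only once the work is meant to end
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return engine.delivered.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("records when closed right away: " + runFor(0));
        System.out.println("records when closed after 200 ms: " + runFor(200));
    }
}
```

With the real engine, the `Thread.sleep` would typically be replaced by waiting for an application shutdown signal.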

And the console prints this:

[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

[main] INFO io.debezium.embedded.EmbeddedEngine$EmbeddedConfig - EmbeddedConfig values: 
    access.control.allow.methods = 
    access.control.allow.origin = 
    admin.listeners = null
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 60000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = C:\Users\VASS\AppData\Local\Temp\offsets_4608803621683662537.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic = 
    plugin.path = null
    response.http.headers.config = 
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

[main] WARN org.apache.kafka.connect.runtime.WorkerConfig - The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.
[main] WARN org.apache.kafka.connect.runtime.WorkerConfig - Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

[main] INFO org.apache.kafka.connect.json.JsonConverterConfig - JsonConverterConfig values: 
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

[main] INFO io.debezium.embedded.EmbeddedEngine - Stopping the embedded engine
[pool-1-thread-1] INFO org.apache.kafka.connect.storage.FileOffsetBackingStore - Starting FileOffsetBackingStore with file C:\Users\VASS\AppData\Local\Temp\offsets_4608803621683662537.dat
[pool-1-thread-1] WARN io.debezium.connector.postgresql.PostgresConnectorConfig - Logical decoder 'wal2json' is deprecated and will be removed in future versions
[pool-1-thread-1] WARN io.debezium.connector.postgresql.PostgresConnectorConfig - Configuration property 'truncate.handling.mode' is deprecated and will be removed in future versions. Please use 'skipped.operations' instead.
[pool-1-thread-1] WARN io.debezium.connector.postgresql.PostgresConnectorConfig - Configuration property 'toasted.value.placeholder' is deprecated and will be removed in future versions. Please use 'unavailable.value.placeholder' instead.
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask - Starting PostgresConnectorTask with configuration:
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    connector.class = io.debezium.connector.postgresql.PostgresConnector
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.allowPublicKeyRetrieval = true
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.history.file.filename = C:\Users\VASS\AppData\Local\Temp\dbhistory_3035630169534300344.dat
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.user = postgres
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.dbname = anotherdb
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.server.id = 10181
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.server.name = localhost-postgresql
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.flush.timeout.ms = 5000
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    include.schema.changes = false
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.port = 5432
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    plugin.name = wal2json
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.flush.interval.ms = 60000
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    internal.key.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    offset.storage.file.filename = C:\Users\VASS\AppData\Local\Temp\offsets_4608803621683662537.dat
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.hostname = localhost
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.password = ********
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    name = customer-postgresql-connector
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    internal.value.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    value.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    key.converter = org.apache.kafka.connect.json.JsonConverter
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.include.list = anotherdb
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    database.history = io.debezium.relational.history.FileDatabaseHistory
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask -    schemas.enable = false
[pool-2-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
[pool-1-thread-1] INFO io.debezium.connector.common.BaseSourceTask - No previous offsets found
[pool-1-thread-1] INFO io.debezium.connector.postgresql.PostgresConnectorTask - user 'postgres' connected to database 'anotherdb' on PostgreSQL 11.13, compiled by Visual C++ build 1914, 64-bit with roles:
    role 'pg_read_all_settings' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_stat_scan_tables' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_write_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'replicauser2' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: true]
    role 'replicauser' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
    role 'usuarioreplica4' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
    role 'usuarioreplica2' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
    role 'sysrnt' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true]
    role 'pg_monitor' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_read_server_files' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_execute_server_program' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'usuarioreplica' [superuser: false, replication: true, inherit: false, create role: false, create db: false, can log in: true]
    role 'pg_read_all_stats' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
    role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]
[pool-1-thread-1] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/9999D438}, catalogXmin=3619]
[pool-1-thread-1] INFO io.debezium.connector.postgresql.PostgresConnectorTask - No previous offset found
[pool-1-thread-1] INFO io.debezium.connector.postgresql.snapshot.InitialSnapshotter - Taking initial snapshot for new datasource
[pool-1-thread-1] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost-postgresql named = change-event-source-coordinator
[pool-1-thread-1] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Metrics registered
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Context created
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.snapshot.InitialSnapshotter - Taking initial snapshot for new datasource
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - According to the connector configuration data will be snapshotted
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 1 - Preparing
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 2 - Determining captured tables
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Adding table public.animales to the list of capture schema tables
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 3 - Locking captured tables []
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 4 - Determining snapshot offset
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - Creating initial offset context
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - Read xlogStart at 'LSN{0/999A2398}' from transaction '3648'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource - Read xlogStart at 'LSN{0/999A2398}' from transaction '3648'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 5 - Reading structure of captured tables
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 6 - Persisting schema history
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 7 - Snapshotting data
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshotting contents of 0 tables while still in transaction
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.source.AbstractSnapshotChangeEventSource - Snapshot - Final stage
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server="localhost-postgresql"db='anotherdb', lsn=LSN{0/999A2398}, txId=3648, timestamp=2022-07-20T22:58:38.570Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] WARN io.debezium.relational.RelationalDatabaseSchema - After applying the include/exclude list filters, no changes will be captured. Please check your configuration!
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Connected metrics set to 'true'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.pipeline.ChangeEventSourceCoordinator - Starting streaming
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Retrieved latest position from stored offset 'LSN{0/999A2398}'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/999A2398}'
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresConnection - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/9999D438}, catalogXmin=3619]
[pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost-postgresql named = keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-localhost-postgresql-keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
Waiting another 5 seconds for the embedded engine to shut down
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/999A23F8}' received
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/999A23F8}' discovered
[pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = localhost-postgresql named = keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-localhost-postgresql-keep-alive
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
[debezium-postgresconnector-localhost-postgresql-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/999A23F8}' arrived, switching off the filtering
Waiting another 5 seconds for the embedded engine to shut down
Waiting another 5 seconds for the embedded engine to shut down
Waiting another 5 seconds for the embedded engine to shut down
Waiting another 5 seconds for the embedded engine to shut down
...

As you can see, the print statement in the notifying callback is never executed:

.notifying(record -> {
    System.out.println("Record: "+record);
})

Why is it not executing?

I am running the app and inserting rows into the table, but nothing appears to happen. What is wrong?
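
The log also contains the warning `After applying the include/exclude list filters, no changes will be captured`, and reports snapshotting 0 tables even though `public.animales` was found. Several properties in the configuration (`database.include.list`, `database.server.id`, `database.history`, `database.allowPublicKeyRetrieval`, `include.schema.changes`) are documented for the MySQL connector rather than the PostgreSQL one, as far as I can tell. A minimal sketch of a PostgreSQL-only property set follows, using plain `java.util.Properties`; the class and method names are hypothetical, and it is an untested assumption that dropping `database.include.list` and adding `table.include.list` removes the warning.

```java
import java.util.Properties;

public class PostgresConnectorPropsSketch {

    /** Builds connector properties limited to settings the PostgreSQL connector documents. */
    static Properties connectorProps(String offsetFile) {
        Properties props = new Properties();
        props.setProperty("name", "customer-postgresql-connector");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", offsetFile);
        props.setProperty("offset.flush.interval.ms", "60000");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "postgres");
        props.setProperty("database.password", "admin");
        props.setProperty("database.dbname", "anotherdb");
        props.setProperty("database.server.name", "localhost-postgresql");
        props.setProperty("plugin.name", "wal2json");
        // Assumption: name the table from the log explicitly instead of filtering by database.
        props.setProperty("table.include.list", "public.animales");
        return props;
    }

    public static void main(String[] args) {
        // "offsets.dat" is a placeholder path for illustration.
        connectorProps("offsets.dat").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting `Properties` object could be passed to `.using(...)` in place of `config.asProperties()`.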
