A Flink data pipeline reads from a partitioned Apache Pulsar topic. I had set the PulsarSource subscription type to SubscriptionType.Exclusive. When I change it to SubscriptionType.Shared, the connector expects transaction policies to be enabled for the namespace. I then enabled the transaction coordinator in the broker and started it, but I still get this exception:

Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:140)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:55)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:56)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.newTransaction(PulsarUnorderedPartitionSplitReader.java:165)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:91)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:115)
    ... 9 more
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction(PulsarTransactionUtils.java:51)
    ... 12 more
Caused by: org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException$CoordinatorNotFoundException: Transaction manager is not started or not enabled
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError(TransactionMetaStoreHandler.java:419)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleTransactionFailOp(TransactionMetaStoreHandler.java:352)
    at org.apache.pulsar.client.impl.TransactionMetaStoreHandler.handleNewTxnResponse(TransactionMetaStoreHandler.java:210)
    at org.apache.pulsar.client.impl.ClientCnx.handleNewTxnResponse(ClientCnx.java:945)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:382)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

I do not see a method on Flink's PulsarSource to enable transactions.


Standalone Pulsar reads standalone.conf, not broker.conf. I had mistakenly enabled the transaction coordinator in broker.conf, so the setting never took effect.
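
For reference, a sketch of the change, assuming the default layout where the file lives at conf/standalone.conf under the Pulsar installation directory (the path is an assumption; adjust it to your setup). The flag is the same one that would otherwise go in broker.conf:

```properties
# conf/standalone.conf (path assumed; relative to the Pulsar install directory)
# Enables the transaction coordinator for a standalone instance.
transactionCoordinatorEnabled=true
```

Restart the standalone instance after editing the file so the setting is picked up.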