Gary Russell helped me some time ago with the following ‘DynamicTcpServer’ flow (see "Building a TCP/IP server with SI’s dynamic flow registration"). It now has a message service injected, which supplies the messages to send as soon as a client connects:
public class DynamicTcpServer implements TcpServer {

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<String, IntegrationFlowRegistration> registrations = new HashMap<>();
    private final Map<String, String> clients = new ConcurrentHashMap<>();
    private final Map<String, TcpServerSpec> sockets;
    private final MessageService messenger;

    @Autowired
    public DynamicTcpServer(MessageService messenger, Map<String, TcpServerSpec> sockets) {
        this.messenger = messenger;
        this.sockets = sockets;
    }

    @Override
    public void start(String context) {
        start(context, sockets.get(context).getPort());
    }

    @Override
    public void start(String context, int port) {
        if (this.registrations.containsKey(context)) {
            /* already running */
        }
        else {
            TcpServerConnectionFactorySpec server = Tcp.netServer(port).id(context).serializer(TcpCodecs.lf());
            server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
            IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
            this.registrations.put(context, flowContext.registration(flow).register());
        }
    }

    @Override
    public Set<String> running() {
        return registrations.keySet();
    }

    @Override
    public void stop(String context) {
        IntegrationFlowRegistration registration = this.registrations.remove(context);
        if (registration != null) {
            registration.destroy();
        }
    }

    @EventListener
    public void connect(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        this.clients.put(connectionId, event.getConnectionFactoryName());
    }

    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        this.clients.remove(event.getConnectionId());
    }

    @EventListener
    public void listening(TcpConnectionServerListeningEvent event) {
    }

    @Scheduled(
        fixedDelayString = "${com.harry.potter.scheduler.fixed-delay}",
        initialDelayString = "${com.harry.potter.scheduler.initial-delay}"
    )
    public void sender() {
        this.clients.forEach((connectId, context) -> {
            IntegrationFlowRegistration register = registrations.get(context);
            if (register != null) {
                try {
                    while (true) {
                        List<ServerMessage> msgs = messenger.getMessagesToSend(sockets.get(context));
                        msgs.stream().forEach(msg ->
                            register.getMessagingTemplate().send(
                                MessageBuilder.withPayload(msg)
                                    .setHeader(IpHeaders.CONNECTION_ID, connectId).build()));
                    }
                }
                catch (NoMessageToSendException nm) {
                    appContext.getBean(context, TcpNetServerConnectionFactory.class)
                        .closeConnection(connectId);
                }
            }
        });
    }
}
The message service returns a Java object of type ‘com.harry.potter.entity.ServerMessage’ to be sent. I assume I have to replace or supplement ‘.serializer(TcpCodecs.lf())’ with some other kind of converter, because I get the following exception:
2022-04-17 04:00:45.729 DEBUG [] --- [pool-283-thread-1] c.l.c.c.cas.service.DynamicTcpServer : sender: send 1 messages to potter1
2022-04-17 04:00:45.738 DEBUG [] --- [pool-283-thread-1] c.l.c.c.c.service.DynamicTcpServer : closed event=TcpConnectionCloseEvent [source=TcpNetConnection:harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae], [factory=potter1, connectionId=harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae] **CLOSED**
2022-04-17 04:00:45.740 ERROR [] --- [pool-283-thread-1] o.s.i.ip.tcp.TcpSendingMessageHandler : Error sending message
org.springframework.messaging.MessagingException: Send Failed; nested exception is java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:118)
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsServer(TcpSendingMessageHandler.java:119)
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:103)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99)
at com.harry.potter.service.DynamicTcpServer.lambda$sender$2(DynamicTcpServer.java:125)
at com.harry.potter.service.DynamicTcpServer$$Lambda$40600/0x000000006f511b08.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at com.harry.potter.service.DynamicTcpServer.lambda$sender$3(DynamicTcpServer.java:124)
at com.harry.potter.service.DynamicTcpServer$$Lambda$40552/0x000000003344f3b0.accept(Unknown Source)
at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
at com.harry.potter.service.DynamicTcpServer.sender(DynamicTcpServer.java:115)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:884)
Caused by: java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.getPayloadAsBytes(TcpMessageMapper.java:277)
at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.fromMessage(TcpMessageMapper.java:252)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:111)
... 34 common frames omitted
Which converter (serializer) do I have to use, and how exactly do I plug it into my DynamicTcpServer?
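
One possible direction, offered as a minimal sketch rather than a confirmed answer: the exception text says that, with a byte array serializer such as the one from TcpCodecs.lf(), the mapper accepts only a byte array or String payload, so the ServerMessage could be converted to a String before it reaches the outbound adapter. The plain-Java example below shows only that conversion plus the LF framing. The ServerMessage fields (id, text), the "id;text" wire format, and the helper names toLine and writeLfFramed are all hypothetical, since the real entity is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class ServerMessageLineDemo {

    /** Hypothetical stand-in for com.harry.potter.entity.ServerMessage (real fields unknown). */
    static final class ServerMessage {
        final String id;
        final String text;

        ServerMessage(String id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    /**
     * Assumed wire format: "id;text" on one line.
     * Line feeds inside the text are replaced so they cannot break the LF framing.
     */
    static String toLine(ServerMessage msg) {
        return msg.id + ";" + msg.text.replace('\n', ' ');
    }

    /** Writes the payload as UTF-8 bytes followed by a single LF terminator. */
    static void writeLfFramed(String line, OutputStream out) throws IOException {
        out.write(line.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeLfFramed(toLine(new ServerMessage("42", "hello")), wire);
        // Show what a client reading LF-terminated lines would receive.
        System.out.print(new String(wire.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

In the flow itself, a conversion like toLine would presumably sit in a transformer step ahead of f.handle(Tcp.outboundAdapter(server)), leaving the serializer unchanged. Whether that or a custom serializer is the better fit depends on what the receiving client expects.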