spring – Got a java.lang.IllegalArgumentException when sending a Java object by the dynamic TCP/IP integration flow?

Gary Russell helped me some time ago with the following ‘DynamicTcpServer’ flow (see Building a TCP/IP server with SI’s dynamic flow registration) having now a message service injected which gets the message to send as soon as a client connects:

public class DynamicTcpServer implements TcpServer {

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<String, IntegrationFlowRegistration> registrations = new HashMap<>();

    private final Map<String, String> clients = new ConcurrentHashMap<>();

    private final Map<String, TcpServerSpec> sockets;

    private final MessageService messenger;

    @Autowired
    public DynamicTcpServer(MessageService messenger, Map<String, TcpServerSpec> sockets) {
        this.messenger = messenger;
        this.sockets = sockets;
    }

    @Override
    public void start(String context) {
        start(context, sockets.get(context).getPort());
    }

    @Override
    public void start(String context, int port) {
        if (this.registrations.containsKey(context)) {
            /* already running */
        }
        else {
            TcpServerConnectionFactorySpec server = Tcp.netServer(port).id(context).serializer(TcpCodecs.lf());
            server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
            IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
            this.registrations.put(context, flowContext.registration(flow).register());
        }
    }

    @Override
    public Set<String> running() {
        return registrations.keySet();
    }

    @Override
    public void stop(String context) {
        IntegrationFlowRegistration registration = this.registrations.remove(context);
        if (registration != null) {
            registration.destroy();
        }
    }

    @EventListener
    public void connect(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        this.clients.put(connectionId, event.getConnectionFactoryName());
    }

    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        this.clients.remove(event.getConnectionId());
    }

    @EventListener
    public void listening(TcpConnectionServerListeningEvent event) {
    }

    @Scheduled(
            fixedDelayString = "${com.harry.potter.scheduler.fixed-delay}",
            initialDelayString = "${com.harry.potter.scheduler.initial-delay}"
    )
    public void sender() {
        this.clients.forEach((connectId, context) -> {
            IntegrationFlowRegistration register = registrations.get(context);
            if (register != null) {
                try {
                    while (true) {
                        List<ServerMessage> msgs = messenger.getMessagesToSend(sockets.get(context));
                        msgs.stream().forEach(msg -> 
                                register.getMessagingTemplate().send(
                                        MessageBuilder.withPayload(msg)
                                                .setHeader(IpHeaders.CONNECTION_ID, connectId).build()));       
                    }
                }
                catch (NoMessageToSendException nm) {
                    appContext.getBean(context, TcpNetServerConnectionFactory.class)
                            .closeConnection(connectId);    
                }
            }
        });
    }
}

The message service returns a Java object ‘com.harry.potter.entity.ServerMessage’ to be sent. So I assume I have to add some other kind of converter at ‘.serializer(TcpCodecs.lf())’ because I got an exception saying:

2022-04-17 04:00:45.729 DEBUG [] --- [pool-283-thread-1]  c.l.c.c.cas.service.DynamicTcpServer     : sender: send 1 messages to potter1
2022-04-17 04:00:45.738 DEBUG [] --- [pool-283-thread-1]  c.l.c.c.c.service.DynamicTcpServer     : closed event=TcpConnectionCloseEvent [source=TcpNetConnection:harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae], [factory=potter1, connectionId=harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae] **CLOSED**
2022-04-17 04:00:45.740 ERROR [] --- [pool-283-thread-1]  o.s.i.ip.tcp.TcpSendingMessageHandler    : Error sending message

org.springframework.messaging.MessagingException: Send Failed; nested exception is java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:118)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsServer(TcpSendingMessageHandler.java:119)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:103)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99)
    at com.harry.potter.service.DynamicTcpServer.lambda$sender$2(DynamicTcpServer.java:125)
    at com.harry.potter.service.DynamicTcpServer$$Lambda$40600/0x000000006f511b08.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at com.harry.potter.service.DynamicTcpServer.lambda$sender$3(DynamicTcpServer.java:124)
    at com.harry.potter.service.DynamicTcpServer$$Lambda$40552/0x000000003344f3b0.accept(Unknown Source)
    at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
    at com.harry.potter.service.DynamicTcpServer.sender(DynamicTcpServer.java:115)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:884)
Caused by: java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
    at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.getPayloadAsBytes(TcpMessageMapper.java:277)
    at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.fromMessage(TcpMessageMapper.java:252)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:111)
    ... 34 common frames omitted

Which converter (serializer) do I have to use and how to plug it in my DynamicTcpServer exactly?

Leave a Comment