Netty client not receiving full response

I wrote a simple Netty client that connects to a server built with the QuickFIX/J library. The problem is that when the server sends strings at a very high rate, the client sometimes does not receive them whole. The server normally sends strings of the form “8=FIX.4.2 | 9=123 | ... | 10=21 |”.
Every string starts with tag “8=” and ends with tag “10=”. At high speed, the client shows strings that are cut off at some size, with the remainder arriving in the next read of the buffer.

Below is the output at the Netty client:

Incoming From Exchange >> 8=FIX.4.29=24735=834=5449=orderSimulator52=20170809-07:38:30.24056=orderGateway1=orderGateway6=1250511=12695039514=017=505320=131=1250532=137=838=139=440=241=11616711044=1250554=155=GC60=20170809-07:38:30.240150=4151=0200=201708207=CME10=255

Incoming From Exchange >> 8=FIX.4.29=24735=834=5549=orderSimulator52=20170809-07:38:30.26056=orderGateway1=orderGateway6=1250511=10926793114=017=505420=131=1250532=137=538=139=440=241=18595658744=1250554=155=GC60=20170809-07:38:30.260150=4151=0200=201708207=CME10=030

Incoming From Exchange >> 8=FIX.4.29=24735=834=5649=orderSimulator52=20170809-07:38:30.26256=orderGateway1=orderGateway6=1250511=10479037514=017=505520=131=1250532=137=238=139=440=241=16819718944=1250554=155=GC60=20170809-07:38:30.262150=4151=0200=201708207=CME10=0278=FIX.4.29=24735

Incoming From Exchange >> =834=5749=orderSimulator52=20170809-07:38:30.26256=orderGateway1=orderGateway6=1250511=17021962114=017=505620=131=1250532=137=738=139=440=241=15642709944=1250554=155=GC60=20170809-07:38:30.262150=4151=0200=201708207=CME10=0208=FIX.4.29=24735=834=5849=orderS

Incoming From Exchange >> imulator52=20170809-07:38:30.26256=orderGateway1=orderGateway6=1250511=14044819714=017=505720=131=1250532=137=438=139=440=241=16279265744=1250554=155=GC60=20170809-07:38:30.262150=4151=0200=201708207=CME10=030

Incoming From Exchange >> 8=FIX.4.29=24835=834=5949=orderSimulator52=20170809-07:38:30.27756=orderGateway1=orderGateway6=1250511=14108324114=017=505820=131=1250532=137=1138=139=440=241=15954718144=1250554=155=GC60=20170809-07:38:30.277150=4151=0200=201708207=CME10=073

Incoming From Exchange >> 8=FIX.4.29=24835=834=6049=orderSimulator52=20170809-07:38:30.27756=orderGateway1=orderGateway6=1250511=12836588814=017=505920=131=1250532=137=1238=139=440=241=14005997644=1250554=155=GC60=20170809-07:38:30.277150=4151=0200=201708207=CME10=092

Incoming From Exchange >> 8=FIX.4.29=24635=834=6149=orderSimulator52=20170809-07:38:30.27756=orderGateway1=orderGateway6=1250511=9376296914=017=506020=131=1250532=137=638=139=440=241=11452227544=1250554=155=GC60=20170809-07:38:30.277150=4151=0200=201708207=CME10=236

You can clearly see that the first two responses from the server are in the correct format: each starts with “8=” and ends with the “10=” field. In the third response, however, the beginning of the next string is appended to the current one and then cut off. The fourth response carries the remaining part of that string, plus the start of yet another one. How can I eliminate this? The strings coming from the server can be of any size.
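TCP delivers a byte stream rather than discrete messages, so a single read can hold one and a half FIX messages, as in the third line of output above. Here is a minimal sketch of the reassembly step in plain Java, with no Netty dependency so it can be run on its own. The class name `FixFrameAccumulator` and its `feed` method are hypothetical. The sketch assumes that fields are separated by SOH (0x01, written as “|” above), that the checksum is always three digits, and that no field value contains an SOH byte.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: accumulates raw chunks and emits only complete FIX messages. */
final class FixFrameAccumulator {
    private static final byte SOH = 0x01; // FIX field delimiter

    // Bytes received so far that do not yet form a complete message.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feed one chunk as read from the socket; returns every message it completes. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] data = pending.toByteArray();

        List<String> messages = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = endOfMessage(data, start)) != -1) {
            messages.add(new String(data, start, end - start, StandardCharsets.US_ASCII));
            start = end;
        }

        // Keep the unfinished tail for the next chunk.
        pending.reset();
        pending.write(data, start, data.length - start);
        return messages;
    }

    /** Index just past the trailer "SOH 10=nnn SOH", or -1 if no complete trailer is present. */
    private static int endOfMessage(byte[] data, int from) {
        for (int i = from; i + 7 < data.length; i++) {
            if (data[i] == SOH
                    && data[i + 1] == '1' && data[i + 2] == '0' && data[i + 3] == '='
                    && data[i + 7] == SOH) {
                return i + 8;
            }
        }
        return -1;
    }
}
```

In a Netty pipeline the same scan would presumably live in a `ByteToMessageDecoder` placed ahead of the handler, since that class keeps unread bytes between reads, so the handler would only ever see whole messages.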

I have also looked at and implemented this process (a FixedLengthFrameDecoder, as the stack trace below shows), but I get the following error every time. Is there any way to eliminate it? As I said, the strings come in different sizes. I wonder whether I can retrieve specific strings directly from the buffer, that is, strings starting with “8=” and ending with the “10=” field?

Aug 09, 2017 7:23:28 PM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf(ridx: 0, widx: 256, cap: 496).slice(0, 512)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf(ridx: 0, widx: 256, cap: 496).slice(0, 512)
    at io.netty.buffer.AbstractUnpooledSlicedByteBuf.checkSliceOutOfBounds(AbstractUnpooledSlicedByteBuf.java:492)
    at io.netty.buffer.PooledSlicedByteBuf.newInstance(PooledSlicedByteBuf.java:44)
    at io.netty.buffer.PooledByteBuf.retainedSlice(PooledByteBuf.java:150)
    at io.netty.buffer.AbstractByteBuf.readRetainedSlice(AbstractByteBuf.java:843)
    at io.netty.buffer.WrappedByteBuf.readRetainedSlice(WrappedByteBuf.java:626)
    at io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
    at FixedLengthFrameDecoder.decode(FixedLengthFrameDecoder.java:79)
    at FixedLengthFrameDecoder.decode(FixedLengthFrameDecoder.java:60)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
    ... 16 more
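The trace shows `slice(0, 512)` being requested from a buffer with only 256 readable bytes (`widx: 256`), so the decoder asked for a 512-byte frame before that much data had arrived. The messages in the output above also carry different BodyLength values (`9=247`, `9=248`, `9=246`), so no single fixed frame length can match them. A rough sketch of deriving the frame length from tag 9 instead is shown here. The helper `FixFrameLength.totalLength` is hypothetical, and the sketch assumes BodyLength is the second field of the message and that the trailer is always “10=nnn” followed by SOH (7 bytes).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: computes the total size of a FIX message from its BodyLength (tag 9). */
final class FixFrameLength {
    private static final byte SOH = 0x01;        // FIX field delimiter
    private static final int TRAILER_LENGTH = 7; // "10=nnn" plus the closing SOH

    /**
     * Returns the full message length in bytes, or -1 when the header
     * ("8=...<SOH>9=...<SOH>") has not arrived completely within the
     * first {@code readable} bytes of {@code data}.
     */
    static int totalLength(byte[] data, int readable) {
        int firstSoh = indexOf(data, SOH, 0, readable);
        if (firstSoh < 0) {
            return -1;
        }
        int secondSoh = indexOf(data, SOH, firstSoh + 1, readable);
        if (secondSoh < 0) {
            return -1;
        }
        // The second field must look like "9=<digits>".
        if (secondSoh - firstSoh < 4 || data[firstSoh + 1] != '9' || data[firstSoh + 2] != '=') {
            throw new IllegalArgumentException("BodyLength (tag 9) is not the second field");
        }
        int bodyLength = Integer.parseInt(
                new String(data, firstSoh + 3, secondSoh - firstSoh - 3, StandardCharsets.US_ASCII));
        // BodyLength counts the bytes after the SOH that ends tag 9,
        // up to and including the SOH just before "10=".
        return secondSoh + 1 + bodyLength + TRAILER_LENGTH;
    }

    /** First index of value in data[from, to), or -1 if it does not occur there. */
    private static int indexOf(byte[] data, byte value, int from, int to) {
        for (int i = from; i < to && i < data.length; i++) {
            if (data[i] == value) {
                return i;
            }
        }
        return -1;
    }
}
```

A decoder built on this idea would wait until the number of readable bytes is at least the returned length before slicing a frame, instead of slicing a fixed 512 bytes.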

Below is my client code:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

// GetPropertiesFromFile and Logger are my own helper classes (not shown).
public class SendStringToExchangeClient {
    public static int port;
    public static int reconnectTime = 1;
    public int MAX_RECONNECTIONS = GetPropertiesFromFile.MAX_RECONNECTIONS;
    Channel channel;
    EventLoopGroup workGroup = new NioEventLoopGroup();
    static ChannelFuture channelFuture = null;
    String message;
    int rcvBuf, sndBuf, lowWaterMark, highWaterMark;

    public SendStringToExchangeClient(int port) throws Exception {
        SendStringToExchangeClient.port = port;
        rcvBuf = 2048;
        sndBuf = 2048;
        lowWaterMark = 1024;
        highWaterMark = 2048;
        // channelFuture = connectLoop();
    }

    public final void connectLoop() throws InterruptedException {
        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true)
             .option(ChannelOption.SO_RCVBUF, rcvBuf * 2048)
             .option(ChannelOption.SO_SNDBUF, sndBuf * 2048)
             .option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                     new WriteBufferWaterMark(lowWaterMark * 2048, highWaterMark * 2048))
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel socketChannel) throws Exception {
                     // The handler is the only entry in the pipeline; there is no frame decoder before it.
                     socketChannel.pipeline().addLast(new SendStringToExchangeClientHandler());
                 }
             });

            // Block until the connection attempt completes.
            channelFuture = b.connect(GetPropertiesFromFile.SendToExchageIP.trim(), port).sync();
            this.channel = channelFuture.channel();
        } catch (Exception ex) {
            System.err.println("ERROR : Exchange Not In Connection");
            Logger.error("Exchange not in connection: " + ex.getMessage());
        }
        // return channelFuture;
    }

    public void shutdown() {
        workGroup.shutdownGracefully();
    }
}

My client Handler code:

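import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.Charset;

public class SendStringToExchangeClientHandler extends ChannelInboundHandlerAdapter {
    static int count = 0;

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // Receive from EXCHANGE: prints whatever bytes this read delivered,
            // which may be a partial message or several messages at once.
            String message = byteBuf.toString(Charset.defaultCharset());
            System.out.println("Incoming From Exchange >> " + message);
        } finally {
            // ChannelInboundHandlerAdapter does not release the buffer automatically.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}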