Class ReplayingDecoder<S>
- Type Parameters:
S
- the state type which is usually anEnum
; useVoid
if state management is unused
- All Implemented Interfaces:
ChannelHandler
,ChannelInboundHandler
- Direct Known Subclasses:
MqttDecoder
,Socks4ClientDecoder
,Socks4ServerDecoder
,Socks5CommandRequestDecoder
,Socks5CommandResponseDecoder
,Socks5InitialRequestDecoder
,Socks5InitialResponseDecoder
,Socks5PasswordAuthRequestDecoder
,Socks5PasswordAuthResponseDecoder
,SocksAuthRequestDecoder
,SocksAuthResponseDecoder
,SocksCmdRequestDecoder
,SocksCmdResponseDecoder
,SocksInitRequestDecoder
,SocksInitResponseDecoder
,StompSubframeDecoder
,WebSocket00FrameDecoder
ByteToMessageDecoder
which enables implementation
of a non-blocking decoder in the blocking I/O paradigm.
The biggest difference between ReplayingDecoder
and
ByteToMessageDecoder
is that ReplayingDecoder
allows you to
implement the decode()
and decodeLast()
methods just like
all required bytes were received already, rather than checking the
availability of the required bytes. For example, the following
ByteToMessageDecoder
implementation:
public class IntegerHeaderFrameDecoder extendsis simplified like the following withByteToMessageDecoder
{@Override
protected void decode(ChannelHandlerContext
ctx,ByteBuf
buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }
ReplayingDecoder
:
public class IntegerHeaderFrameDecoder extendsReplayingDecoder
<Void
> { protected void decode(ChannelHandlerContext
ctx,ByteBuf
buf, List<Object> out) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
How does this work?
ReplayingDecoder
passes a specialized ByteBuf
implementation which throws an Error
of certain type when there's not
enough data in the buffer. In the IntegerHeaderFrameDecoder
above,
you just assumed that there will be 4 or more bytes in the buffer when
you call buf.readInt()
. If there's really 4 bytes in the buffer,
it will return the integer header as you expected. Otherwise, the
Error
will be raised and the control will be returned to
ReplayingDecoder
. If ReplayingDecoder
catches the
Error
, then it will rewind the readerIndex
of the buffer
back to the 'initial' position (i.e. the beginning of the buffer) and call
the decode(..)
method again when more data is received into the
buffer.
Please note that ReplayingDecoder
always throws the same cached
Error
instance to avoid the overhead of creating a new Error
and filling its stack trace for every throw.
Limitations
At the cost of the simplicity, ReplayingDecoder
enforces you a few
limitations:
- Some buffer operations are prohibited.
- Performance can be worse if the network is slow and the message format is complicated unlike the example above. In this case, your decoder might have to decode the same part of the message over and over again.
- You must keep in mind that
decode(..)
method can be called many times to decode a single message. For example, the following code will not work:public class MyDecoder extends
The correct implementation looks like the following, and you can also utilize the 'checkpoint' feature which is explained in detail in the next section.ReplayingDecoder
<Void
> { private final Queue<Integer> values = new LinkedList<Integer>();@Override
public void decode(..,ByteBuf
buf, List<Object> out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // This assertion will fail intermittently since values.offer() // can be called more than two times! assert values.size() == 2; out.add(values.poll() + values.poll()); } }public class MyDecoder extends
ReplayingDecoder
<Void
> { private final Queue<Integer> values = new LinkedList<Integer>();@Override
public void decode(..,ByteBuf
buf, List<Object> out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); } }
Improving the performance
Fortunately, the performance of a complex decoder implementation can be
improved significantly with the checkpoint()
method. The
checkpoint()
method updates the 'initial' position of the buffer so
that ReplayingDecoder
rewinds the readerIndex
of the buffer
to the last position where you called the checkpoint()
method.
Calling checkpoint(T)
with an Enum
Although you can just use checkpoint()
method and manage the state
of the decoder by yourself, the easiest way to manage the state of the
decoder is to create an Enum
type which represents the current state
of the decoder and to call checkpoint(T)
method whenever the state
changes. You can have as many states as you want depending on the
complexity of the message you want to decode:
public enum MyDecoderState { READ_LENGTH, READ_CONTENT; } public class IntegerHeaderFrameDecoder extendsReplayingDecoder
<MyDecoderState> { private int length; public IntegerHeaderFrameDecoder() { // Set the initial state. super(MyDecoderState.READ_LENGTH); }@Override
protected void decode(ChannelHandlerContext
ctx,ByteBuf
buf, List<Object> out) throws Exception { switch (state()) { case READ_LENGTH: length = buf.readInt(); checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: ByteBuf frame = buf.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); out.add(frame); break; default: throw new Error("Shouldn't reach here."); } } }
Calling checkpoint()
with no parameter
An alternative way to manage the decoder state is to manage it by yourself.
public class IntegerHeaderFrameDecoder extendsReplayingDecoder
<Void
> { private boolean readLength; private int length;@Override
protected void decode(ChannelHandlerContext
ctx,ByteBuf
buf, List<Object> out) throws Exception { if (!readLength) { length = buf.readInt(); readLength = true; checkpoint(); } if (readLength) { ByteBuf frame = buf.readBytes(length); readLength = false; checkpoint(); out.add(frame); } } }
Replacing a decoder with another decoder in a pipeline
If you are going to write a protocol multiplexer, you will probably want to
replace a ReplayingDecoder
(protocol detector) with another
ReplayingDecoder
, ByteToMessageDecoder
or MessageToMessageDecoder
(actual protocol decoder).
It is not possible to achieve this simply by calling
ChannelPipeline.replace(ChannelHandler, String, ChannelHandler)
, but
some additional steps are required:
public class FirstDecoder extendsReplayingDecoder
<Void
> {@Override
protected void decode(ChannelHandlerContext
ctx,ByteBuf
buf, List<Object> out) { ... // Decode the first message Object firstMessage = ...; // Add the second decoder ctx.pipeline().addLast("second", new SecondDecoder()); if (buf.isReadable()) { // Hand off the remaining data to the second decoder out.add(firstMessage); out.add(buf.readBytes(super.actualReadableBytes())); } else { // Nothing to hand off out.add(firstMessage); } // Remove the first decoder (me) ctx.pipeline().remove(this); }
-
Nested Class Summary
Nested classes/interfaces inherited from class io.netty.handler.codec.ByteToMessageDecoder
ByteToMessageDecoder.Cumulator
Nested classes/interfaces inherited from interface io.netty.channel.ChannelHandler
ChannelHandler.Sharable
-
Field Summary
FieldsModifier and TypeFieldDescriptionprivate int
(package private) static final Signal
private final ReplayingDecoderByteBuf
private S
Fields inherited from class io.netty.handler.codec.ByteToMessageDecoder
COMPOSITE_CUMULATOR, cumulation, MERGE_CUMULATOR
-
Constructor Summary
ConstructorsModifierConstructorDescriptionprotected
Creates a new instance with no initial state (i.e:null
).protected
ReplayingDecoder
(S initialState) Creates a new instance with the specified initial state. -
Method Summary
Modifier and TypeMethodDescriptionprotected void
callDecode
(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) Called once data should be decoded from the givenByteBuf
.(package private) final void
channelInputClosed
(ChannelHandlerContext ctx, List<Object> out) Called when the input of the channel was closed which may be because it changed to inactive or because ofChannelInputShutdownEvent
.protected void
Stores the internal cumulative buffer's reader position.protected void
checkpoint
(S state) Stores the internal cumulative buffer's reader position and updates the current decoder state.protected S
state()
Returns the current state of this decoder.protected S
Sets the current state of this decoder.Methods inherited from class io.netty.handler.codec.ByteToMessageDecoder
actualReadableBytes, channelInactive, channelRead, channelReadComplete, decode, decodeLast, decodeRemovalReentryProtection, discardSomeReadBytes, expandCumulation, fireChannelRead, fireChannelRead, handlerRemoved, handlerRemoved0, internalBuffer, isSingleDecode, setCumulator, setDiscardAfterReads, setSingleDecode, userEventTriggered
Methods inherited from class io.netty.channel.ChannelInboundHandlerAdapter
channelActive, channelRegistered, channelUnregistered, channelWritabilityChanged, exceptionCaught
Methods inherited from class io.netty.channel.ChannelHandlerAdapter
ensureNotSharable, handlerAdded, isSharable
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface io.netty.channel.ChannelHandler
handlerAdded
-
Field Details
-
REPLAY
-
replayable
-
state
-
checkpoint
private int checkpoint
-
-
Constructor Details
-
ReplayingDecoder
protected ReplayingDecoder()Creates a new instance with no initial state (i.e:null
). -
ReplayingDecoder
Creates a new instance with the specified initial state.
-
-
Method Details
-
checkpoint
protected void checkpoint()Stores the internal cumulative buffer's reader position. -
checkpoint
Stores the internal cumulative buffer's reader position and updates the current decoder state. -
state
Returns the current state of this decoder.- Returns:
- the current state of this decoder
-
state
Sets the current state of this decoder.- Returns:
- the old state of this decoder
-
channelInputClosed
Description copied from class:ByteToMessageDecoder
Called when the input of the channel was closed which may be because it changed to inactive or because ofChannelInputShutdownEvent
.- Overrides:
channelInputClosed
in classByteToMessageDecoder
- Throws:
Exception
-
callDecode
Description copied from class:ByteToMessageDecoder
Called once data should be decoded from the givenByteBuf
. This method will callByteToMessageDecoder.decode(ChannelHandlerContext, ByteBuf, List)
as long as decoding should take place.- Overrides:
callDecode
in classByteToMessageDecoder
- Parameters:
ctx
- theChannelHandlerContext
which thisByteToMessageDecoder
belongs toin
- theByteBuf
from which to read dataout
- theList
to which decoded messages should be added
-