dubbo系列之网络通信编码解码-consumer响应结果编码

dubbo系列之网络通信编码解码-consumer响应结果编码

简要

在收到provider返回的编码数据后,consumer要进行解码,这时候可能会出现粘包和拆包的问题。

源码解析

NettyCodecAdapter.InternalDecoder类

messageReceived()

分两个部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//首先判断当前decoder对象的buffer中是否有可以读取的消息,若有,则合并。并将对象赋值给message局部变量
if (buffer.readable()) {
if (buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;//message获取当前channel的inbound消息
} else {
int size = buffer.readableBytes() + input.readableBytes();
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
size > bufferSize ? size : bufferSize);
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());
}
} else {
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
input.toByteBuffer());
}

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
Object msg;
int saveReaderIndex;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
try {
// decode object.
do {
//保存当前message的读索引,用于后期消息回滚
saveReaderIndex = message.readerIndex();
try {
//DubboCountCodec的decode每次只会解析出一个完整的dubbo协议栈
msg = codec.decode(channel, message);//debug
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
} finally {
if (message.readable()) {
message.discardReadBytes();
buffer = message;
} else {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
}
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}

DubboCountCodec类

decode()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();//保存buffer的读索引
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);//debug
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//数据不足
buffer.readerIndex(save);//将buffer读索引回滚
break;
} else {
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();//save更新
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}

这里暂存了当前buffer的读索引,同样也是为了后面的回滚。可以看到当decode返回的是NEED_MORE_INPUT则表示当前的buffer中数据不足,不能完整解析出一个dubbo协议栈,同时将buffer的读索引回滚到之前暂存的索引并且退出循环,将结果返回。

为什么这里有一个死循环?

接着往下看

ExchangeCodec类

decode()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.
//1:当前buffer的可读长度还没有消息头长,说明当前buffer连协议栈的头都不完整,所以需要继续读取inbound数据
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);

int tt = len + HEADER_LENGTH;
//2:当前buffer包含了完整的消息头,便可以得到payload的长度,发现它的可读的长度,并没有包含整个协议栈的数据,所以也需要继续读取inbound数据
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;//表示数据不足
}

// limit input stream.
//3:如果上面两个情况都不符合,那么说明当前的buffer至少包含一个dubbo协议栈的数据,那么从当前buffer中读取一个dubbo协议栈的数据,解析出一个dubbo数据,当然这里可能读取完一个dubbo数据之后还会有剩余的数据。
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);//debug
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}

针对包含一个以上的dubbo协议栈,当然也会解析出其中一个dubbo协议栈,但是经过ExchangeCodec解析之后,message的readIndex不在message尾部,所以messagereadable方法返回的是true。那么则会继续遍历message,读取下面的信息。最终要么message刚好整数倍包含完整的dubbo协议栈,要不ExchangeCodec返回NEED_MORE_INPUT,最后将未读完的数据缓存到buffer中,等待下次inbound事件,将buffer中的消息合并到下次的inbound消息中,种类又回到了拆包的问题上。

##总结

dubbo在处理tcp的粘包和拆包时是借助InternalDecoderbuffer缓存对象来缓存不完整的dubbo协议栈数据,等待下次inbound事件,合并进去。所以说在dubbo中解决TCP拆包和粘包的时候是通过buffer变量来解决的

上面的方法就解决了粘包了拆包的问题了

若出现粘包,就会根据它的请求长度进行截取;

若出现拆包,数据会不完整,就进入循环重新读取,直到取到完整数据。

接着往里看看

DubboCodec类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
//取第三个字节,代表是双向还是单向
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);//provider发回的序列化ID为2
// get request id.
//取第5到第12个字节代表request id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {//单向
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
Object data;
if (res.isHeartbeat()) {//是否是心跳
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {//是否是事件
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();//debug
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {//双向
...
}

DecodeableRpcResult类

decode()——根据RESPONSE_NULL_VALUE RESPONSE_VALUE RESPONSE_WITH_EXCEPTION进行相应的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);

byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE://值为null
break;
case DubboCodec.RESPONSE_VALUE://有值
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION://异常
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}