dubbo系列之网络通信编码解码-provider请求和响应编码

dubbo系列之网络通信编码解码-provider请求和响应编码

简要

这篇博客分两个部分:

1:provider 请求编码

2:provider响应结果编码

源码解析

在provider收到consumer编码后的数据时,首先要开始解码,下面先看一下provider是如何解码的。

provider请求编码

NettyCodecAdapter.InternalDecoder类

messageReceived()
1
msg = codec.decode(channel, message);

ExchangeCodec类

decode()
1
2
3
4
5
6
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();//消息体长度
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];//消息头
buffer.readBytes(header);//读消息体的16个字节
return decode(channel, buffer, readable, header);
}

取出在consumer请求编码阶段传入的消息体长度和消息头。

decode()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
// 检查前两个字节是否是指定魔数
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// get data length.
//取出消息体总长度:第十三个字节到第十六个字节
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);

int tt = len + HEADER_LENGTH;
if (readable < tt) {//对消息体进行检查
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);//解码,debug
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
decodeBody()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
//取第三个字节,代表是双向还是单向
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.
//取第5到第12个字节代表request id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {//单向
...
} else {//双向
// decode request.
//还原这个request
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;//data赋值
}
req.setData(data);
} catch (Throwable t) {
...
}
return req;
}
}

data的值

data

这样,整个provider解码的过程就结束了

provider响应结果编码

所谓响应结果编码,也就是对DemoServiceImpl类中return语句的编码。

1
return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();

NettyCodecAdapter.InternalEncoder类

encode()
1
codec.encode(channel, buffer, msg);

ExchangeCodec类

encode()
1
2
3
else if (msg instanceof Response) {//响应
encodeResponse(channel, buffer, (Response) msg);
}
encodeResponse()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
try {
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = serialization.getContentTypeId();//heeder第二位代表序列化ID
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
// set response status.
byte status = res.getStatus();
header[3] = status;//第四个字节为20,代表"ok"
// set request id.
Bytes.long2bytes(res.getId(), header, 4);

int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeHeartbeatData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult());//debug
}
} else out.writeUTF(res.getErrorMessage());
out.flushBuffer();
bos.flush();
bos.close();

int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);//写入消息头和消息体总长度
} catch (Throwable t) {
// 发送失败信息给Consumer,否则Consumer只能等超时了
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
try {
// FIXME 在Codec中打印出错日志?在IoHanndler的caught中统一处理?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);

Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);

return;
} ...
}
...
}
}
encodeResponseData()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
Result result = (Result) data;

Throwable th = result.getException();
if (th == null) {
Object ret = result.getValue();
if (ret == null) {//为空
out.writeByte(RESPONSE_NULL_VALUE);//写入状态:2
} else {//有值
out.writeByte(RESPONSE_VALUE);//写入状态:1
out.writeObject(ret);//写入值
}
} else {//异常
out.writeByte(RESPONSE_WITH_EXCEPTION);//写入状态:3
out.writeObject(th);
}
}

我们可以看到,虽然provider和consumer的消息体还是有一些区别的

总结一下

dubbo的消息头是一个定长的 16个字节。
第1-2个字节:是一个魔数数字:就是一个固定的数字
第3个字节:序列号组件类型,它用于和客户端约定的序列号编码号
第四个字节:它是response的结果响应码 例如 OK=20
第5-12个字节:请求id:long型8个字节。异步变同步的全局唯一ID,用来做consumer和provider的来回通信标记。
第13-16个字节:消息体的长度,也就是消息头+请求数据的长度。