最近学习Netty的时候想做一个基于redis服务协议的编码解码模块,过程中顺便阅读了Redis服务序列化协议RESP,结合自己的理解对文档进行了翻译并且简单实现了RESP基于JAVA语言的解析。编写本文的使用使用的JDK版本为[8+]。
Redis客户端与Redis服务端基于一个称作RESP的协议进行通信,RESP全称为Redis Serialization Protocol,也就是Redis序列化协议。虽然RESP为Redis设计,但是它也可以应用在其他客户端-服务端(Client-Server)的软件项目中。RESP在设计的时候折中考虑了如下几点:
RESP可以序列化不同的数据类型,如整型、字符串、数组还有一种特殊的Error类型。需要执行的Redis命令会封装为类似于字符串数组的请求然后通过Redis客户端发送到Redis服务端。Redis服务端会基于特定的命令类型选择对应的一种数据类型进行回复(这一句是意译,原文是:Redis replies with a command-specific data type)。
RESP是二进制安全的(binary-safe),并且在RESP下不需要处理从一个进程传输到另一个进程的批量数据,因为它使用了前缀长度(prefixed-length,后面会分析,就是在每个数据块的前缀已经定义好数据块的个数,类似于Netty里面的定长编码解码)来传输批量数据。
注意:此处概述的协议仅仅使用在客户端-服务端通信,Redis Cluster使用不同的二进制协议在多个节点之间交换消息(也就是Redis集群中的节点之间并不使用RESP通信)。
网络层
Redis客户端通过创建一个在6379端口的TCP连接,连接到Redis服务端。
虽然RESP在底层通信协议技术上是非TCP特定的,但在Redis的上下文中,RESP仅用于TCP连接(或类似的面向流的连接,如Unix套接字)。
请求-响应模型
Redis服务端接收由不同参数组成的命令,接收到命令并将其处理之后会把回复发送回Redis客户端。这是最简单的模型,但是有两种例外的情况:
除了上述两个特例之外,Redis协议是一种简单的请求-响应协议。
RESP在Redis 1.2中引入,在Redis 2.0,RESP正式成为与Redis服务端通信的标准方案。也就是如果需要编写Redis客户端,你就必须在客户端中实现此协议。
RESP本质上是一种序列化协议,它支持的数据类型如下:单行字符串、错误消息、整型数字、定长字符串和RESP数组。
RESP在Redis中用作请求-响应协议的方式如下:
在RESP中,数据类型取决于数据报的第一个字节:
另外,在RESP中可以使用定长字符串或者数组的特殊变体来表示Null值,后面会提及。在RESP中,协议的不同部分始终以rn(CRLF)终止。
目前RESP中5种数据类型的小结如下:
数据类型 本文翻译名称 基本特征 例子 Simple String 单行字符串 第一个字节是+,最后两个字节是rn,其他字节是字符串内容 +OKrn Error 错误消息 第一个字节是-,最后两个字节是rn,其他字节是异常消息的文本内容 -ERRrn Integer 整型数字 第一个字节是:,最后两个字节是rn,其他字节是数字的文本内容 :100rn Bulk String 定长字符串 第一个字节是$,紧接着的字节是内容字符串长度rn,最后两个字节是rn,其他字节是字符串内容 $4rndogern Array RESP数组 第一个字节是*,紧接着的字节是元素个数rn,最后两个字节是rn,其他字节是各个元素的内容,每个元素可以是任意一种数据类型 *2rn:100rn$4rndogern 下面的小节是对每种数据类型的更细致的分析。
RESP简单字符串-Simple String
简单字符串的编码方式如下:
简单字符串能够保证在最小开销的前提下传输非二进制安全的字符串。例如很多Redis命令执行成功后服务端需要回复OK字符串,此时通过简单字符串编码为5字节的数据报如下:
+OKrn 复制代码
如果需要发送二进制安全的字符串,那么需要使用定长字符串。
当Redis服务端用简单字符串响应时,Redis客户端库应该向调用者返回一个字符串,该响应到调用者的字符串由+之后直到字符串内容末尾的字符组成(其实就是上面提到的第(2)部分的内容),不包括最后的CRLF字节。
RESP错误消息-Error
错误消息类型是RESP特定的数据类型。实际上,错误消息类型和简单字符串类型基本一致,只是其第一个字节为-。错误消息类型跟简单字符串类型的最大区别是:错误消息作为Redis服务端响应的时候,对于客户端而言应该感知为异常,而错误消息中的字符串内容应该感知为Redis服务端返回的错误信息。错误消息的编码方式如下:
一个简单的例子如下:
-Error messagern 复制代码
Redis服务端只有在真正发生错误或者感知错误的时候才会回复错误消息,例如尝试对错误的数据类型执行操作或者命令不存在等等。Redis客户端接收到错误消息的时候,应该触发异常(一般情况就是直接抛出异常,可以根据错误消息的内容进行异常分类)。下面是错误消息响应的一些例子:
-ERR unknown command 'foobar' -WRONGTYPE Operation against a key holding the wrong kind of value 复制代码
-之后的第一个单词到第一个空格或换行符之间的内容,代表返回的错误类型。这只是Redis使用的约定,不是RESP错误消息格式的一部分。
例如,ERR是通用错误,WRONGTYPE则是更具体的错误,表示客户端试图针对错误的数据类型执行操作。这种定义方式称为错误前缀,是一种使客户端能够理解服务器返回的错误类型的方法,而不必依赖于所给出的确切消息定义,该消息可能会随时间而变化。
客户端实现可以针对不同的错误类型返回不同种类的异常,或者可以通过将错误类型的名称作为字符串直接提供给调用方来提供捕获错误的通用方法。
但是,不应该将错误消息分类处理的功能视为至关重要的功能,因为它作用并不巨大,并且有些的客户端实现可能会简单地返回特定值去屏蔽错误消息作为通用的异常处理,例如直接返回false。
RESP整型数字-Integer
整型数字的编码方式如下:
例如:
:0rn :1000rn 复制代码
许多Redis命令返回整型数字,像INCR,LLEN和LASTSAVE命令等等。
返回的整型数字没有特殊的含义,像INCR返回的是增量的总量,而LASTSAVE是UNIX时间戳。但是Redis服务端保证返回的整型数字在带符号的64位整数范围内。
有些情况下,返回的整型数字会指代true或者false。如EXISTS或者SISMEMBER命令执行返回1代表true,0代表false。
有些情况下,返回的整型数字会指代命令是否真正产生了效果。如SADD,SREM和SETNX命令执行返回1代表命令执行生效,0代表命令执行不生效(等价于命令没有执行)。
下面的一组命令执行后都是返回整型数字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD。
RESP定长字符串-Bulk String
定长字符串用于表示一个最大长度为512MB的二进制安全的字符串(Bulk,本身有体积大的含义)。定长字符串的编码方式如下:
举个例子,doge使用定长字符串编码如下:
第一个字节 前缀长度 CRLF 字符串内容 CRLF 定长字符串 $ 4 rn doge rn ===> $4rndogern foobar使用定长字符串编码如下:
第一个字节 前缀长度 CRLF 字符串内容 CRLF 定长字符串 $ 6 rn foobar rn ===> $6rnfoobarrn 表示空字符串(Empty String,对应Java中的"") 的时候,使用定长字符串编码如下:
第一个字节 前缀长度 CRLF CRLF 定长字符串 $ 0 rn rn ===> $0rnrn 定长字符串也可以使用特殊的格式来表示Null值,指代值不存在。在这种特殊格式中,前缀长度为-1,并且没有数据,因此使用定长字符串对Null值进行编码如下:
第一个字节 前缀长度 CRLF 定长字符串 $ -1 rn ===> $-1rn 当Redis服务端返回定长字符串编码的Null值的时候,客户端不应该返回空字符串,而应该返回对应编程语言中的Null对象。例如Ruby中对应nil,C语言中对应NULL,Java中对应null,以此类推。
RESP数组-Array
Redis客户端使用RESP数组发送命令到Redis服务端。与此相似,某些Redis命令执行完毕后服务端需要使用RESP数组类型将元素集合返回给客户端,如返回一个元素列表的LRANGE命令。RESP数组和我们认知中的数组并不完全一致,它的编码格式如下:
一个空的RESP数组的编码如下:
*0rn 复制代码
一个包含2个定长字符串元素内容分别为foo和bar的RESP数组的编码如下:
*2rn$3rnfoorn$3rnbarrn 复制代码
通用格式就是:*<count>CRLF作为RESP数组的前缀部分,而组成RESP数组的其他数据类型的元素只是一个接一个地串联在一起。例如一个包含3个整数类型元素的RESP数组的编码如下:
*3rn:1rn:2rn:3rn 复制代码
RESP数组的元素不一定是同一种数据类型,可以包含混合类型的元素。例如下面是一个包含4个整数类型元素和1个定长字符串类型元素(一共有5个元素)的RESP数组的编码(为了看得更清楚,分多行进行编码,实际上不能这样做):
# 元素个数 *5rn # 第1个整型类型的元素 :1rn # 第2个整型类型的元素 :2rn # 第3个整型类型的元素 :3rn # 第4个整型类型的元素 :4rn # 定长字符串类型的元素 $6rn foobarrn 复制代码
Redis服务端响应报的首行*5rn定义了后面会紧跟着5个回复数据,然后每个回复数据分别作元素项,构成了用于传输的多元素定长回复(Multi Bulk Reply,感觉比较难翻译,这里的大概意思就是每个回复行都是整个回复报中的一个项)。
这里可以类比为Java中的ArrayList(泛型擦除),有点类似于下面的伪代码:
List encode = new ArrayList(); // 添加元素个数 encode.add(elementCount); encode.add(CRLF); // 添加第1个整型类型的元素 - 1 encode.add(':'); encode.add(1); encode.add(CRLF); // 添加第2个整型类型的元素 - 2 encode.add(':'); encode.add(2); encode.add(CRLF); // 添加第3个整型类型的元素 - 3 encode.add(':'); encode.add(3); encode.add(CRLF); // 添加第4个整型类型的元素 - 4 encode.add(':'); encode.add(4); encode.add(CRLF); // 添加定长字符串类型的元素 encode.add('$'); // 前缀长度 encode.add(6); // 字符串内容 encode.add("foobar"); encode.add(CRLF); 复制代码
RESP数组中也存在Null值的概念,下面称为RESP Null Array。处于历史原因,RESP数组中采用了另一种特殊的编码格式定义Null值,区别于定长字符串中的Null值字符串。例如,BLPOP命令执行超时的时候,就会返回一个RESP Null Array类型的响应。RESP Null Array的编码如下:
*-1rn 复制代码
当Redis服务端的回复是RESP Null Array类型的时候,客户端应该返回一个Null对象,而不是一个空数组或者空列表。这一点比较重要,它是区分回复是空数组(也就是命令正确执行完毕,返回结果正常)或者其他原因(如BLPOP命令的超时等)的关键。
RESP数组的元素也可以是RESP数组,下面是一个包含2个RESP数组类型的元素的RESP数组,编码如下(为了看得更清楚,分多行进行编码,实际上不能这样做):
# 元素个数 *2rn # 第1个RESP数组元素 *3rn :1rn :2rn :3rn # 第2个RESP数组元素 *2rn +Foorn -Barrn 复制代码
上面的RESP数组的包含2个RESP数组类型的元素,第1个RESP数组元素包含3个整型类型的元素,而第2个RESP数组元素包含1个简单字符串类型的元素和1个错误消息类型的元素。
RESP数组中的Null元素
RESP数组中的单个元素也有Null值的概念,下面称为Null元素。Redis服务端回复如果是RESP数组类型,并且RESP数组中存在Null元素,那么意味着元素丢失,绝对不能用空字符串替代。缺少指定键的前提下,当与GET模式选项一起使用时,SORT命令可能会发生这种情况。
下面是一个包含Null元素的RESP数组的例子(为了看得更清楚,分多行进行编码,实际上不能这样做):
*3rn $3rn foorn $-1rn $3rn barrn 复制代码
RESP数组中的第2个元素是Null元素,客户端API最终返回的内容应该是:
# Ruby ["foo",nil,"bar"] # Java ["foo",null,"bar"] 复制代码
主要包括:
其实文档中还有一节使用C语言编写高性能RESP解析器,这里不做翻译,因为掌握RESP的相关内容后,可以基于任何语言编写解析器。
将命令发送到Redis服务端
如果已经相对熟悉RESP中的序列化格式,那么编写Redis客户端类库就会变得很容易。我们可以进一步指定客户端和服务器之间的交互方式:
下面是典型的交互例子:Redis客户端发送命令LLEN mylist以获得KEY为mylist的长度,Redis服务端将以整数类型进行回复,如以下示例所示(C是客户端,S服务器),伪代码如下:
C: *2rn C: $4rn C: LLENrn C: $6rn C: mylistrn S: :48293rn 复制代码
为了简单起见,我们使用换行符来分隔协议的不同部分(这里指上面的代码分行展示),但是实际交互的时候Redis客户端在发送*2rn$4rnLLENrn$6rnmylistrn的时候是整体发送的。
批量命令与管道
Redis客户端可以使用相同的连接发送批量命令。Redis支持管道特性,因此Redis客户端可以通过一次写操作发送多个命令,而无需在发送下一个命令之前读取Redis服务端对上一个命令的回复。批量发送命令之后,所有的回复可以在最后得到(合并为一个回复)。更多相关信息可以查看Using pipelining to speedup Redis queries。
内联命令
有些场景下,我们可能只有telnet命令可以使用,在这种条件下,我们需要发送命令到Redis服务端。尽管Redis协议易于实现,但在交互式会话中并不理想,并且redis-cli有些情况下不一定可用。处于这类原因,Redis设计了一种专为人类设计的命令格式,称为内联命令(Inline Command格式。
以下是服务器/客户端使用内联命令进行聊天的示例(S代表服务端,C代表客户端):
C: PING S: +PONG 复制代码
以下是使用内联命令返回整数的另一个示例:
C: EXISTS somekey S: :0 复制代码
基本上只需在telnet会话中编写以空格分隔的参数。由于除了统一的请求协议之外没有命令会以*开头,Redis能够检测到这种情况并解析输入的命令。
因为JDK原生提供的字节缓冲区java.nio.ByteBuffer存在不能自动扩容、需要切换读写模式等等问题,这里直接引入Netty并且使用Netty提供的ByteBuf进行RESP数据类型解析。编写本文的时候(2019-10-09)Netty的最新版本为4.1.42.Final。引入依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> <version>4.1.42.Final</version> </dependency> 复制代码
定义解码器接口:
public interface RespDecoder<V>{ V decode(ByteBuf buffer); } 复制代码
定义常量:
public class RespConstants { public static final Charset ASCII = StandardCharsets.US_ASCII; public static final Charset UTF_8 = StandardCharsets.UTF_8; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; public static final byte PLUS_BYTE = '+'; public static final byte MINUS_BYTE = '-'; public static final byte COLON_BYTE = ':'; public static final String EMPTY_STRING = ""; public static final Long ZERO = 0L; public static final Long NEGATIVE_ONE = -1L; public static final byte CR = (byte) 'r'; public static final byte LF = (byte) 'n'; public static final byte[] CRLF = "rn".getBytes(ASCII); public enum ReplyType { SIMPLE_STRING, ERROR, INTEGER, BULK_STRING, RESP_ARRAY } } 复制代码
下面的章节中解析模块的实现已经忽略第一个字节的解析,因为第一个字节是决定具体的数据类型。
解析简单字符串
简单字符串类型就是单行字符串,它的解析结果对应的就是Java中的String类型。解码器实现如下:
// 解析单行字符串 public class LineStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { return CodecUtils.X.readLine(buffer); } } public enum CodecUtils { X; public int findLineEndIndex(ByteBuf buffer) { int index = buffer.forEachByte(ByteProcessor.FIND_LF); return (index > 0 && buffer.getByte(index - 1) == 'r') ? index : -1; } public String readLine(ByteBuf buffer) { int lineEndIndex = findLineEndIndex(buffer); if (lineEndIndex > -1) { int lineStartIndex = buffer.readerIndex(); // 计算字节长度 int size = lineEndIndex - lineStartIndex - 1; byte[] bytes = new byte[size]; buffer.readBytes(bytes); // 重置读游标为rn之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); buffer.markReaderIndex(); return new String(bytes, RespConstants.UTF_8); } return null; } } public class RespSimpleStringDecoder extends LineStringDecoder { } 复制代码
这里抽取出一个类LineStringDecoder用于解析单行字符串,这样在解析错误消息的时候可以做一次继承即可。测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // +OKrn buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:OK 复制代码
解析错误消息
错误消息的本质也是单行字符串,所以其解码的实现可以和简单字符串的解码实现一致。错误消息数据类型的解码器如下:
public class RespErrorDecoder extends LineStringDecoder { } 复制代码
测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // -ERR unknown command 'foobar'rn buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:ERR unknown command 'foobar' 复制代码
解析整型数字
整型数字类型,本质就是需要从字节序列中还原出带符号的64bit的长整型,因为是带符号的,类型标识位:后的第一个字节需要判断是否负数字符-,因为是从左向右解析,然后每解析出一个新的位,当前的数字值要乘10。其解码器的实现如下:
public class RespIntegerDecoder implements RespDecoder<Long> { @Override public Long decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); // 没有行尾,异常 if (-1 == lineEndIndex) { return null; } long result = 0L; int lineStartIndex = buffer.readerIndex(); boolean negative = false; byte firstByte = buffer.getByte(lineStartIndex); // 负数 if (RespConstants.MINUS_BYTE == firstByte) { negative = true; } else { int digit = firstByte - '0'; result = result * 10 + digit; } for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) { byte value = buffer.getByte(i); int digit = value - '0'; result = result * 10 + digit; } if (negative) { result = -result; } // 重置读游标为rn之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); return result; } } 复制代码
整型数字类型的解析相对复杂,一定要注意负数判断。测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // :-1000rn buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); Long value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:-1000 复制代码
解析定长字符串
定长字符串类型解析的关键是先读取类型标识符$后的第一个字节序列分块解析成64bit带符号的整数,用来确定后面需要解析的字符串内容的字节长度,然后再按照该长度读取后面的字节。其解码器实现如下:
public class RespBulkStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 使用RespIntegerDecoder读取长度 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Bulk Null String if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Bulk Empty String if (RespConstants.ZERO.equals(length)) { return RespConstants.EMPTY_STRING; } // 真实字节内容的长度 int readLength = (int) length.longValue(); if (buffer.readableBytes() > readLength) { byte[] bytes = new byte[readLength]; buffer.readBytes(bytes); // 重置读游标为rn之后的第一个字节 buffer.readerIndex(buffer.readerIndex() + 2); return new String(bytes, RespConstants.UTF_8); } return null; } } 复制代码
测试一下:
public static void main(String[] args) throws Exception{ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // $6rnthrowablern buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes("$9".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:throwable 复制代码
解析RESP数组
RESP数组类型解析的关键:
参考过不少Redis协议解析框架,不少是用栈或者状态机实现,这里先简单点用递归实现,解码器代码如下:
public class RespArrayDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 解析元素个数 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Null Array if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Array Empty List if (RespConstants.ZERO.equals(length)) { return Lists.newArrayList(); } List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue()); // 递归 for (int i = 0; i < length; i++) { result.add(DefaultRespCodec.X.decode(buffer)); } return result; } } 复制代码
测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //*2rn$3rnfoorn$3rnbarrn buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes("*2".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("$3".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("foo".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("$3".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("bar".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); List value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:[foo, bar] 复制代码
对RESP的内容和其编码解码的过程有相对深刻的认识后,就可以基于Netty编写Redis服务的编码解码模块,作为Netty入门的十分有意义的例子。本文的最后一节只演示了RESP的解码部分,编码模块和更多细节会在另一篇用Netty实现Redis客户端的文章中展示。
参考资料:
链接
希望你能读到这里,然后发现我:
附录
本文涉及的所有代码:
public class RespConstants { public static final Charset ASCII = StandardCharsets.US_ASCII; public static final Charset UTF_8 = StandardCharsets.UTF_8; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; public static final byte PLUS_BYTE = '+'; public static final byte MINUS_BYTE = '-'; public static final byte COLON_BYTE = ':'; public static final String EMPTY_STRING = ""; public static final Long ZERO = 0L; public static final Long NEGATIVE_ONE = -1L; public static final byte CR = (byte) 'r'; public static final byte LF = (byte) 'n'; public static final byte[] CRLF = "rn".getBytes(ASCII); public enum ReplyType { SIMPLE_STRING, ERROR, INTEGER, BULK_STRING, RESP_ARRAY } } public enum CodecUtils { X; public int findLineEndIndex(ByteBuf buffer) { int index = buffer.forEachByte(ByteProcessor.FIND_LF); return (index > 0 && buffer.getByte(index - 1) == 'r') ? index : -1; } public String readLine(ByteBuf buffer) { int lineEndIndex = findLineEndIndex(buffer); if (lineEndIndex > -1) { int lineStartIndex = buffer.readerIndex(); // 计算字节长度 int size = lineEndIndex - lineStartIndex - 1; byte[] bytes = new byte[size]; buffer.readBytes(bytes); // 重置读游标为rn之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); buffer.markReaderIndex(); return new String(bytes, RespConstants.UTF_8); } return null; } } public interface RespCodec { RespCodec X = DefaultRespCodec.X; <IN, OUT> OUT decode(ByteBuf buffer); <IN, OUT> ByteBuf encode(IN in); } public enum DefaultRespCodec implements RespCodec { X; static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap(); private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder(); static { DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder()); DECODERS.put(ReplyType.ERROR, new RespErrorDecoder()); DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder()); DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder()); DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder()); } @SuppressWarnings("unchecked") @Override public <IN, OUT> OUT decode(ByteBuf buffer) { return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer); } private ReplyType determineReplyType(ByteBuf buffer) { byte firstByte = buffer.readByte(); ReplyType replyType; switch (firstByte) { case RespConstants.PLUS_BYTE: replyType = ReplyType.SIMPLE_STRING; break; case RespConstants.MINUS_BYTE: replyType = ReplyType.ERROR; break; case RespConstants.COLON_BYTE: replyType = ReplyType.INTEGER; break; case RespConstants.DOLLAR_BYTE: replyType = ReplyType.BULK_STRING; break; case RespConstants.ASTERISK_BYTE: replyType = ReplyType.RESP_ARRAY; break; default: { throw new IllegalArgumentException("first byte:" + firstByte); } } return replyType; } @Override public <IN, OUT> ByteBuf encode(IN in) { // TODO throw new UnsupportedOperationException("encode"); } } public interface RespDecoder<V> { V decode(ByteBuf buffer); } public class DefaultRespDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { throw new IllegalStateException("decoder"); } } public class LineStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { return CodecUtils.X.readLine(buffer); } } public class RespSimpleStringDecoder extends LineStringDecoder { } public class RespErrorDecoder extends LineStringDecoder { } public class RespIntegerDecoder implements RespDecoder<Long> { @Override public Long decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); // 没有行尾,异常 if (-1 == lineEndIndex) { return null; } long result = 0L; int lineStartIndex = buffer.readerIndex(); boolean negative = false; byte firstByte = buffer.getByte(lineStartIndex); // 负数 if (RespConstants.MINUS_BYTE == firstByte) { negative = true; } else { int digit = firstByte - '0'; result = result * 10 + digit; } for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) { byte value = buffer.getByte(i); int digit = value - '0'; result = result * 10 + digit; } if (negative) { result = -result; } // 重置读游标为rn之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); return result; } } public class RespBulkStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Bulk Null String if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Bulk Empty String if (RespConstants.ZERO.equals(length)) { return RespConstants.EMPTY_STRING; } // 真实字节内容的长度 int readLength = (int) length.longValue(); if (buffer.readableBytes() > readLength) { byte[] bytes = new byte[readLength]; buffer.readBytes(bytes); // 重置读游标为rn之后的第一个字节 buffer.readerIndex(buffer.readerIndex() + 2); return new String(bytes, RespConstants.UTF_8); } return null; } } public class RespArrayDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 解析元素个数 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Null Array if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Array Empty List if (RespConstants.ZERO.equals(length)) { return Lists.newArrayList(); } List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue()); // 递归 for (int i = 0; i < length; i++) { result.add(DefaultRespCodec.X.decode(buffer)); } return result; } }
作者:Throwable
链接:https://juejin.im/post/5d9dec2b6fb9a04e0a37edd5