Redis事务的灵活应用与异步连接的优化策略

阅读: 评论:0

Redis事务的灵活应用与异步连接的优化策略

Redis事务的灵活应用与异步连接的优化策略

redis异步连接

  • 一、redis 事务命令
  • 二、lua 脚本实现原子性
    • 2.1、EVAL
    • 2.2、EVALSHA
    • 2.3、script load
    • 2.4、应用
    • 2.5、lua脚本的ACID分析
  • 三、redis 事务实现方式
  • 四、redis 发布订阅
  • 五、redis 驱动异步连接
    • 5.1、hiredis库安装
    • 5.2、redis 异步连接
    • 5.3、hridis+libevent实现
  • 总结

一、redis 事务命令

事务是指用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。

(1)MULTI, 开启事务。
事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行。
begin / start transaction。
(2)EXEC,提交事务。
commit。
(3)DISCARD,取消事务。
rollback。
(4)WATCH。
检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现(cas);
若被取消则事务返回 nil 。

二、lua 脚本实现原子性

redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会
有其他命令或者脚本被执行。
lua 脚本当中的命令会直接修改数据状态。

redis lua 虚拟机

lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性。

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令。

2.1、EVAL

# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]

示例:

127.0.0.1:6379> get mark
"100"
127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;" 1 mark
(integer) 200
127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;" 1 not_exist
(integer) 0

2.2、EVALSHA

# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

2.3、script load

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local val = KEYS[1]; return val'
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists
"b8059ba43af6ffe8bed3db65bac35d452f8115d8"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.

使用 script load可以将脚本放入到redis中,redis会返回一个sha1值,redis内部是通过字典方式存放sha1-value,然后使用evalsha执行sha1对应的lua脚本。
示例:

127.0.0.1:6379> script load  "local val=redis.call('get',KEYS[1]);if val then redis.call('set',KEYS[1],2*val); return 2*val; end;return 0;"
"e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c"
127.0.0.1:6379> evalsha e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c 1 mark
(integer) 400
127.0.0.1:6379> evalsha e221b6cd9c6e2a664ed7f4da89f21ca8223e4c7c 1 not_exist
(integer) 0

script load带来的好处之一:只需要执行一次script load,就可以多次调用evalsha复用执行。

使用这种方式的原因是真正使用过程中的lua脚本很长,是远远超过40位的字符串,使用eval不合理,而使用evalsha使数据包小一些,从而使网络带宽小一些,服务器处理压力也小一些。

2.4、应用

  1. 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load)。
  2. 项目中若需要热更新,通过redis-cli script flush;然后可以通过订阅发布功能通知所有服务器重新加载lua脚本。
  3. 若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行。

2.5、lua脚本的ACID分析

(1)原则性。lua脚本具备原则性。lua脚本是通过一个完整的数据包,一个命令发送过去的;作为一个完整的数据包执行,因为redis是单线程,只有这个数据包执行完才会执行其他的数据包,所以不会被其他连接干扰。
(2)一致性。lua脚本不具备一致性。lua脚本中存在多条语句,如果有一部分执行成功,有一条语句执行失败时,成功执行的语句是不会回滚的。此时,它不满足全部都不执行或全部都执行,也就不满足不可分割的工作单元。
(3)隔离性。lua脚本满足隔离性,因为redis是单线程的,而lua脚本又是一个完整的数据包。天然具备隔离性。
(4)持久性。redis只有在aof持久化策略的时候,并且每写入一个数据都要进行写盘操作,才满足持久性。

三、redis 事务实现方式

(1)乐观锁实现,watch+multi+exec,所以失败需要重试,增加业务逻辑的复杂度。
缺点就是写代码的时候不方便,需要通过watch来保证事务的正确性;watch过程中可能会取消事务,失败需要进行重试,所有业务逻辑比较麻烦。
(2)lua脚本实现。script load+evalsha。网络带宽小一些,服务器处理压力也小一些,并且可以多次调用evalsha复用执行。

四、redis 发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块。
消息不一定可达;分布式消息队列; stream 的方式确保一定可达。

# 订阅频道
subscribe 频道
# 订阅模式频道
psubscribe 频道
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容

示例:

subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'hello redis'

五、redis 驱动异步连接

后端通常都是采用的reactor网络模型,redis驱动是指server端的驱动,在server程序构建一个模块,可以和redis交互数据(即server发送的协议redis能识别并处理,redis返回的数据驱动模块能够识别并开展业务逻辑)。

redis 驱动就是把redis连接融合reactor进行管理。

redis 驱动 把redis连接融合reactor进行管理 构建事件对象 hiredis 事件对象 reactor 事件对象 适配事件控制,复用项目中的reactor的事件循环 hiredis封装规则 io由hiredis自己来做 提供了事件操作的接口,我们只需要适配这些事件操作的接口 不同网络库,不同平台,对事件操作的接口不一致 读数据 分割数据 ......

异步连接处理逻辑需要提供函数来接收返回。
redis协议图:

简单字符串回复 error回复 整数回复 二进制安全字符串回复 nil不存在 批量回复 空数组 超时 Client * 参数数量 r n $ 参数字节数 r n Server + 状态描述字符串 r n - 错误类型字符串 空格 错误描述字符串 : 整数字符串 $ 回复数据字节数 r n 回复数据 -1 * 0 -1 回复数量 r n


协议实现的第一步需要知道如何界定数据包:

  1. 长度 + 二进制流
  2. 二进制流 + 特殊分隔符

5.1、hiredis库安装

git clone .git -b 6.2
cd redis/deps/hiredis
mkdir build
cd build
cmake ..
make
sudo make install

程序代码编译的时候加上 -lhiredis

5.2、redis 异步连接

同步连接方案采用阻塞 io 来实现;优点是代码书写是同步的,业务逻辑没有割裂。缺点是阻塞当前线程,直至 redis 返回结
果;通常用多个线程来实现线程池来解决效率问题。

异步连接方案采用非阻塞 io 来实现。优点是没有阻塞当前线程,redis 没有返回,依然可以往 redis 发送命令。缺点是代码
书写是异步的(回调函数),业务逻辑割裂,可以通过协程解决(openresty,skynet);配合 redis6.0 以后的 io 多线程(前
提是有大量并发请求),异步连接池,能更好解决应用层的数据访问性能。

5.3、hridis+libevent实现

libevent.h


#ifndef __HIREDIS_LIBEVENT_H__
#define __HIREDIS_LIBEVENT_H__
#include <event2/event.h>
#include "hiredis.h"
#include "async.h"#define REDIS_LIBEVENT_DELETED 0x01
#define REDIS_LIBEVENT_ENTERED 0x02typedef struct redisLibeventEvents {redisAsyncContext *context;struct event *ev;struct event_base *base;struct timeval tv;short flags;short state;
} redisLibeventEvents;static void redisLibeventDestroy(redisLibeventEvents *e) {hi_free(e);
}static void redisLibeventHandler(int fd, short event, void *arg) {((void)fd);redisLibeventEvents *e = (redisLibeventEvents*)arg;e->state |= REDIS_LIBEVENT_ENTERED;#define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {redisLibeventDestroy(e);return; }if ((event & EV_TIMEOUT) && (e->state & REDIS_LIBEVENT_DELETED) == 0) {redisAsyncHandleTimeout(e->context);CHECK_DELETED();}if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {redisAsyncHandleRead(e->context);CHECK_DELETED();}if ((event & EV_WRITE) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {redisAsyncHandleWrite(e->context);CHECK_DELETED();}e->state &= ~REDIS_LIBEVENT_ENTERED;#undef CHECK_DELETED
}static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {redisLibeventEvents *e = (redisLibeventEvents *)privdata;const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;if (isRemove) {if ((e->flags & flag) == 0) {return;} else {e->flags &= ~flag;}} else {if (e->flags & flag) {return;} else {e->flags |= flag;}}event_del(e->ev);event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST,redisLibeventHandler, privdata);event_add(e->ev, tv);
}static void redisLibeventAddRead(void *privdata) {redisLibeventUpdate(privdata, EV_READ, 0);
}static void redisLibeventDelRead(void *privdata) {redisLibeventUpdate(privdata, EV_READ, 1);
}static void redisLibeventAddWrite(void *privdata) {redisLibeventUpdate(privdata, EV_WRITE, 0);
}static void redisLibeventDelWrite(void *privdata) {redisLibeventUpdate(privdata, EV_WRITE, 1);
}static void redisLibeventCleanup(void *privdata) {redisLibeventEvents *e = (redisLibeventEvents*)privdata;if (!e) {return;}event_del(e->ev);event_free(e->ev);e->ev = NULL;if (e->state & REDIS_LIBEVENT_ENTERED) {e->state |= REDIS_LIBEVENT_DELETED;} else {redisLibeventDestroy(e);}
}static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {redisLibeventEvents *e = (redisLibeventEvents *)privdata;short flags = e->flags;e->flags = 0;e->tv = tv;redisLibeventUpdate(e, flags, 0);
}static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {redisContext *c = &(ac->c);redisLibeventEvents *e;/* Nothing should be attached when something is already attached */if (ac->ev.data != NULL)return REDIS_ERR;/* Create container for context and r/w events */e = (redisLibeventEvents*)hi_calloc(1, sizeof(*e));if (e == NULL)return REDIS_ERR;e->context = ac;/* Register functions to start/stop listening for events */ac->ev.addRead = redisLibeventAddRead;ac->ev.delRead = redisLibeventDelRead;ac->ev.addWrite = redisLibeventAddWrite;ac->ev.delWrite = redisLibeventDelWrite;ac->ev.cleanup = redisLibeventCleanup;ac->ev.scheduleTimer = redisLibeventSetTimeout;ac->ev.data = e;/* Initialize and install read/write events */e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);e->base = base;return REDIS_OK;
}
#endif

main.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>#include <hiredis.h>
#include <async.h>
#include <adapters/libevent.h>void getCallback(redisAsyncContext *c, void *r, void *privdata) {redisReply *reply = r;if (reply == NULL) {if (c->errstr) {printf("errstr: %sn", c->errstr);}return;}printf("argv[%s]: %sn", (char*)privdata, reply->str);/* Disconnect after receiving the reply to GET */redisAsyncDisconnect(c);
}void connectCallback(const redisAsyncContext *c, int status) {if (status != REDIS_OK) {printf("Error: %sn", c->errstr);return;}printf(&#n");
}void disconnectCallback(const redisAsyncContext *c, int status) {if (status != REDIS_OK) {printf("Error: %sn", c->errstr);return;}printf(&#n");
}int main (int argc, char **argv) {
#ifndef _WIN32signal(SIGPIPE, SIG_IGN);
#endifstruct event_base *base = event_base_new();redisOptions options = {0};REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);struct timeval tv = {0};tv.tv_sec = t_timeout = &tv;redisAsyncContext *c = redisAsyncConnectWithOptions(&options);if (c->err) {/* Let *c leak  */printf("Error: %sn", c->errstr);return 1;}redisLibeventAttach(c,base);redisAsyncSetConnectCallback(c,connectCallback);redisAsyncSetDisconnectCallback(c,disconnectCallback);redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");event_base_dispatch(base);return 0;
}

总结

  1. 事务如果是乐观锁实现,即watch+multi+exec,失败需要重试,会增加业务逻辑的复杂度。
  2. lua脚本满足原子性和隔离性,但不满足一致性和持久性。redis只有在aof持久化策略的时候,并且每写入一个数据都要进行写盘操作,才满足持久性。
  3. redis 同步连接方案采用阻塞 io 来实现,异步连接方案采用非阻塞 io 来实现。

本文发布于:2024-02-04 13:31:18,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170708383256009.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:灵活   策略   事务   Redis
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23