MYDB提供数据库崩溃后的恢复功能,DM 模块在每次对底层数据操作时,都会记录一条日志到磁盘上。在数据库崩溃之后(即第一页校验不对时),再次启动时,可以根据日志的内容,恢复数据文件,保证其一致性。
日志是一个自定义的 log 格式的二进制文件,按照如下的格式进行排布:
[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]
其中, XChecksum 是一个四字节的整数,是对后面所有日志计算的校验和(注意不包括BadTail)。Log1 ~ LogN 是常规的日志数据,BadTail 是在数据库崩溃时,没有来得及写完的日志数据,这个 BadTail 不一定存在。
每条日志 [LogN] 的格式如下:
[Size][Checksum][Data]
其中,Size 是一个四字节整数,标识了 Data 段的字节数。Checksum 则是该条日志的校验和。
单条日志的校验和,其实就是通过一个指定的种子实现的,对所有日志求出校验和,求和就能得到日志文件的校验和了。
private int calChecksum(int xCheck, byte[] log) {for (byte b : log) {xCheck = xCheck * SEED + b;}return xCheck;}
Logger 被实现成迭代器模式,通过 next() 方法,不断地从文件中读取下一条日志,并将其中的 Data 解析出来并返回。 next() 方法的实现主要依靠 internNext() 。
public byte[] next() {lock.lock();try {byte[] log = internNext();if(log == null) return null;/// 返回这条[Log]的[Data]pyOfRange(log, OF_DATA, log.length);} finally {lock.unlock();}}
private byte[] internNext() {/// position是当前日志文件中的指针位置,表示[LogN]的起始,OF_DATA为8,表示一个[LogN]中,[Data]的起始if(position + OF_DATA >= fileSize) {return null;}ByteBuffer tmp = ByteBuffer.allocate(4);try {fc.position(position);fc.read(tmp); /// 将[Size]读到tmp中} catch(IOException e) {Panic.panic(e);}int size = Parser.parseInt(tmp.array());/// size + OF_DATA 是当前position所指[Log]的长度if(position + size + OF_DATA > fileSize) {return null;}ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);try {fc.position(position);fc.read(buf); /// 将[LogN]读到buf中} catch(IOException e) {Panic.panic(e);}byte[] log = buf.array();/// 根据一条日志的[Data]计算出其校验和int checkSum1 = calChecksum(0, pyOfRange(log, OF_DATA, log.length));/// 获取一条日志的[CheckSum]int checkSum2 = Parser.pyOfRange(log, OF_CHECKSUM, OF_DATA));/// 此[Log]是BadTailif(checkSum1 != checkSum2) {return null;}/// position指向下一条[Log]的起始position += log.length;return log;}
在打开一个日志文件时,需要首先校验日志文件的 XChecksum,并移除文件尾部可能存在的 BadTail,由于 BadTail 该条日志尚未写入完成,文件的校验和也就不会包含该日志的校验和,去掉 BadTail 即可保证日志文件的一致性。
private void checkAndRemoveTail() {/// 将position置为4rewind();int xCheck = 0;while(true) {/// internNext会改变position的值,循环结束后position指向最后一个正常[Log]的末尾byte[] log = internNext();if(log == null) break;/// xCheck是累加的xCheck = calChecksum(xCheck, log);}if(xCheck != xChecksum) {Panic.panic(Error.BadLogFileException);}try {/// 将文件截断至position位置,即只保留position前的部分truncate(position);} catch (Exception e) {Panic.panic(e);}try {/// 将文件指针设置至positionfile.seek(position);} catch (IOException e) {Panic.panic(e);}rewind();}
public void truncate(long x) throws Exception {lock.lock();try {fc.truncate(x);} finally {lock.unlock();}}
向日志文件写入日志时,也是首先将数据包裹成日志格式,写入文件后,再更新文件的校验和,更新校验和时,会刷新缓冲区,保证内容写入磁盘。
public void log(byte[] data) {/// 将data包裹为[Log]格式byte[] log = wrapLog(data);ByteBuffer buf = ByteBuffer.wrap(log);lock.lock();try {fc.position(fc.size());fc.write(buf);} catch(IOException e) {Panic.panic(e);} finally {lock.unlock();}updateXChecksum(log);}/// 更新XChecksumprivate void updateXChecksum(byte[] log) {this.xChecksum = calChecksum(this.xChecksum, log);try {fc.position(0);fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));fc.force(false);} catch(IOException e) {Panic.panic(e);}}private byte[] wrapLog(byte[] data) {byte[] checksum = Parser.int2Byte(calChecksum(0, data));byte[] size = Parser.int2Byte(data.length);/// Google Guava库中的方法,将多个字节数组连接在一起形成一个新的字节数组at(size, checksum, data);}
DM 为上层模块,提供了两种操作,分别是插入新数据(I)和更新现有数据(U)。至于为啥没有删除数据,这个会在 VM 一节叙述。
DM的日志策略:在进行 I 和 U 操作之前,必须先进行对应的日志操作,在保证日志写入磁盘后,才进行数据操作。
对于 I 和 U 操作,DM 记录的格式如下,由两个静态内部类实现。
插入操作 I:(xid, pgno, offset, x) 表示事务 xid 在位置 offset 插入了一条数据 x。
更新操作 U:(xid, pgno, offset, oldx, newx) 表示事务 xid 在位置 offset 将 oldx 更新为 newx。
假设日志中最后一个事务是 Ti,恢复时需要:
对 Ti 前的所有事务日志重做(redo),
在 XID 文件中检查 Ti 的状态, 如果 Ti 的状态是已完成(包括 committed 和 aborted),就将 Ti 重做,否则进行撤销(undo)。
接下来,如何对事务 T 重做:
正序扫描事务 T 的所有日志,
如果日志是插入操作 (xid, pgno, offset, x),就将 x 重新插入,
如果日志是更新操作 (xid, pgno, offset, oldx, newx) ,就将 A值设置为 newx。
如何对事务 T 撤销:
倒序扫描事务 T 的所有日志,
如果日志是插入操作(xid, pgno, offset, x),就将数据删除,
如果日志是更新操作(xid, pgno, offset, oldx, newx) ,就将值设置为 oldx。
单线程的恢复策略在多线程下会出现问题。
第一种:事务 T1 读到了 T2 未提交的数据:
T1 begin
T2 begin
T2 update(x)
T1 read(x)
...
T1 commit
DB break down
在系统崩溃时,T2 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会撤销 T2,它对数据库的影响会被消除。但是由于 T1 读取了 T2 更新的值,既然 T2 被撤销,那么 T1 也应当被撤销。这种情况,就是级联回滚。但是,T1 已经 commit 了,所有 commit 的事务的影响,应当被持久化。这里就造成了矛盾。所以这里需要保证:
正在进行的事务,不会读取其他事务未提交的数据。
第二种:事务T2修改了事务T1修改后但是并未提交的数据:
x = 0
T1 begin
T2 begin
T1 set x = x + 1 // x = 1
T2 set x = x + 1 // x = 2
T2 commit
DB break down
在系统崩溃时,T1 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会对 T1 进行撤销,对 T2 进行重做,但是,如果先撤销T1,再重做T2,x变为2,如果先重做T2,再撤销T1,x变为0,都不是1,都是错误的(所有 commit 的事务的影响,应当被持久化)。所以这里需要保证:
正在进行的事务,不会修改取其他事务未提交的修改或产生的数据。
由于 VM层 的存在,传递到 DM 层,真正执行的操作序列,都可以保证规定 1 和规定 2,所以在DM层无需另外的代码来保证这两个规定。
private static final byte LOG_TYPE_INSERT = 0;
private static final byte LOG_TYPE_UPDATE = 1;// 即上文中 [Log] 的 [Data] 部分
updateLog:
[LogType] [XID] [UID] [OldRaw] [NewRaw] // UID 经过转化变为 Pgno 和 OffsetinsertLog:
[LogType] [XID] [Pgno] [Offset] [Raw]
public static void recover(TransactionManager tm, Logger lg, PageCache pc) {System.out.println(");lg.rewind(); /// 设置 lg 中指针的位置为4,即第一个[Log]的起始位置int maxPgno = 0;while(true) {byte[] log = lg.next(); /// 拿到[Log]中的[data]if(log == null) break;int pgno;if(isInsertLog(log)) { /// 判断log[0]是否是插入类型InsertLogInfo li = parseInsertLog(log); ///将[data]转换为对象pgno = li.pgno;} else {UpdateLogInfo li = parseUpdateLog(log);pgno = li.pgno;}if(pgno > maxPgno) {maxPgno = pgno;}}if(maxPgno == 0) {maxPgno = 1;}pc.truncateByBgno(maxPgno); /// 更新页面文件,保留maxPgno以内的部分System.out.println("Truncate to " + maxPgno + " pages.");redoTranscations(tm, lg, pc);System.out.println("Redo Transactions Over.");undoTranscations(tm, lg, pc);System.out.println("Undo Transactions Over.");System.out.println("Recovery Over.");}
redo 的实现
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {lg.rewind(); /// 设置 lg 中指针的位置为4,即第一个[Log]的起始位置while(true) {byte[] log = lg.next(); /// 拿到[Log]中的[data]if(log == null) break;if(isInsertLog(log)) { /// 判断log[0]是否是插入类型InsertLogInfo li = parseInsertLog(log);long xid = li.xid;if(!tm.isActive(xid)) { /// 判断事务是否不处于进行中doInsertLog(pc, log, REDO); /// redo 插入操作}} else {UpdateLogInfo xi = parseUpdateLog(log);long xid = xi.xid;if(!tm.isActive(xid)) {doUpdateLog(pc, log, REDO); /// redo 更新操作}}}}
undo 的实现需要将一个事务内的多条日志倒序进行撤销
private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {Map<Long, List<byte[]>> logCache = new HashMap<>(); /// key 是 xid,value 是日志数据lg.rewind();while(true) {byte[] log = lg.next(); /// 拿到[Log]中的[data]if(log == null) break;if(isInsertLog(log)) {InsertLogInfo li = parseInsertLog(log);long xid = li.xid;if(tm.isActive(xid)) { /// 判断事务是否处于进行中if(!ainsKey(xid)) {logCache.put(xid, new ArrayList<>());}(xid).add(log);}} else {UpdateLogInfo xi = parseUpdateLog(log);long xid = xi.xid;if(tm.isActive(xid)) {if(!ainsKey(xid)) {logCache.put(xid, new ArrayList<>());}(xid).add(log);}}}// 对所有active log进行倒序undofor(Entry<Long, List<byte[]>> entry : Set()) {List<byte[]> logs = Value();for (int i = logs.size()-1; i >= 0; i --) {byte[] log = (i);if(isInsertLog(log)) {doInsertLog(pc, log, UNDO);} else {doUpdateLog(pc, log, UNDO);}}tm.Key()); /// 将事务 xid 状态改为已撤销(回滚)}}
doInsertLog 中的删除使用的是 setDataItemRawInvalid,将该条数据的有效位设置为无效,进行逻辑删除。
private static void doInsertLog(PageCache pc, byte[] log, int flag) {InsertLogInfo li = parseInsertLog(log);Page pg = null;try {pg = pc.getPage(li.pgno);} catch(Exception e) {Panic.panic(e);}try {if(flag == UNDO) {DataItem.setDataItemRawInvalid(li.raw);}verInsert(pg, li.raw, li.offset);} finally {pg.release();}}
private static void doUpdateLog(PageCache pc, byte[] log, int flag) {int pgno;short offset;byte[] raw;if(flag == REDO) {UpdateLogInfo xi = parseUpdateLog(log);pgno = xi.pgno;offset = xi.offset;raw = xi.newRaw; /// redo 的话,重做为新数据} else {UpdateLogInfo xi = parseUpdateLog(log);pgno = xi.pgno;offset = xi.offset;raw = xi.oldRaw; /// undo 的话,撤销为旧数据}Page pg = null;try {pg = pc.getPage(pgno);} catch (Exception e) {Panic.panic(e);}try {verUpdate(pg, raw, offset); /// 将数据放到缓存中} finally {pg.release(); /// 释放缓存,将页面数据写入磁盘}}
本文发布于:2024-02-03 07:15:42,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170691574349472.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |