mit 6.830 数据库系统: lab 2
总的来说,lab2的难度也不是太大,主要难的在于一些关于迭代器的实现.
1.总体架构
lab2中所需要实现的部分并不像lab1那样有那么强的结构性和整体性,相对来说比较零散.总的来说分为Join,Field,Group,Add,Delete
等操作,还有bufferpool的置换算法.
1) Operator,OpIterator为基础的迭代器体系
其中Opiterator
是所有运算操作所需要实现的基本接口,位于最底层,其中的需要实现的函数包含了一些迭代器中最基本的函数,比如说open,close,hasNext,next
等.
而Operator
则是以OpIterator
为基础所实现的一个抽象类,其中将next,hasNext,close
等操作进行实现,其中尤其是next,hasNext
理解这部分的实现能够有利于我们知道Opertaor
的子类中FetchNext
的作用.
private Tuple next = null;
private boolean open = false;
public boolean hasNext() throws DbException, TransactionAbortedException {
if (!this.open)
throw new IllegalStateException("Operator not yet open");
if (next == null)
next = fetchNext();
return next != null;
}
public Tuple next() throws DbException, TransactionAbortedException,
NoSuchElementException {
if (next == null) {
next = fetchNext();
if (next == null)
throw new NoSuchElementException();
}
Tuple result = next;
next = null;
return result;
}
在其中有两个变量next
和open,其中open是一个表示是否已经开启的flag,这个flag的真假将会影响到hasNext
等操作的执行,也就是该迭代器的状态.next
则用于保存fetchNext
的结果,当调用next的时候,可以认为是将next中储存的FetchNext
的结果取出,如果next中没有储存的结果,就通过调用FetchNext
取出一个来,然后再取出.而hasNext
则是通过检查当前next有没有存储的结果来实现的,如果没有则尝试能不能FetchNext
出一个结果,只有在没有储存的next结果并且取不出一个新的next的时候,hasNext为false.
在这里面fetchNext
是一个抽象方法,通常用于在继承Operator的类中实现,比如说Filter,Join
等.
protected abstract Tuple fetchNext() throws DbException,
TransactionAbortedException;
结合hasNext,next
以及后来派生类的实现,可以将这个模型概括为:
- 变量next是一个缓冲区(size为1).
- next从缓冲区中取走一个值,如果没有就通过fetchNext拉一个值,再取出.
- hasNext则检查该缓冲区有没有值,如果没有,就看能不能拉出来一个,如果拉取不出来,则就false.
- 在派生类中,往往又会有operator的操作对象,可能是一个关系,也可能是两个关系,通过派生类中的关系及其操作类型,fetchNext将返回一个Tuple提供给next.
所以在lab中,只有实现了fetchNext方法,才算是实现了一个完整的迭代器.
这种迭代器的设计方式看似比较别扭,但是其巧妙之处在于,可以使不同的迭代器共用相同的next和hasnext,只需要实现不同的fetchNext即可,降低了代码的耦合度.
2)join,field
这一部分操作的基础为断言(Predicate
).用来表示的是,像“a ==
b,a > b,a !=
b”等条件式.而在于数据库所需要的关系运算中,这种二元运算就往往需要在关系中进行,因此“二元”变成了关系中的两个filed
.因此一个Predicate
的基类的数据成员如下,恰好对应了二元条件表达式:
private int field_; //对应了关系中的某一列的所有field
private Op op_; //描述=,!=,>等操作符
private Field operand_; //只用来对应一个Field
JoinPredicate
与Predicate
的实现和功能类似,但是其中最主要的区别在于其数据成员中,包含的field的部分是两个表示field的索引,因此JoinPredicate
的比较对象是关系中的两个列整体进行OP操作,而Predacate
则用于将关系的一个列和一个固定的field进行比较.
private int field1_;
private int field2_;
private Predicate.Op op_;
因此,JoinPredicate
用于实现Join
,而Predicate
用于实现filter
.
3)Aggregate
首先是Aggregator
,这是一个接口,这个接口是对于聚类函数的抽象,比如说MIN,MAX,SUM,AVG
等.此外还包括mergeTupleIntoGroup
这个函数,因此每当有一个新的Tuple加入,聚类操作的结果就要发生变化.而关于迭代器的实现,将结合被聚类关系进行操作,也就是说,这个迭代器用于遍历被聚类关系上的一个个元组,每当next
遍历一次,就要执行mergeTupleIntoGroup
进行更新.而StringAggregator
和IntegerAggregator
是两个Aggregator
的派生类,
Aggregate
则是一个Operator
类,其中关于迭代器的实现仍然遵循上面所说的模式,具体来说就是根据InterAggregator或者StringAggregator的迭代器进行实现.
4)insert,delete
这一部分遵循HeapPage->HeapFile->BufferPool
的层次,BuferPool
所提供的关于insert/delete是最外层的接口,BufferPool会调用HeapFile中的insert/delete,然后再将改变的page在BufferPool更新,HeapFile中的insert和delete都需要读取Page,但这个时候借助BufferPool的getPage来读取,对于其中需要求改的Page,则进一步地调用HeapPage中的insert/delete.
2.实现过程
1) Exercise 1
关于Predicate
类的实现,其中需要补全的几个部分主要是一些关于Set和Get的函数,比较简单不再多说,其中需要实现filter
函数,该函数的作用在于给定一个Tuple,将其与Predicate
中的operand_
(也就是固定的被比较的域)进行比较,这个函数直接将Tuple中对应的field获取,然后调用compare函数进行比较即可.
public boolean filter(Tuple t) {
// some code goes here
Field field_t = t.getField(field_);
return field_t.compare(op_,operand_); //operand_就是被比较的对象
}
然后基于Predicate
实现Fielter
,这个类的作用在于对一个给定的关系,将其根据Predicate
执行过滤操作.除了几个简单的get,set操作之外,还有有关于迭代器的部分,其中最重要的是fetchNext,结合上面所说的next,hasNext和fetchNext的关系,实现如下:
protected Tuple fetchNext() throws NoSuchElementException,
TransactionAbortedException, DbException {
// some code goes here
while (child_.hasNext()) {
Tuple tuple = child_.next();
if (predicate_.filter(tuple)) {
return tuple;
}
}
return null;
}
这里的filter有两个数据成员,一个用来描述filter所需要满足的条件Predicate
,另一个表示作用对象child_,更具体地说,是一个用来操作关系的迭代器.这里的fetchNext基于child_
的迭代器实现,不过多了predicate_
的filter操作,直到找到下一个符合要求的才会返回,如果没有就返回null.
关于其他部分的实现:
public TupleDesc getTupleDesc() {
// some code goes here
return child_.getTupleDesc();
}
public void open() throws DbException, NoSuchElementException,
TransactionAbortedException {
// some code goes here
super.open(); //自己对应的迭代器
child_.open(); //child的迭代器
}
public void close() {
// some code goes here
child_.close();
super.close();
}
public void rewind() throws DbException, TransactionAbortedException {
// some code goes here
//child_.rewind();
close();
open();
}
由于这是一个基于child_的迭代器所实现的迭代器,所以在open,close,rewind
等操作中,需要开启/关闭的迭代器既包含child_
所对应的迭代器,也包含它自己的迭代器,对于TupleDesc来说,过滤操作不改变关系自身的模式,只是根据条件筛选出一部分Tuple而已,因此和child的TupleDesc相同.
Join
部分的目标在于,根据给定的JoinPredicate
将两个关系中的一部分Tuple进行合并.关于合并两个Tuple的操作如下
private Tuple joinTuples(Tuple tuple1,Tuple tuple2) { //根据是否符合某个条件将两者合并
if (joinPredicate_.filter(tuple1,tuple2)) {
TupleDesc tupleDesc = TupleDesc.merge(tuple1.getTupleDesc(),tuple2.getTupleDesc());
Tuple tuple = new Tuple(tupleDesc);
int fieldNum_1 = tuple1.getTupleDesc().numFields(),fieldNum_2 = tuple2.getTupleDesc().numFields();
for (int i = 0; i < fieldNum_1;i ++) {
tuple.setField(i,tuple1.getField(i));
}
for (int i = 0; i < fieldNum_2;i ++) {
tuple.setField(fieldNum_1 + i,tuple2.getField(i));
}
return tuple;
}
return null;
}
首先判断两个Tuple是否符合Predicate
的要求,否则直接返回null.对于符合条件的,先得出合并后的TupleDesc,然后得出两个tuple的所有field,然后通过setField进行设置,最终返回.
而其中关于fetchNext
的实现是比较烧脑的,实现如下.
protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// some code goes here
while (currTuple1_ != null || child1_.hasNext()) {
if (currTuple1_ == null) {
if (child1_.hasNext()) {
currTuple1_ = child1_.next();
} else {
return null;
}
}
if (!child2_.hasNext()) {
if(child1_.hasNext()){
child2_.rewind();
currTuple1_ = child1_.next();
}else{
return null;
}
}
while (child2_.hasNext()) {
currTuple2_ = child2_.next();
Tuple tuple = joinTuples(currTuple1_,currTuple2_);
if (tuple != null) {
return tuple;
}
}
}
return null;
}
2) Exercise 2
在此以StringAggregator
为例.
GpMap
中,每个key对应加入的Tuple中的不同的group
Field,其中value则是group Filed下的所有的StringField的集合.
private int gbfield_; //group所根据的field index
private Type gbfieldtype_; //上面索引中所对应的数据类型,也就是group域的数值类型
private int afield_; //聚合函数所对应的的field index
private Op what_; //聚合函数的类型
private HashMap<Field,ArrayList<StringField>> GpMap_; //每一个group field对应一个StringField的序列
private TupleDesc desc_; //代表聚类结果的TupleDesc
public StringAggregator(int gbfield, Type gbfieldtype, int afield, Op what) throws IllegalArgumentException{
// some code goes here
if (!what.equals(Op.COUNT)) {
throw new IllegalArgumentException("the what not COUNT in StringAggregator");
}
gbfield_ = gbfield;
gbfieldtype_ = gbfieldtype;
afield_ = afield;
what_ = what;
desc_ = null;
GpMap_ = new HashMap<Field,ArrayList<StringField>>();
}
其中TupleDesc
的实现主要区分好两种情况,一种是一个field的情况,这种情况通常是gbfield_
为-1或者该索引不存在的情况,除此之外则是两个field的情况.关于mergeTupleIntoGroup
这个函数主要的作用在于将一个特定的Tuple加入到该类维护的容器(GpMap)中,在这里仍然注意对于gpfield两种情况的区分,如果gpfield是-1的话,则map中对应的key为null.
public void mergeTupleIntoGroup(Tuple tup) throws NoSuchElementException {
// some code goes here
setTupleDesc(tup);
Field gpfield;
StringField afield;
if (gbfield_ != -1) {
gpfield = tup.getField(gbfield_);
if (gpfield == null) {
throw new NoSuchElementException("not have the gbfield in tup from mergeTupleIntoGroup");
}
} else {
gpfield = null;
}
afield = (StringField) tup.getField(afield_);
if (!GpMap_.containsKey(gpfield)) {
GpMap_.put(gpfield, new ArrayList<StringField>());
}
GpMap_.get(gpfield).add(afield); //最后加入到group feild为key的StringFeild序列中.
}
此外,StringAggregator需要有一个完整的迭代器的实现.这个迭代器是基于GpMap这个“容器”进行实现的.其数据成员如下:
private StringAggregator aggregator_;
private Iterator<Field> gvalueIt_;
private Field currGroup_;
public StringAggregatorOpIterator(StringAggregator saggregator) {
aggregator_ = saggregator;
gvalueIt_ = null;
currGroup_ = null;
}
public void open() throws DbException, TransactionAbortedException{
gvalueIt_ = aggregator_.GpMap_.keySet().iterator();
currGroup_ = null;
}
aggregator
指的是该迭代器所属的StringAggregator
,gvalueIt
则是aggregator中的GpMap
中key的迭代器,迭代器中每一项也就对应一个group.因此该迭代器每次遍历的单位就是一个group.其中关于next的实现如下:
public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
Tuple tuple = null;
tuple = new Tuple(aggregator_.desc_);
currGroup_ = gvalueIt_.next();
IntField intfield = new IntField(aggregator_.GpMap_.get(currGroup_).size());
if (aggregator_.gbfield_ == -1 && aggregator_.GpMap_.containsKey(null)) { //这个时候就只会固定在第一个null为key的里面
tuple.setField(0,intfield);
} else {
tuple.setField(0,currGroup_);
tuple.setField(1,intfield);
}
return tuple;
}
public boolean hasNext() throws DbException, TransactionAbortedException {
return gvalueIt_.hasNext();
}
其中返回的Tuple的desc再aggregator
中就已经确定,其中value,直接通过aggregator_.GpMap_.get(currGroup_).size()
即可获得.至于hasNext
的实现,只要还有next
group就可以,因此也就是返回gvalueIt_.hasNext()
.
在练习2中Aggregate
仍然是一个operator
,其基本模式与上面的Fielter,Join
基本一样.其作用在于对于某个给定的关系执行某种聚类操作.数据成员如下:
private OpIterator child_; //执行聚类操作的关系
private Aggregator aggregator_; //代表聚类操作
private int gfield_; //关系中的group field
private int afield_; //关系中的aggregate field
private Aggregator.Op aop_; //aggregator操作的类型
private TupleDesc desc_; //返回Tuple的描述符
private OpIterator opIt_; //aggregator中的迭代器
这里的思路,主要在于将child_中的Tuple一个个Merge到aggregator中,然后借助该aggregator的迭代器实现fetchNext
等迭代器的操作.其中fetchNext
实现如下:
protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// some code goes here
//return null;
if (opIt_.hasNext()) {
return opIt_.next();
}
return null;
}
public void open() throws NoSuchElementException, DbException,
TransactionAbortedException {
// some code goes here
child_.open();
if (opIt_ == null) {
MergeToAggregator();
opIt_ = aggregator_.iterator();
}
opIt_.open();
super.open();
}
public void close() {
// some code goes here
opIt_.close();
child_.close();
super.close();
}
当opIt_
被填充好后,直接根据opIt_
的next实现即可.open
的实现中,需要对opIt_
进行初始化,其中MergeToAggregator用于将child_
这个关系中的Tuple一个个地加入到aggregator
中.
3) Exercise 3
练习3中,需要实现对某个表进行insert/delete操作,其实现遵循HeapPage->HeapFile->BufferPool
自底向上的线索.在HeapPage
中:
public void insertTuple(Tuple t) throws DbException {
// some code goes here
// not necessary for lab1
TupleDesc tupleDesc = t.getTupleDesc();
if (getNumEmptySlots() == 0 || !tupleDesc.equals(td)) { //外部调用时需要进行检查
throw new DbException("cant insert tuple to HeapPage");
}
int k; //找出一个空闲位置
for (k = 0;k < numSlots ; k ++) {
if (!isSlotUsed(k)) {
break;
}
}
t.resetRecordId(pid,k);
tuples[k] = t;
markSlotUsed(k,true);
}
在这里插入一个insert需要遍历bitmap中的每一个bit,当遇到空闲位置则插入,插入时需要对Tuple中的Record进行重置,从而适应最终插入的位置,设置对应的tuples,最终还需要标记Used.关于delete的操作与此类似,不同的是直接定位出要删除的Tuple的位置,然后直接将SlotUsed进行设置即可.在HeapPage
中还有一些关于Dirty的操作,这用于BufferPool
中的flush,evit等操作.其实现比较简单.
public void markDirty(boolean dirty, TransactionId tid) {
// some code goes here
// not necessary for lab1
isdirty_ = dirty;
if (dirty) {
lastTid_ = tid;
}
}
public TransactionId isDirty() {
// some code goes here
// Not necessary for lab1
//return lastTid_;
if (isdirty_) {
return lastTid_;
} else {
return null;
}
}
在HeapFile
中,delete操作比较简单,将某个Tuple对应的PageId提取出来后,然后借助BufferPool.getPage
读取出来该页,然后调用该页的deletTuple即可.而insert相对而言比较复杂.其实现如下:
public List<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// some code goes here
// not necessary for lab1
List<Page> list = new ArrayList<Page>();
HeapPage result = null;
RecordId recordId = t.getRecordId();
int pageNum = numPages(),i;
for (i = 0; i <pageNum; i ++) {
HeapPageId pageId = new HeapPageId(heapId_,i);
HeapPage page = (HeapPage) Database.getBufferPool().getPage(tid,pageId,Permissions.READ_WRITE);
if (page.getNumEmptySlots() > 0) {
page.insertTuple(t);
result = page;
break;
}
}
if (i >= pageNum) { //需要增加新的页,前面的已经写满了
byte[] data = new byte[BufferPool.getPageSize()];
HeapPageId heapPageId = new HeapPageId(heapId_,i);
HeapPage newPgae = new HeapPage(heapPageId,data);
newPgae.insertTuple(t);
writePage(newPgae);
result = newPgae;
}
list.add(result);
return list;
}
需要遍历该PageFile中当前已有的Page,当遍历到仍然有空位的Page就执行insert,如果当前已有的Page中并没有空余的,就创建一个Page,然后将其插入到新的Page中,并将这个Page写入到该文件.为什么返回值是List
或者ArrayList
呢?这个返回值用于被BufferPool
获取,当BufferPool获取后,就知道哪些页发生了更新的动作,进而将BufferPool中当前已有的对应的Page进行更新.其中BufferPool中关于insertTuple和deleteTuple的操作是基于HeapFile的insert/delete实现的.最后将返回的List<page>
进行更新(调用updatePagePool
).
private void updatePagePool(List<Page> pagelist,TransactionId tid) throws DbException{
int size = pagelist.size();
for (int i = 0 ; i < size; i ++) {
HeapPage page = (HeapPage) pagelist.get(i);
page.markDirty(true,tid);
int hashcode = page.pid.hashCode(); //注意是要得出pid的hashcode
if (pages_.containsKey(hashcode)) {
Page oldPage = pages_.get(hashcode);
oldPage = page; //更新
} else {
//暂且先不考虑超出的情况
if(pages_.size() >= maxPageNum_) {
evictPage();
}
pages_.put(hashcode,page);
}
}
}
关于flushPage,flushAllPages
的实现,这里这里获取该Page对应的文件后,根据isDirty进行writePage即可.
private synchronized void flushPage(PageId pid) throws IOException {
// some code goes here
// not necessary for lab1
int tableId = pid.getTableId();
HeapFile file = (HeapFile) Database.getCatalog().getDatabaseFile(tableId);
int hashCode = pid.hashCode();
if (pages_.containsKey(hashCode)) {
HeapPage page = (HeapPage) pages_.get(hashCode);
if (page.isDirty() != null) {
file.writePage(page);
}
} //写入
}
4) Exercise 4
实现Delete,Insert
操作,这里的实现仍然遵循Fielter,Join
等模式,实现open,close,fetchNext
等.两者的实现差不多,只说明Insert.
private TransactionId tid_;
private OpIterator child_;
private int tableId_;
private TupleDesc desc_;
private boolean isNext_;
数据成员如上,这个迭代器用于将child_
中的所有Tuple插入到tableId对应的表中,需要特别注意它的next的意义,一个next操作则表示将整个child_
插入到table中.其返回Tuple的类型只有一个field,表示的是成功插入的Tuple的数量.
下面是fetchNext
的实现:
protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// some code goes here
if (!isNext_) {
return null;
}
int cnt = 0;
while (child_.hasNext()) {
Tuple tuple = child_.next();
boolean insertOk = true;
try {
Database.getBufferPool().insertTuple(tid_,tableId_,tuple);
} catch (Exception e) {
insertOk = false;
}
if (insertOk) {
cnt += 1;
}
}
Tuple result = new Tuple(desc_);
IntField field = new IntField(cnt);
result.setField(0,field);
isNext_ = false;
return result;
}
如果之前已经通过fetchNext执行过一次插入了,就返回null,这告知上面的hasNext是false的.在近插入时则遍历child,逐个借助insertTuple进行插入,最终返回插入后的结果.这里的insert则是借助BufferPool
的insert接口实现的.
5) Exercise 5
最后这一部分主要是BufferPool
中关于置换算法的实现,这里的实现采用FIFO的置换方法,实现起来非常简单:
//增加的新的数据成员
private LinkedList<Integer> fifoQueue_;
//evitPage实现
private synchronized void evictPage() throws DbException {
// some code goes here
// not necessary for lab1
int hashcode = fifoQueue_.pop();
HeapPage page = (HeapPage) pages_.get(hashcode);
HeapPageId pageId = page.getId();
try {
flushPage(pageId);
} catch (Exception e) {
System.out.printf("flushPage error in evictPage from pagepool");
}
pages_.remove(hashcode); //移除
}
fifoQueue_
在每次有新的Page加入后都会在里面加入新的Page.注意对于驱逐出来的Page注意flush到File中.
3.总结
lab2中讲opertaor定义为迭代器的形式,此外其中next和hasNext的处理很精妙,这使得派生类都可以共用next,hasNext,只需要重写fetchNext,open等.