mit 6.830 数据库系统: lab 2

总的来说,lab2的难度也不是太大,主要难的在于一些关于迭代器的实现.

1.总体架构

lab2中所需要实现的部分并不像lab1那样有那么强的结构性和整体性,相对来说比较零散.总的来说分为Join,Field,Group,Add,Delete等操作,还有bufferpool的置换算法.

1) Operator,OpIterator为基础的迭代器体系

其中Opiterator是所有运算操作所需要实现的基本接口,位于最底层,其中的需要实现的函数包含了一些迭代器中最基本的函数,比如说open,close,hasNext,next等.

Operator则是以OpIterator为基础所实现的一个抽象类,其中将next,hasNext,close等操作进行实现,其中尤其是next,hasNext理解这部分的实现能够有利于我们知道Opertaor的子类中FetchNext的作用.

private Tuple next = null;
private boolean open = false;
public boolean hasNext() throws DbException, TransactionAbortedException {
        if (!this.open)
            throw new IllegalStateException("Operator not yet open");
        
        if (next == null)
            next = fetchNext();
        return next != null;
    }

public Tuple next() throws DbException, TransactionAbortedException,
            NoSuchElementException {
        if (next == null) {
            next = fetchNext();
            if (next == null)
                throw new NoSuchElementException();
        }

        Tuple result = next;
        next = null;
        return result;
 }

在其中有两个变量next和open,其中open是一个表示是否已经开启的flag,这个flag的真假将会影响到hasNext等操作的执行,也就是该迭代器的状态.next则用于保存fetchNext的结果,当调用next的时候,可以认为是将next中储存的FetchNext的结果取出,如果next中没有储存的结果,就通过调用FetchNext取出一个来,然后再取出.而hasNext则是通过检查当前next有没有存储的结果来实现的,如果没有则尝试能不能FetchNext出一个结果,只有在没有储存的next结果并且取不出一个新的next的时候,hasNext为false.

在这里面fetchNext是一个抽象方法,通常用于在继承Operator的类中实现,比如说Filter,Join等.

protected abstract Tuple fetchNext() throws DbException,
            TransactionAbortedException;

结合hasNext,next以及后来派生类的实现,可以将这个模型概括为:

  • 变量next是一个缓冲区(size为1).
  • next从缓冲区中取走一个值,如果没有就通过fetchNext拉一个值,再取出.
  • hasNext则检查该缓冲区有没有值,如果没有,就看能不能拉出来一个,如果拉取不出来,则就false.
  • 在派生类中,往往又会有operator的操作对象,可能是一个关系,也可能是两个关系,通过派生类中的关系及其操作类型,fetchNext将返回一个Tuple提供给next.

所以在lab中,只有实现了fetchNext方法,才算是实现了一个完整的迭代器.

这种迭代器的设计方式看似比较别扭,但是其巧妙之处在于,可以使不同的迭代器共用相同的next和hasnext,只需要实现不同的fetchNext即可,降低了代码的耦合度.

2)join,field

这一部分操作的基础为断言(Predicate).用来表示的是,像“a == b,a > b,a != b”等条件式.而在于数据库所需要的关系运算中,这种二元运算就往往需要在关系中进行,因此“二元”变成了关系中的两个filed.因此一个Predicate的基类的数据成员如下,恰好对应了二元条件表达式:

private int field_; //对应了关系中的某一列的所有field
private Op op_; //描述=,!=,>等操作符
private Field operand_; //只用来对应一个Field

JoinPredicatePredicate的实现和功能类似,但是其中最主要的区别在于其数据成员中,包含的field的部分是两个表示field的索引,因此JoinPredicate的比较对象是关系中的两个列整体进行OP操作,而Predacate则用于将关系的一个列和一个固定的field进行比较.

private int field1_;
private int field2_;
private Predicate.Op op_;

因此,JoinPredicate用于实现Join,而Predicate用于实现filter.

3)Aggregate

首先是Aggregator,这是一个接口,这个接口是对于聚类函数的抽象,比如说MIN,MAX,SUM,AVG等.此外还包括mergeTupleIntoGroup这个函数,因此每当有一个新的Tuple加入,聚类操作的结果就要发生变化.而关于迭代器的实现,将结合被聚类关系进行操作,也就是说,这个迭代器用于遍历被聚类关系上的一个个元组,每当next遍历一次,就要执行mergeTupleIntoGroup进行更新.而StringAggregatorIntegerAggregator是两个Aggregator的派生类,

Aggregate则是一个Operator类,其中关于迭代器的实现仍然遵循上面所说的模式,具体来说就是根据InterAggregator或者StringAggregator的迭代器进行实现.

4)insert,delete

这一部分遵循HeapPage->HeapFile->BufferPool的层次,BuferPool所提供的关于insert/delete是最外层的接口,BufferPool会调用HeapFile中的insert/delete,然后再将改变的page在BufferPool更新,HeapFile中的insert和delete都需要读取Page,但这个时候借助BufferPool的getPage来读取,对于其中需要求改的Page,则进一步地调用HeapPage中的insert/delete.

2.实现过程

1) Exercise 1

关于Predicate类的实现,其中需要补全的几个部分主要是一些关于Set和Get的函数,比较简单不再多说,其中需要实现filter函数,该函数的作用在于给定一个Tuple,将其与Predicate中的operand_(也就是固定的被比较的域)进行比较,这个函数直接将Tuple中对应的field获取,然后调用compare函数进行比较即可.

public boolean filter(Tuple t) {
       // some code goes here
		Field field_t = t.getField(field_);
   return field_t.compare(op_,operand_); //operand_就是被比较的对象
}

然后基于Predicate实现Fielter,这个类的作用在于对一个给定的关系,将其根据Predicate执行过滤操作.除了几个简单的get,set操作之外,还有有关于迭代器的部分,其中最重要的是fetchNext,结合上面所说的next,hasNext和fetchNext的关系,实现如下:

protected Tuple fetchNext() throws NoSuchElementException,
            TransactionAbortedException, DbException {
        // some code goes here
        while (child_.hasNext()) {
            Tuple tuple = child_.next();
            if (predicate_.filter(tuple)) {
                return tuple;
            }
        }
        return null;
}

这里的filter有两个数据成员,一个用来描述filter所需要满足的条件Predicate,另一个表示作用对象child_,更具体地说,是一个用来操作关系的迭代器.这里的fetchNext基于child_的迭代器实现,不过多了predicate_的filter操作,直到找到下一个符合要求的才会返回,如果没有就返回null.

关于其他部分的实现:

public TupleDesc getTupleDesc() {
       // some code goes here
       return child_.getTupleDesc();
   }
   public void open() throws DbException, NoSuchElementException,
           TransactionAbortedException {
       // some code goes here
       super.open(); //自己对应的迭代器
       child_.open(); //child的迭代器
   }
   public void close() {
       // some code goes here
       child_.close();
       super.close();
   }
   public void rewind() throws DbException, TransactionAbortedException {
       // some code goes here
       //child_.rewind();
       close();
       open();
   }

由于这是一个基于child_的迭代器所实现的迭代器,所以在open,close,rewind等操作中,需要开启/关闭的迭代器既包含child_所对应的迭代器,也包含它自己的迭代器,对于TupleDesc来说,过滤操作不改变关系自身的模式,只是根据条件筛选出一部分Tuple而已,因此和child的TupleDesc相同.

Join部分的目标在于,根据给定的JoinPredicate将两个关系中的一部分Tuple进行合并.关于合并两个Tuple的操作如下

private Tuple joinTuples(Tuple tuple1,Tuple tuple2) { //根据是否符合某个条件将两者合并
        if (joinPredicate_.filter(tuple1,tuple2)) {
            TupleDesc tupleDesc = TupleDesc.merge(tuple1.getTupleDesc(),tuple2.getTupleDesc());
            Tuple tuple = new Tuple(tupleDesc);

            int fieldNum_1 = tuple1.getTupleDesc().numFields(),fieldNum_2 = tuple2.getTupleDesc().numFields();
            for (int i = 0; i < fieldNum_1;i ++) {
                tuple.setField(i,tuple1.getField(i));
            }
            for (int i = 0; i < fieldNum_2;i ++) {
                tuple.setField(fieldNum_1 + i,tuple2.getField(i));
            }
            return tuple;
        }
        return null;
}

首先判断两个Tuple是否符合Predicate的要求,否则直接返回null.对于符合条件的,先得出合并后的TupleDesc,然后得出两个tuple的所有field,然后通过setField进行设置,最终返回.

而其中关于fetchNext的实现是比较烧脑的,实现如下.

protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        // some code goes here
        while (currTuple1_ != null || child1_.hasNext()) {
            if (currTuple1_ == null) {
                if (child1_.hasNext()) {
                    currTuple1_ = child1_.next();
                } else {
                    return null;
                }
            }
            if (!child2_.hasNext()) {
                if(child1_.hasNext()){
                    child2_.rewind();
                    currTuple1_ = child1_.next();
                }else{
                    return null;
                }
            }
            while (child2_.hasNext()) {
                currTuple2_ = child2_.next();
                Tuple tuple = joinTuples(currTuple1_,currTuple2_);
                if (tuple != null) {
                    return tuple;
                }
            }
        }
        return null;
    }

2) Exercise 2

在此以StringAggregator为例.

GpMap中,每个key对应加入的Tuple中的不同的group Field,其中value则是group Filed下的所有的StringField的集合.

private int gbfield_; //group所根据的field index
private Type gbfieldtype_; //上面索引中所对应的数据类型,也就是group域的数值类型
private int afield_; //聚合函数所对应的的field index
private Op what_; //聚合函数的类型
private HashMap<Field,ArrayList<StringField>> GpMap_; //每一个group field对应一个StringField的序列
private TupleDesc desc_; //代表聚类结果的TupleDesc

public StringAggregator(int gbfield, Type gbfieldtype, int afield, Op what)  throws IllegalArgumentException{
       // some code goes here
       if (!what.equals(Op.COUNT)) {
           throw new IllegalArgumentException("the what not COUNT in StringAggregator");
       }
       gbfield_ = gbfield;
       gbfieldtype_ = gbfieldtype;
       afield_ = afield;
       what_ = what;
       desc_ = null;
       GpMap_ = new HashMap<Field,ArrayList<StringField>>();
}

其中TupleDesc的实现主要区分好两种情况,一种是一个field的情况,这种情况通常是gbfield_为-1或者该索引不存在的情况,除此之外则是两个field的情况.关于mergeTupleIntoGroup这个函数主要的作用在于将一个特定的Tuple加入到该类维护的容器(GpMap)中,在这里仍然注意对于gpfield两种情况的区分,如果gpfield是-1的话,则map中对应的key为null.

public void mergeTupleIntoGroup(Tuple tup) throws NoSuchElementException {
        // some code goes here
        setTupleDesc(tup);
        Field gpfield;
        StringField afield;
        if (gbfield_ != -1) {
            gpfield = tup.getField(gbfield_);
            if (gpfield == null) {
                throw new NoSuchElementException("not have the gbfield in tup from mergeTupleIntoGroup");
            }
        } else {
            gpfield = null;
        }
        afield = (StringField) tup.getField(afield_);

        if (!GpMap_.containsKey(gpfield)) {
            GpMap_.put(gpfield, new ArrayList<StringField>());
        }
        GpMap_.get(gpfield).add(afield); //最后加入到group feild为key的StringFeild序列中.
    } 

此外,StringAggregator需要有一个完整的迭代器的实现.这个迭代器是基于GpMap这个“容器”进行实现的.其数据成员如下:

private StringAggregator aggregator_;
private Iterator<Field> gvalueIt_;
private Field currGroup_;

public StringAggregatorOpIterator(StringAggregator saggregator) {
      aggregator_ = saggregator;
      gvalueIt_ = null;
      currGroup_ = null;
 }
public void open() throws DbException, TransactionAbortedException{
      gvalueIt_ = aggregator_.GpMap_.keySet().iterator();
      currGroup_ = null;
 }

aggregator指的是该迭代器所属的StringAggregator,gvalueIt则是aggregator中的GpMap中key的迭代器,迭代器中每一项也就对应一个group.因此该迭代器每次遍历的单位就是一个group.其中关于next的实现如下:

public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
            Tuple tuple = null;
            tuple = new Tuple(aggregator_.desc_);
            currGroup_ = gvalueIt_.next();
            IntField intfield = new IntField(aggregator_.GpMap_.get(currGroup_).size());

            if (aggregator_.gbfield_ == -1 && aggregator_.GpMap_.containsKey(null)) { //这个时候就只会固定在第一个null为key的里面
                tuple.setField(0,intfield);
            } else {
                tuple.setField(0,currGroup_);
                tuple.setField(1,intfield);
            }
            return tuple;
}
public boolean hasNext() throws DbException, TransactionAbortedException {
            return gvalueIt_.hasNext();
}

其中返回的Tuple的desc再aggregator中就已经确定,其中value,直接通过aggregator_.GpMap_.get(currGroup_).size()即可获得.至于hasNext的实现,只要还有next group就可以,因此也就是返回gvalueIt_.hasNext().

在练习2中Aggregate仍然是一个operator,其基本模式与上面的Fielter,Join基本一样.其作用在于对于某个给定的关系执行某种聚类操作.数据成员如下:

private OpIterator child_; //执行聚类操作的关系
private Aggregator aggregator_; //代表聚类操作
private int gfield_; //关系中的group field
private int afield_; //关系中的aggregate field
private Aggregator.Op aop_; //aggregator操作的类型
private TupleDesc desc_; //返回Tuple的描述符
private OpIterator opIt_; //aggregator中的迭代器

这里的思路,主要在于将child_中的Tuple一个个Merge到aggregator中,然后借助该aggregator的迭代器实现fetchNext等迭代器的操作.其中fetchNext实现如下:

protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        // some code goes here
        //return null;
        if (opIt_.hasNext()) {
            return opIt_.next();
        }
        return null;
}
public void open() throws NoSuchElementException, DbException,
            TransactionAbortedException {
        // some code goes here
        child_.open();
        if (opIt_ == null) {
            MergeToAggregator();
            opIt_ = aggregator_.iterator();
        }
        opIt_.open();
        super.open();
    }
    public void close() {
        // some code goes here
        opIt_.close();
        child_.close();
        super.close();
    }

opIt_被填充好后,直接根据opIt_的next实现即可.open的实现中,需要对opIt_进行初始化,其中MergeToAggregator用于将child_这个关系中的Tuple一个个地加入到aggregator中.

3) Exercise 3

练习3中,需要实现对某个表进行insert/delete操作,其实现遵循HeapPage->HeapFile->BufferPool自底向上的线索.在HeapPage中:

public void insertTuple(Tuple t) throws DbException {
    // some code goes here
    // not necessary for lab1
    TupleDesc tupleDesc = t.getTupleDesc();
    if (getNumEmptySlots() == 0 || !tupleDesc.equals(td)) { //外部调用时需要进行检查
        throw new DbException("cant insert tuple to HeapPage");
    }
    int k; //找出一个空闲位置
    for (k = 0;k < numSlots ; k ++) {
        if (!isSlotUsed(k)) {
            break;
        }
    }
    t.resetRecordId(pid,k);
    tuples[k] = t;
    markSlotUsed(k,true);
}

在这里插入一个insert需要遍历bitmap中的每一个bit,当遇到空闲位置则插入,插入时需要对Tuple中的Record进行重置,从而适应最终插入的位置,设置对应的tuples,最终还需要标记Used.关于delete的操作与此类似,不同的是直接定位出要删除的Tuple的位置,然后直接将SlotUsed进行设置即可.在HeapPage中还有一些关于Dirty的操作,这用于BufferPool中的flush,evit等操作.其实现比较简单.

   public void markDirty(boolean dirty, TransactionId tid) {
       // some code goes here
// not necessary for lab1
       isdirty_ = dirty;
       if (dirty) {
           lastTid_ = tid;
       }
   }
   public TransactionId isDirty() {
       // some code goes here
// Not necessary for lab1
       //return lastTid_;
       if (isdirty_) {
           return lastTid_;
       } else {
           return null;
       }
   }

HeapFile中,delete操作比较简单,将某个Tuple对应的PageId提取出来后,然后借助BufferPool.getPage读取出来该页,然后调用该页的deletTuple即可.而insert相对而言比较复杂.其实现如下:

public List<Page> insertTuple(TransactionId tid, Tuple t)
            throws DbException, IOException, TransactionAbortedException {
        // some code goes here
        // not necessary for lab1
        List<Page> list = new ArrayList<Page>();
        HeapPage result = null;
        RecordId recordId = t.getRecordId();
        int pageNum = numPages(),i;
        for (i = 0; i <pageNum; i ++) {
            HeapPageId pageId = new HeapPageId(heapId_,i);
            HeapPage page = (HeapPage) Database.getBufferPool().getPage(tid,pageId,Permissions.READ_WRITE);
            if (page.getNumEmptySlots() > 0) {
                page.insertTuple(t);
                result = page;
                break;
            }
        }
        if (i >= pageNum) { //需要增加新的页,前面的已经写满了
            byte[] data = new byte[BufferPool.getPageSize()];
            HeapPageId heapPageId = new HeapPageId(heapId_,i);
            HeapPage newPgae = new HeapPage(heapPageId,data);
            newPgae.insertTuple(t);
            writePage(newPgae);
            result = newPgae;
        }
        list.add(result);
        return list;
    }

需要遍历该PageFile中当前已有的Page,当遍历到仍然有空位的Page就执行insert,如果当前已有的Page中并没有空余的,就创建一个Page,然后将其插入到新的Page中,并将这个Page写入到该文件.为什么返回值是List或者ArrayList呢?这个返回值用于被BufferPool获取,当BufferPool获取后,就知道哪些页发生了更新的动作,进而将BufferPool中当前已有的对应的Page进行更新.其中BufferPool中关于insertTuple和deleteTuple的操作是基于HeapFile的insert/delete实现的.最后将返回的List<page>进行更新(调用updatePagePool).

private void updatePagePool(List<Page> pagelist,TransactionId tid) throws DbException{
    int size = pagelist.size();
    for (int i = 0 ; i < size; i ++) {
        HeapPage page = (HeapPage) pagelist.get(i);
        page.markDirty(true,tid);
        int hashcode = page.pid.hashCode(); //注意是要得出pid的hashcode
        if (pages_.containsKey(hashcode)) {
            Page oldPage = pages_.get(hashcode);
            oldPage = page; //更新
        } else {
            //暂且先不考虑超出的情况
            if(pages_.size() >= maxPageNum_)  {
                evictPage();
            }
            pages_.put(hashcode,page);
        }
    }
}

关于flushPage,flushAllPages的实现,这里这里获取该Page对应的文件后,根据isDirty进行writePage即可.

private synchronized  void flushPage(PageId pid) throws IOException {
    // some code goes here
    // not necessary for lab1
    int tableId = pid.getTableId();
    HeapFile file = (HeapFile) Database.getCatalog().getDatabaseFile(tableId);
    int hashCode = pid.hashCode();
    if (pages_.containsKey(hashCode)) {
        HeapPage page = (HeapPage) pages_.get(hashCode);
        if (page.isDirty() != null) {
            file.writePage(page);
        }
    } //写入
}

4) Exercise 4

实现Delete,Insert操作,这里的实现仍然遵循Fielter,Join等模式,实现open,close,fetchNext等.两者的实现差不多,只说明Insert.

private TransactionId tid_;
private OpIterator child_; 
private int tableId_;
private TupleDesc desc_;
private boolean isNext_;

数据成员如上,这个迭代器用于将child_中的所有Tuple插入到tableId对应的表中,需要特别注意它的next的意义,一个next操作则表示将整个child_插入到table中.其返回Tuple的类型只有一个field,表示的是成功插入的Tuple的数量.

下面是fetchNext的实现:

protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    // some code goes here
    if (!isNext_) {
        return null;
    }
    int cnt  = 0;
    while (child_.hasNext()) {
        Tuple tuple = child_.next();
        boolean insertOk = true;
        try {
            Database.getBufferPool().insertTuple(tid_,tableId_,tuple);
        } catch (Exception e) {
            insertOk = false;
        }
        if (insertOk) {
            cnt += 1;
        }
    }
    Tuple result = new Tuple(desc_);
    IntField field = new IntField(cnt);
    result.setField(0,field);
    isNext_ = false;
    return result;
}

如果之前已经通过fetchNext执行过一次插入了,就返回null,这告知上面的hasNext是false的.在近插入时则遍历child,逐个借助insertTuple进行插入,最终返回插入后的结果.这里的insert则是借助BufferPool的insert接口实现的.

5) Exercise 5

最后这一部分主要是BufferPool中关于置换算法的实现,这里的实现采用FIFO的置换方法,实现起来非常简单:

//增加的新的数据成员
private LinkedList<Integer> fifoQueue_;
//evitPage实现
private synchronized  void evictPage() throws DbException {
        // some code goes here
        // not necessary for lab1
        int hashcode = fifoQueue_.pop();
        HeapPage page = (HeapPage) pages_.get(hashcode);
        HeapPageId pageId = page.getId();
        try {
            flushPage(pageId);
        } catch (Exception e) {
            System.out.printf("flushPage error in evictPage from pagepool");
        }
        pages_.remove(hashcode); //移除
    }

fifoQueue_在每次有新的Page加入后都会在里面加入新的Page.注意对于驱逐出来的Page注意flush到File中.

3.总结

lab2中讲opertaor定义为迭代器的形式,此外其中next和hasNext的处理很精妙,这使得派生类都可以共用next,hasNext,只需要重写fetchNext,open等.