Libra 源码分析:内存池 mempool 模块解读 – 2

mempool 模块对于 Tx 的管理核心全部集中在 TransactionStore 这个结构,他对外对接的是 CoreMemPool 结构.
从 TransactionStore 可以清楚看出缓冲池中 Tx 增删改查的逻辑.

作为缓冲池,我们先大致说一下这几个功能要考虑的问题.

1. TransactionStore 中的增删改查

1.1 增以及改

增就是向缓冲池中添加新的 Tx, 改则是修改已经在缓冲池中的 Tx 了.
先说为什么会有修改,主要因素实际上只有一个就是 GasPrice, 对于同一个账户的同一个 Sequence_number(就是以太坊中的 Nonce) 的 Tx, 如果存在 Gas 更高的,就会被替换。对于公链有这样的需求,比如想被更快打包。那么对于联盟链也有这样的需求?并且 Libra 号称是 1000TPS. 为什么有这个功能希望大家能够一起来讨论这个问题.

说完了改,我们重点考虑增:
增主要是因为收到了用户提交的新的 Tx, 增的时候就要进行分类,分成两类:可以立即被打包的 Tx 和不能被立即打包的 Tx. 原因也只有一个就是 Sequence_number 是否连得起来.
增的时候还要考虑其他问题,主要就是为了服务删查,如同数据库一样,因为如果没有删查需求,那很简单,一个文件追加写就 ok 了。正是因为了有了删查需求,才会有各种索引.

TransactionStore 六中索引来查找删除 Tx, 后面我们会展开.

1.2 删

  1. 当缓冲池中的 Tx 被打包以后,肯定要删;
  2. 当一个 Tx 用户指定的过期时间到了,也要删. (每个 Tx 都有一个过期时间,这个是 Libra 的独创,在比特币以太坊源码中是没有的);
  3. 当一个 Tx 在缓冲池中呆很久都不能被打包,也要删.

1.3 查

  1. 新的 Tx 来的时候要做重复性检查,这是要查
  2. 当共识模块需要下一块可以被打包的交易,这时候要快速查
  3. 节点之间需要同步 Tx, 那么要查哪些 Tx 已经同步,哪些没有,同步到了什么位置.

2. 增删改查的实现

2.1 用到的索引

为了管理 Tx, 在 TransactionStore 同时用到了好几种索引方式,简单介绍一下.

2.1.1 PriorityIndex

看名字,就是一个优先级队列。它内部用 BTreeSet 进行组织,排序方式则是 gas_price,expiration_time,address,sequence_number. 也就是 gas_price 高的优先,其次是 expiration_time 等.

1
2
3
pub struct PriorityIndex {
    data: BTreeSet<OrderedQueueKey>,
}

顺便说一下 rust 中的运算符重载,这个和 C++ 中是一样的,如果一个自定义结构想实现 <,>,==, 那么可以实现 Ord 这个 trait, 为了直观,我们这里展示一下 OrderedQueueKey 对 Ord 的实现。其他几种索引方式大同小异.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#[derive(Eq, PartialEq, Clone, Debug, Hash)]
pub struct OrderedQueueKey {
    pub gas_price: u64,
    pub expiration_time: Duration,
    pub address: AccountAddress,
    pub sequence_number: u64,
}

impl PartialOrd for OrderedQueueKey {
    fn partial_cmp(&self, other: &OrderedQueueKey) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedQueueKey {
    fn cmp(&self, other: &OrderedQueueKey) -> Ordering {
        match self.gas_price.cmp(&other.gas_price) {
            Ordering::Equal => {}
            ordering => return ordering,
        }
        match self.expiration_time.cmp(&other.expiration_time).reverse() {
            Ordering::Equal => {}
            ordering => return ordering,
        }
        match self.address.cmp(&other.address) {
            Ordering::Equal => {}
            ordering => return ordering,
        }
        self.sequence_number.cmp(&other.sequence_number).reverse()
    }
}

2.1.2 TTLIndex

这个是按照过期时间排序,过期时间总共有两种,一种是在缓冲池中呆太久了,另一种是用户指定的过期时间.

1
2
3
4
pub struct TTLIndex {
    data: BTreeSet<TTLOrderingKey>,
    get_expiration_time: Box<dyn Fn(&MempoolTransaction) -> Duration + Send + Sync>,
}

其中 get_expiration_time 这个回调函数就是用来从MempoolTransaction 获取不同的时间用的.

2.1.3 TimelineIndex

这个索引方式主要是服务节点间同步,为每一个 Tx 都给与一个唯一的编号,这样向其他节点推送 Tx 的时候只需记住一个整数就知道下次从什么位置开始推送了.

1
2
3
4
5
6
7
8
9
10
11
12
/// TimelineIndex is ordered log of all transactions that are "ready" for broadcast
/// we only add transaction to index if it has a chance to be included in next consensus block
/// it means it's status != NotReady or it's sequential to other "ready" transaction
///
/// It's represented as Map <timeline_id, (Address, sequence_number)>
///    where timeline_id is auto increment unique id of "ready" transaction in local Mempool
///    (Address, sequence_number) is a logical reference to transaction content in main storage
///  //按照添加顺序对Tx排序,用于节点间的mempool的同步
pub struct TimelineIndex {
    timeline_id: u64, //无异这个结构是单线程的,单增
    timeline: BTreeMap<u64, (AccountAddress, u64)>,
}

这里的 timeline_id 则是那个唯一的编号,timeline 这是 timeline_id 到 Tx ID 的映射。上一篇文章我们提到了 Libra 中 Tx 的这个特征.

2.1.4 ParkingLotIndex

ParkingLotIndex 主要是记录那些因为 seq_number 不连续还不能被打包的 Tx. 一旦来了新的交易就有可能让不连续的 seq_number 变成连续的。或者打包的块中更新了 seq_number, 从而也可能连起来.
后一种情况可能不太直观,比如我本地缓冲池中有 AccountA 的 Tx [2,3,5,6,7], 因为 4 不存在,导致 [5,6,7] 不可能被打包。但是突然链上已经被打包的交易中出现了 4, 这就意味着 [5,6,7] 都已经 Ready 了,4 只是因为同步延迟我没有收到而已.

1
2
3
4
5
6
7
8
9
10
11
/// ParkingLotIndex keeps track of "not_ready" transactions
/// e.g. transactions that can't be included in next block
/// (because their sequence number is too high)
/// we keep separate index to be able to efficiently evict them when Mempool is full
/// 暂时不能打包的交易,seq没有连起来,有间隔
pub struct ParkingLotIndex {
    data: BTreeSet<TxnPointer>,
}
/// Logical pointer to `MempoolTransaction`
/// Includes Account's address and transaction sequence number
pub type TxnPointer = (AccountAddress, u64);

2.2 TransactionStore 的定义

有了上面的脚手架,再来看 TransactionStore 就会容易理解的多.
说句题外话,有了 Map 这个结构,数据组织以及管理真是轻松了很多,怪不得 Go 要把 Map 作为内置的.
我们提到的各种 Index, 都是用的有序 BTreeMapBTreeSet 本身就是一个特殊的 BTreeMap.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/// TransactionStore is in-memory storage for all transactions in mempool
pub struct TransactionStore {
    // main DS
    transactions: HashMap<AccountAddress, AccountTransactions>, /* 地址=>{seq=>Tx} 二重map,所有收集到的合法的Tx */

    // indexes
    priority_index: PriorityIndex, /* 按照gas_price,expiration_time,address,
                                    * sequence_number顺序排序的所有可以打包的Tx */
    // TTLIndex based on client-specified expiration time
    expiration_time_index: TTLIndex, /* 这个过期时间是用户提交的,这个时间虽然是Duration,
                                      * 但是其实也是绝对时间,保存所有合法的Tx */
    // TTLIndex based on system expiration time
    // we keep it separate from `expiration_time_index` so Mempool can't be clogged
    //  by old transactions even if it hasn't received commit callbacks for a while
    system_ttl_index: TTLIndex, /* 这个时间是由mempool控制,
                                 * 在进入缓冲池的时候会设置成当时的时间加上过期时间,
                                 * 保存所有的合法Tx */
    timeline_index: TimelineIndex, /* 里面保存的timeline_id,用于mempool之间的Tx同步,
                                    * 这里面只会保存来自AC模块的交易,
                                    * 而对来自其他节点推送来的交易不在进行广播 */
    // keeps track of "non-ready" txns (transactions that can't be included in next block)
    parking_lot_index: ParkingLotIndex, //暂时不满足条件,不能打包的Tx

    // configuration
    capacity: usize,
    capacity_per_user: usize,
}

3. TransactionStore 完整的实现

3.1 按照 Tx ID 查

1
2
3
4
5
6
7
8
9
10
11
12
13
/// fetch transaction by account address + sequence_number
    pub(crate) fn get(
        &self,
        address: &AccountAddress,
        sequence_number: u64,
    ) -> Option<SignedTransaction> {
        if let Some(txns) = self.transactions.get(&address) {
            if let Some(txn) = txns.get(&sequence_number) {
                return Some(txn.txn.clone());
            }
        }
        None
    }

3.2 增,改的实现

增很容易,关键是增的时候要考虑删查,所以各种索引都要考虑好.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/// insert transaction into TransactionStore
    /// performs validation checks and updates indexes
    pub(crate) fn insert(
        &mut self,
        txn: MempoolTransaction,
        current_sequence_number: u64, /* current_sequence_number-1表示已经达成共识的seq
                                       * number,之前的都没必要缓存了. 这是下一个ready的number */
    ) -> MempoolAddTransactionStatus {
        //增删改查中的改
        let (is_update, status) = self.check_for_update(&txn);
        if is_update {
            return status;
        }
        if self.check_if_full() {
            return MempoolAddTransactionStatus::MempoolIsFull;
        }

        let address = txn.get_sender();
        let sequence_number = txn.get_sequence_number();

        let txns = self
            .transactions
            .entry(address)
            .or_insert_with(AccountTransactions::new);

        // capacity check
        if txns.len() >= self.capacity_per_user {
            return MempoolAddTransactionStatus::TooManyTransactions;
        }

        //新增很容易,关键是各种索引也要跟着建立
        // insert into storage and other indexes
        println!("insert system ttl");
        self.system_ttl_index.insert(&txn); // 
        println!("insert expiration time");
        self.expiration_time_index.insert(&txn);
        txns.insert(sequence_number, txn);
        OP_COUNTERS.set("txn.system_ttl_index", self.system_ttl_index.size());

        self.process_ready_transactions(&address, current_sequence_number);
        MempoolAddTransactionStatus::Valid
    }

    /// fixes following invariants:
    /// all transactions of given account that are sequential to current sequence number
    /// supposed to be included in both PriorityIndex (ordering for Consensus) and
    /// TimelineIndex (txns for SharedMempool)
    /// Other txns are considered to be "non-ready" and should be added to ParkingLotIndex
    fn process_ready_transactions(
        &mut self,
        address: &AccountAddress,
        current_sequence_number: u64, /* 表示当前已经ready的那个sequence_number,
                                       * 后续可能还有也可能没有tx
                                       * ready,比如3ready了,那么本来没有ready的4,5都应该ready */
    ) {
        if let Some(txns) = self.transactions.get_mut(&address) {
            let mut sequence_number = current_sequence_number;
            while let Some(txn) = txns.get_mut(&sequence_number) {
                self.priority_index.insert(txn);

                if txn.timeline_state == TimelineState::NotReady {
                    self.timeline_index.insert(txn);
                }
                sequence_number += 1;
            }
             //这个有必要进行?原来的没有ready的,肯定也是没有ready
            //增的时候没必要做,但是`commit_transaction`的时候可能会让没有ready的变成ready
            for (_, txn) in txns.range_mut((Bound::Excluded(sequence_number), Bound::Unbounded)) {
                match txn.timeline_state {
                    TimelineState::Ready(_) => {}
                    _ => {
                        self.parking_lot_index.insert(&txn);
                    }
                }
            }
        }
    }

3.3 删

3.3.1 删被打包的交易

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/// handles transaction commit
  /// it includes deletion of all transactions with sequence number <= `sequence_number`
  /// and potential promotion of sequential txns to PriorityIndex/TimelineIndex
  pub(crate) fn commit_transaction(&mut self, account: &AccountAddress, sequence_number: u64) {
      if let Some(txns) = self.transactions.get_mut(&account) {
          // remove all previous seq number transactions for this account
          // This can happen if transactions are sent to multiple nodes and one of
          // nodes has sent the transaction to consensus but this node still has the
          // transaction sitting in mempool
          //小于等于sequence_number都需要移除了
          let mut active = txns.split_off(&(sequence_number + 1));
          let txns_for_removal = txns.clone();
          txns.clear();
          txns.append(&mut active);

          for transaction in txns_for_removal.values() {
              self.index_remove(transaction);
          }
      }
      self.process_ready_transactions(account, sequence_number + 1);
  }

3.3.2 删迟迟不能被打包的交易

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/// GC old transactions
pub(crate) fn gc_by_system_ttl(&mut self) {
    //清除所有过期的交易,这里虽然设置的是now,
    // 但是因为加入的时候都会在过期时间上加上一段时间`system_transaction_timeout`,
    // 因此不用担心会清理掉所有的交易
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("init timestamp failure");

    self.gc(now, true);
}
 fn gc(&mut self, now: Duration, by_system_ttl: bool) {
    let (index_name, index) = if by_system_ttl {
        ("gc.system_ttl_index", &mut self.system_ttl_index)
    } else {
        ("gc.expiration_time_index", &mut self.expiration_time_index)
    };
    OP_COUNTERS.inc(index_name);

    for key in index.gc(now) {
        if let Some(txns) = self.transactions.get_mut(&key.address) {
            // mark all following transactions as non-ready
            //比如本来seq={3,4,5},但是3过期了,那么{4,5}也不能打包了
            for (_, t) in txns.range((Bound::Excluded(key.sequence_number), Bound::Unbounded)) {
                self.parking_lot_index.insert(&t);
                self.priority_index.remove(&t);
                self.timeline_index.remove(&t);
            }
            if let Some(txn) = txns.remove(&key.sequence_number) {
                let is_active = self.priority_index.contains(&txn);
                let status = if is_active { "active" } else { "parked" };
                OP_COUNTERS.inc(&format!("{}.{}", index_name, status));
                self.index_remove(&txn);
            }
        }
    }
    OP_COUNTERS.set("txn.system_ttl_index", self.system_ttl_index.size());
}

3.3.3 删过了用户指定时间

这里需要说的是这里比较的时间并不是本地时间,而是经过 Validator 集体参与生成的块中的时间.
虽然 Libra 中 Block 的概念被弱化了很多,但是块时间这个因素还是要所有 Validator 签名的.

1
2
3
4
/// GC old transactions based on client-specified expiration time
pub(crate) fn gc_by_expiration_time(&mut self, block_time: Duration) {
    self.gc(block_time, false);
}

结语

不得不说大厂出来的代码质量高,很精炼。也没多少行 core_mempool 总共也就五个文件,一千行代码不到.
其次是因为是联盟链,也不用考虑分叉,所以这部分代码相比以太坊简化了不少。所谓的好与不好,主要看能不能满足需求吧.

本文作者为深入浅出共建者:白振轩

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享