-
Notifications
You must be signed in to change notification settings - Fork 10
/
two_pc.rs
124 lines (106 loc) · 3.61 KB
/
two_pc.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::collections::BTreeMap;
use crate::apaxos::greater_equal::GreaterEqual;
use crate::commonly_used::quorum_set::all::All;
use crate::commonly_used::rebuild::Partitioned;
use crate::commonly_used::transport::DirectCall;
use crate::Types;
/// Marker type that selects the TwoPC implementation built on abstract-paxos.
///
/// It carries no data; the behavior is defined by the associated types chosen
/// in its `Types` implementation.
#[derive(Clone, Debug)]
struct TwoPC {}
/// Plugs TwoPC into abstract-paxos by choosing its associated types.
///
/// Paxos rebuilds a proposal from a single
/// [`Acceptor`](crate::apaxos::acceptor::Acceptor)'s data. TwoPC, in contrast,
/// can rebuild the proposal from the data of several acceptors whose
/// `accept_time` (vballot in Paxos) differs, as long as they share the same
/// propose-time.
///
/// ```text
/// p3: rebuild from v2,v1,v1,v1,v1; because they have the same propose-time
///
/// p3 |
///    | +------------+ a1..a5
/// p2 | v2
///    | +------------+ a3..a7
/// p1 | v1 v1 v1 v1 v1
///    | +------------+ a3..a7
/// ---|----------------------
///    | a1 a2 a3 a4 a5 a6 a7
/// ```
impl Types for TwoPC {
    // The pseudo time that encodes mutual exclusion between transactions.
    type Time = TwoPCTime;

    // A whole proposal value and the piece of it held as one part.
    // NOTE(review): presumably the map is keyed by acceptor id, one `Part`
    // per acceptor — confirm against `Partitioned`.
    type Part = String;
    type Value = BTreeMap<u64, String>;

    // Building blocks taken from `commonly_used`.
    type Distribute = Partitioned<TwoPC>;
    type QuorumSet = All<TwoPC>;
    type Transport = DirectCall<TwoPC>;
}
/// Pseudo "time" used by TwoPC to express mutual exclusion between
/// transactions.
///
/// A value `a` is greater-or-equal (GE) to a value `b` when either:
/// 1. both belong to the same transaction (equal `txn_id`), or
/// 2. `b`'s transaction has not yet locked the resource.
///
/// This GE relation is not transitive and can form a cycle.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct TwoPCTime {
    /// Identifier of the transaction this value belongs to.
    txn_id: u64,
    /// Whether the transaction has locked the resource.
    locked: bool,
}
/// GE relation used for TwoPC's mutual exclusion.
///
/// `self` is greater-or-equal to `other` when `other` has not locked the
/// resource, or when both values belong to the same transaction.
impl GreaterEqual for TwoPCTime {
    fn greater_equal(&self, other: &Self) -> bool {
        // An unlocked `other` never blocks anyone.
        if !other.locked {
            return true;
        }

        // `other` holds the lock: only its own transaction passes.
        self.txn_id == other.txn_id
    }
}
impl TwoPCTime {
    /// Creates the time value for transaction `txn_id`.
    ///
    /// The returned value always has `locked` set to `true`; an unlocked
    /// value is only produced by `Default`.
    pub fn new(txn_id: u64) -> Self {
        let locked = true;
        Self { txn_id, locked }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use maplit::btreemap;

    use crate::apaxos::acceptor::Acceptor;
    use crate::apaxos::proposer::Proposer;
    use crate::commonly_used::quorum_set::all::All;
    use crate::commonly_used::rebuild::Partitioned;
    use crate::commonly_used::transport::DirectCall;
    use crate::implementations::two_pc::TwoPC;
    use crate::implementations::two_pc::TwoPCTime;
    use crate::APaxos;

    /// Runs two transactions against three acceptors and checks what each
    /// proposer reports as committed.
    #[test]
    fn test_two_pc() {
        let acceptor_ids = [1, 2, 3];

        // One default acceptor per id.
        let acceptors: BTreeMap<_, _> = acceptor_ids
            .iter()
            .map(|&id| (id, Acceptor::default()))
            .collect();

        let quorum_set = All::new(acceptor_ids);
        let transport = DirectCall::new(acceptors.clone());
        let rebuild = Partitioned::<TwoPC>::new(acceptor_ids);

        let mut apaxos = APaxos::<TwoPC>::new(acceptor_ids, quorum_set, rebuild, transport);

        // Tx1: the committed result must carry tx1's own time and data.
        let tx1_time = TwoPCTime::new(1);
        let tx1_data = btreemap! {1u64=>s("hello"), 2u64=>s("world"), 3u64=>s("")};

        let mut tx1_proposer = Proposer::new(&mut apaxos, tx1_time, tx1_data.clone());
        let tx1_committed = tx1_proposer.run();

        assert_eq!(tx1_committed.propose_time, tx1_time);
        assert_eq!(tx1_committed.data, tx1_data);

        // Tx2 is expected to fail:
        // NOTE(review): the assertions below require tx2 to commit with its
        // own time and data, which does not match "expected to fail" —
        // confirm which outcome is intended.
        {
            let tx2_time = TwoPCTime::new(2);
            let tx2_data = btreemap! {1u64=>s("bye"), 2u64=>s("planet"), 3u64=>s("earth")};

            let mut tx2_proposer = Proposer::new(&mut apaxos, tx2_time, tx2_data.clone());
            let tx2_committed = tx2_proposer.run();

            assert_eq!(tx2_committed.propose_time, tx2_time);
            assert_eq!(tx2_committed.data, tx2_data);
        }
    }

    /// Shorthand that turns anything `ToString` into an owned `String`.
    fn s(x: impl ToString) -> String {
        ToString::to_string(&x)
    }
}