summaryrefslogtreecommitdiff
path: root/src/mon/Paxos.h
blob: 6f255bd3a4d871d03cd35e7572b45c1d85566b97 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */

/*
time---->

cccccccccccccccccca????????????????????????????????????????
cccccccccccccccccca????????????????????????????????????????
cccccccccccccccccca???????????????????????????????????????? leader
cccccccccccccccccc????????????????????????????????????????? 
ccccc?????????????????????????????????????????????????????? 

last_committed

pn_from
pn

a 12v 
b 12v
c 14v
d
e 12v


*/


/*
 * NOTE: This library is based on the Paxos algorithm, but varies in a few key ways:
 *  1- Only a single new value is generated at a time, simplifying the recovery logic.
 *  2- Nodes track "committed" values, and share them generously (and trustingly).
 *  3- A 'leasing' mechanism is built in, allowing nodes to determine when it is safe to
 *     "read" their copy of the last committed value.
 *
 * This provides a simple replication substrate that services can be built on top of.
 * See PaxosService.h
 */

#ifndef CEPH_MON_PAXOS_H
#define CEPH_MON_PAXOS_H

#include "include/types.h"
#include "mon_types.h"
#include "include/buffer.h"
#include "messages/PaxosServiceMessage.h"
#include "msg/msg_types.h"

#include "include/Context.h"

#include "common/Timer.h"

// Forward declarations.  This header only uses Monitor and MMonPaxos through
// pointers (member `Monitor *mon`, `MMonPaxos*` handler parameters), so the
// full definitions are not required here.  Paxos itself is defined below.
class Monitor;
class MMonPaxos;
class Paxos;


/**
 * Paxos -- one replicated state machine.
 *
 * Each instance is identified by a machine id (and the matching name obtained
 * from get_paxos_name()) and moves between three states: recovering, active
 * and updating.  The same object carries both the leader-side and the
 * peon-side bookkeeping.
 *
 * Only the trivial accessors and the wait_for_*() queue helpers are defined
 * inline; every other member function is merely declared here.
 */
class Paxos {
  Monitor *mon;   // monitor handed to the constructor

  // my state machine info
  int machine_id;
  const char *machine_name;   // get_paxos_name(machine_id), set in the constructor

  friend class Monitor;
  friend class PaxosService;
  friend class PaxosObserver;



  // LEADER+PEON

  // -- generic state --
public:
  const static int STATE_RECOVERING = 1;  // leader|peon: recovering paxos state
  const static int STATE_ACTIVE     = 2;  // leader|peon: idle.  peon may or may not have valid lease
  const static int STATE_UPDATING   = 3;  // leader|peon: updating to new value

  /**
   * Map a STATE_* constant to a short human-readable name.
   *
   * @param s one of STATE_RECOVERING, STATE_ACTIVE, STATE_UPDATING
   * @return a static string literal; any other value trips assert(0)
   */
  static const char *get_statename(int s) {
    switch (s) {
    case STATE_RECOVERING: return "recovering";
    case STATE_ACTIVE: return "active";
    case STATE_UPDATING: return "updating";
    default: assert(0); return 0;
    }
  }

private:
  int state;   // one of the STATE_* constants; starts as STATE_RECOVERING

public:
  bool is_recovering() const { return state == STATE_RECOVERING; }
  bool is_active() const { return state == STATE_ACTIVE; }
  bool is_updating() const { return state == STATE_UPDATING; }

private:
  // recovery (phase 1)
  version_t first_committed_any;
  version_t first_committed;
  version_t last_pn;
  version_t last_committed;     // value returned by get_version()
  utime_t last_commit_time;
  version_t accepted_pn;
  version_t accepted_pn_from;

  // active (phase 2)
  utime_t lease_expire;
  list<Context*> waiting_for_active;     // filled by wait_for_active()
  list<Context*> waiting_for_readable;   // filled by wait_for_readable()

  version_t latest_stashed;

  // -- leader --
  // recovery (paxos phase 1)
  unsigned   num_last;
  version_t  uncommitted_v;
  version_t  uncommitted_pn;
  bufferlist uncommitted_value;

  Context    *collect_timeout_event;

  // active
  set<int>   acked_lease;
  Context    *lease_renew_event;
  Context    *lease_ack_timeout_event;
  Context    *lease_timeout_event;

  // updating (paxos phase 2)
  bufferlist new_value;
  set<int>   accepted;

  Context    *accept_timeout_event;

  list<Context*> waiting_for_writeable;  // filled by wait_for_writeable()
  list<Context*> waiting_for_commit;     // filled by wait_for_commit{,_front}()

  // observers
  /**
   * Bookkeeping for one registered observer: where it lives, the last
   * version recorded for it, and a timeout stamp (left default-constructed
   * by the constructor).
   */
  struct Observer {
    entity_inst_t inst;
    version_t last_version;
    utime_t timeout;
    // Takes the address by const reference: it is only copied into `inst`.
    Observer(const entity_inst_t& ei, version_t v) : inst(ei), last_version(v) { }
  };
  map<entity_inst_t, Observer *> observers;

  //synchronization warnings
  utime_t last_clock_drift_warn;
  int clock_drift_warned;


  // Timer callbacks.  Each one simply forwards finish() to the matching
  // *_timeout() member of the Paxos instance it was constructed with.

  /// Fires Paxos::collect_timeout().
  class C_CollectTimeout : public Context {
    Paxos *paxos;
  public:
    C_CollectTimeout(Paxos *p) : paxos(p) {}
    void finish(int r) {
      paxos->collect_timeout();
    }
  };

  /// Fires Paxos::accept_timeout().
  class C_AcceptTimeout : public Context {
    Paxos *paxos;
  public:
    C_AcceptTimeout(Paxos *p) : paxos(p) {}
    void finish(int r) {
      paxos->accept_timeout();
    }
  };

  /// Fires Paxos::lease_ack_timeout().
  class C_LeaseAckTimeout : public Context {
    Paxos *paxos;
  public:
    C_LeaseAckTimeout(Paxos *p) : paxos(p) {}
    void finish(int r) {
      paxos->lease_ack_timeout();
    }
  };

  /// Fires Paxos::lease_timeout().
  class C_LeaseTimeout : public Context {
    Paxos *paxos;
  public:
    C_LeaseTimeout(Paxos *p) : paxos(p) {}
    void finish(int r) {
      paxos->lease_timeout();
    }
  };

  /// Fires Paxos::lease_renew_timeout().
  class C_LeaseRenew : public Context {
    Paxos *paxos;
  public:
    C_LeaseRenew(Paxos *p) : paxos(p) {}
    void finish(int r) {
      paxos->lease_renew_timeout();
    }
  };


  // phase 1: collect / last
  void collect(version_t oldpn);
  void handle_collect(MMonPaxos*);
  void handle_last(MMonPaxos*);
  void collect_timeout();

  // phase 2: begin / accept
  void begin(bufferlist& value);
  void handle_begin(MMonPaxos*);
  void handle_accept(MMonPaxos*);
  void accept_timeout();

  // commit and lease handling
  void commit();
  void handle_commit(MMonPaxos*);
  void extend_lease();
  void handle_lease(MMonPaxos*);
  void handle_lease_ack(MMonPaxos*);

  void lease_ack_timeout();    // on leader, if lease isn't acked by all peons
  void lease_renew_timeout();  // on leader, to renew the lease
  void lease_timeout();        // on peon, if lease isn't extended

  void cancel_events();

  version_t get_new_proposal_number(version_t gt=0);
  
  void warn_on_future_time(utime_t t, entity_name_t from);

public:
  /**
   * Build a state machine in STATE_RECOVERING with no timer events pending.
   *
   * Every scalar bookkeeping field is zeroed here so that no member is ever
   * read with an indeterminate value (e.g. by get_version()) before the real
   * values are loaded.  NOTE(review): the real values are presumably set by
   * init() and the recovery path in Paxos.cc -- confirm.
   *
   * The initializers are listed in member declaration order.
   *
   * @param m   monitor this state machine is attached to
   * @param mid machine id; also selects machine_name via get_paxos_name()
   */
  Paxos(Monitor *m, int mid)
    : mon(m),
      machine_id(mid),
      machine_name(get_paxos_name(mid)),
      state(STATE_RECOVERING),
      first_committed_any(0),
      first_committed(0),
      last_pn(0),
      last_committed(0),
      accepted_pn(0),
      accepted_pn_from(0),
      latest_stashed(0),
      num_last(0),
      uncommitted_v(0),
      uncommitted_pn(0),
      collect_timeout_event(0),
      lease_renew_event(0),
      lease_ack_timeout_event(0),
      lease_timeout_event(0),
      accept_timeout_event(0),
      clock_drift_warned(0) { }

  /// Name of this state machine, as obtained from get_paxos_name().
  const char *get_machine_name() const {
    return machine_name;
  }

  void dispatch(PaxosServiceMessage *m);

  void init();

  void election_starting();
  void leader_init();
  void peon_init();

  void share_state(MMonPaxos *m, version_t first_committed, version_t last_committed);
  void store_state(MMonPaxos *m);


  // -- service interface --
  /// Queue a callback on waiting_for_active.
  void wait_for_active(Context *c) {
    waiting_for_active.push_back(c);
  }

  void trim_to(version_t first);
  
  // read
  /// Last committed version currently recorded by this instance.
  version_t get_version() const { return last_committed; }
  bool is_readable(version_t seen=0);
  bool read(version_t v, bufferlist &bl);
  version_t read_current(bufferlist &bl);
  /// Queue a callback on waiting_for_readable.
  void wait_for_readable(Context *onreadable) {
    //assert(!is_readable());
    waiting_for_readable.push_back(onreadable);
  }

  // write
  bool is_leader();
  bool is_writeable();
  /// Queue a callback on waiting_for_writeable; must not be called while
  /// is_writeable() is already true (asserted).
  void wait_for_writeable(Context *c) {
    assert(!is_writeable());
    waiting_for_writeable.push_back(c);
  }

  bool propose_new_value(bufferlist& bl, Context *oncommit=0);
  /// Queue a callback at the back of waiting_for_commit.
  void wait_for_commit(Context *oncommit) {
    waiting_for_commit.push_back(oncommit);
  }
  /// Queue a callback at the front of waiting_for_commit, ahead of the
  /// callbacks already queued.
  void wait_for_commit_front(Context *oncommit) {
    waiting_for_commit.push_front(oncommit);
  }

  // if state values are incrementals, it is useful to keep
  // the latest copy of the complete structure.
  void stash_latest(version_t v, bufferlist& bl);
  version_t get_latest(bufferlist& bl);

  void register_observer(entity_inst_t inst, version_t v);
  void update_observers();
};



#endif