%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%

-module(gm).

%% Guaranteed Multicast
%% ====================
%%
%% This module provides the ability to create named groups of
%% processes to which members can be dynamically added and removed,
%% and for messages to be broadcast within the group that are
%% guaranteed to reach all members of the group during the lifetime of
%% the message. The lifetime of a message is defined as being, at a
%% minimum, the time from which the message is first sent to any
%% member of the group, up until the time at which it is known by the
%% member who published the message that the message has reached all
%% group members.
%%
%% The guarantee given is that provided a message, once sent, makes it
%% to members who do not all leave the group, the message will
%% continue to propagate to all group members.
%%
%% Another way of stating the guarantee is that if member P publishes
%% messages m and m', then for all members P', if P' is a member of
%% the group prior to the publication of m, and P' receives m', then
%% P' will receive m.
%%
%% Note that only local-ordering is enforced: i.e. if member P sends
%% message m and then message m', then for all members P', if P'
%% receives m and m', then they will receive m' after m. Causality
%% ordering is _not_ enforced. I.e. if member P receives message m
%% and as a result publishes message m', there is no guarantee that
%% other members P' will receive m before m'.
%%
%%
%% API Use
%% -------
%%
%% Mnesia must be started. Use the idempotent create_tables/0 function
%% to create the tables required.
%%
%% start_link/4
%% Provide the group name, the callback module name, any arguments you
%% wish to be passed into the callback module's functions, and a
%% function which will be used to execute the mnesia transactions that
%% maintain the group's membership record. The joined/2 function will
%% be called when we have joined the group, with the arguments passed
%% to start_link and a list of the current members of the group. See
%% the callback specs and the comments below for further details of
%% the callback functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
%% handle_terminate/2 function will be called.
%%
%% broadcast/2
%% Provide the Pid and a Message. The message will be sent to all
%% members of the group as per the guarantees given above. This is a
%% cast and the function call will return immediately. There is no
%% guarantee that the message will reach any member of the group.
%%
%% confirmed_broadcast/2
%% Provide the Pid and a Message. As per broadcast/2 except that this
%% is a call, not a cast, and only returns 'ok' once the Message has
%% reached every member of the group. Do not call
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
%% info/1
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
%% %% validate_members/2 %% Check whether a given member list agrees with the chosen member's %% view. Any differences will be communicated via the members_changed %% callback. If there are no differences then there will be no reply. %% Note that members will not necessarily share the same view. %% %% forget_group/1 %% Provide the group name. Removes its mnesia record. Makes no attempt %% to ensure the group is empty. %% %% Implementation Overview %% ----------------------- %% %% One possible means of implementation would be a fan-out from the %% sender to every member of the group. This would require that the %% group is fully connected, and, in the event that the original %% sender of the message disappears from the group before the message %% has made it to every member of the group, raises questions as to %% who is responsible for sending on the message to new group members. %% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] - %% if the sender dies part way through, who is responsible for %% ensuring that the remaining Members receive the Msg? In the event %% that within the group, messages sent are broadcast from a subset of %% the members, the fan-out arrangement has the potential to %% substantially impact the CPU and network workload of such members, %% as such members would have to accommodate the cost of sending each %% message to every group member. %% %% Instead, if the members of the group are arranged in a chain, then %% it becomes easier to reason about who within the group has received %% each message and who has not. It eases issues of responsibility: in %% the event of a group member disappearing, the nearest upstream %% member of the chain is responsible for ensuring that messages %% continue to propagate down the chain. It also results in equal %% distribution of sending and receiving workload, even if all %% messages are being sent from just a single group member. This %% configuration has the further advantage that it is not necessary %% for every group member to know of every other group member, and %% even that a group member does not have to be accessible from all %% other group members. %% %% Performance is kept high by permitting pipelining and all %% communication between joined group members is asynchronous. In the %% chain A -> B -> C -> D, if A sends a message to the group, it will %% not directly contact C or D. However, it must know that D receives %% the message (in addition to B and C) before it can consider the %% message fully sent. A simplistic implementation would require that %% D replies to C, C replies to B and B then replies to A. This would %% result in a propagation delay of twice the length of the chain. It %% would also require, in the event of the failure of C, that D knows %% to directly contact B and issue the necessary replies. Instead, the %% chain forms a ring: D sends the message on to A: D does not %% distinguish A as the sender, merely as the next member (downstream) %% within the chain (which has now become a ring). When A receives %% from D messages that A sent, it knows that all members have %% received the message. However, the message is not dead yet: if C %% died as B was sending to C, then B would need to detect the death %% of C and forward the message on to D instead: thus every node has %% to remember every message published until it is told that it can %% forget about the message. This is essential not just for dealing %% with failure of members, but also for the addition of new members. 
%%
%% Thus once A receives the message back again, it then sends to B an
%% acknowledgement for the message, indicating that B can now forget
%% about the message. B does so, and forwards the ack to C. C forgets
%% the message, and forwards the ack to D, which forgets the message
%% and finally forwards the ack back to A. At this point, A takes no
%% further action: the message and its acknowledgement have made it to
%% every member of the group. The message is now dead, and any new
%% member joining the group at this point will not receive the
%% message.
%%
%% We therefore have two roles:
%%
%% 1. The sender, who upon receiving their own messages back, must
%% then send out acknowledgements, and upon receiving their own
%% acknowledgements back perform no further action.
%%
%% 2. The other group members, who upon receiving messages and
%% acknowledgements must update their own internal state accordingly
%% (the sending member must also do this in order to be able to
%% accommodate failures), and forward messages on to their downstream
%% neighbours.
%%
%%
%% Implementation: It gets trickier
%% --------------------------------
%%
%% Chain A -> B -> C -> D
%%
%% A publishes a message which B receives. A now dies. B and D will
%% detect the death of A, and will link up, thus the chain is now B ->
%% C -> D. B forwards A's message on to C, who forwards it to D, who
%% forwards it to B. Thus B is now responsible for A's messages - both
%% publications and acknowledgements that were in flight at the point
%% at which A died. Even worse is that this is transitive: after B
%% forwards A's message to C, B dies as well. Now C is not only
%% responsible for B's in-flight messages, but is also responsible for
%% A's in-flight messages.
%%
%% Lemma 1: A member can only determine which dead members they have
%% inherited responsibility for if there is a total ordering on the
%% conflicting additions and subtractions of members from the group.
%%
%% Consider the simultaneous death of B and addition of B' that
%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
%% C is responsible for in-flight messages from B. It is easy to
%% ensure that at least one of them thinks they have inherited B, but
%% if we do not ensure that exactly one of them inherits B, then we
%% could have B' converting publishes to acks, which then will crash C
%% as C does not believe it has issued acks for those messages.
%%
%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
%% becoming A -> C' -> E. Who has inherited which of B, C and D?
%%
%% However, for non-conflicting membership changes, only a partial
%% ordering is required. For example, A -> B -> C becoming A -> A' ->
%% B. The addition of A', between A and B, can have no conflicts with
%% the death of C: it is clear that A has inherited C's messages.
%%
%% For ease of implementation, we adopt the simple solution of
%% imposing a total order on all membership changes.
%%
%% On the death of a member, it is ensured that the dead member's
%% neighbours become aware of the death, and the upstream neighbour
%% now sends to its new downstream neighbour its state, including the
%% messages pending acknowledgement. The downstream neighbour can then
%% use this to calculate which publishes and acknowledgements it has
%% missed out on, due to the death of its old upstream. Thus the
%% downstream can catch up, and continue the propagation of messages
%% through the group.
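%%
%% As a small worked illustration (message numbers are made up): take
%% the chain A -> B -> C and suppose A has published messages 1..4, of
%% which B had forwarded only 1 and 2 to C before dying. A detects the
%% death, links up with C, and sends C its state. By comparing that
%% state with its own, C sees that publications 3 and 4 never reached
%% it, so it applies and forwards them exactly as if B had relayed
%% them; had any acknowledgements been lost with B, C would discover
%% and apply those in the same way.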
%%
%% Lemma 2: When a member is joining, it must synchronously
%% communicate with its upstream member in order to receive its
%% starting state atomically with its addition to the group.
%%
%% New members must start with the same state as their nearest
%% upstream neighbour. This ensures that they are not surprised by
%% acknowledgements they are sent, and that should their downstream
%% neighbour die, they are able to send the correct state to their new
%% downstream neighbour to ensure it can catch up. Thus in the
%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
%% C, A' must start with the state of A, so that it can send C the
%% correct state when B dies, allowing C to detect any missed
%% messages.
%%
%% If A' starts by adding itself to the group membership, A could then
%% die, without A' having received the necessary state from A. This
%% would leave A' responsible for in-flight messages from A, but
%% having, of all the members, the least knowledge of those messages.
%% Thus A' must start by synchronously calling A, which then
%% immediately sends A' back its state. A then adds A' to the group.
%% If A dies at this point then A' will be able to see this (as A'
%% will fail to appear in the group membership), and thus A' will
%% ignore the state it receives from A, and will simply repeat the
%% process, trying to now join downstream from some other member. This
%% ensures that should the upstream die as soon as the new member has
%% been joined, the new member is guaranteed to receive the correct
%% state, allowing it to correctly process messages inherited due to
%% the death of its upstream neighbour.
%%
%% The canonical definition of the group membership is held by a
%% distributed database. Whilst this allows the total ordering of
%% changes to be achieved, it is nevertheless undesirable to have to
%% query this database for the current view upon receiving each
%% message. Instead, we wish for members to be able to cache a view of
%% the group membership, which then requires a cache invalidation
%% mechanism. Each member maintains its own view of the group
%% membership. Thus when the group's membership changes, members may
%% need to become aware of such changes in order to be able to
%% accurately process messages they receive. Because of the
%% requirement of a total ordering of conflicting membership changes,
%% it is not possible to use the guaranteed broadcast mechanism to
%% communicate these changes: to achieve the necessary ordering, it
%% would be necessary for such messages to be published by exactly one
%% member, which cannot be guaranteed given that such a member could
%% die.
%%
%% The total ordering we enforce on membership changes gives rise to a
%% view version number: every change to the membership creates a
%% different view, and the total ordering permits a simple
%% monotonically increasing view version number.
%%
%% Lemma 3: If a message is sent from a member that holds view version
%% N, it can be correctly processed by any member receiving the
%% message with a view version >= N.
%%
%% Initially, let us suppose that each view contains the ordering of
%% every member that was ever part of the group. Dead members are
%% marked as such. Thus we have a ring of members, some of which are
%% dead, and are thus inherited by the nearest alive downstream
%% member.
%%
%% In the chain A -> B -> C, all three members initially have view
%% version 1, which reflects reality. B publishes a message, which is
%% forwarded by C to A.
%% B now dies, which A notices very quickly. Thus A
%% updates the view, creating version 2. It now forwards B's
%% publication, sending that message to its new downstream neighbour,
%% C. This happens before C is aware of the death of B. C must become
%% aware of the view change before it interprets the message it has
%% received, otherwise it will fail to learn of the death of B, and
%% thus will not realise it has inherited B's messages (and will
%% likely crash).
%%
%% Thus very simply, we have that each subsequent view contains more
%% information than the preceding view.
%%
%% However, to avoid the views growing indefinitely, we need to be
%% able to delete members which have died _and_ for which no messages
%% are in-flight. This requires that upon inheriting a dead member, we
%% know the last publication sent by the dead member (this is easy: we
%% inherit a member because we are the nearest downstream member which
%% implies that we know at least as much as everyone else about the
%% publications of the dead member), and we know the earliest message
%% for which the acknowledgement is still in flight.
%%
%% In the chain A -> B -> C, when B dies, A will send to C its state
%% (as C is the new downstream from A), allowing C to calculate which
%% messages it has missed out on (described above). At this point, C
%% also inherits B's messages. If that state from A also includes the
%% last message published by B for which an acknowledgement has been
%% seen, then C knows exactly which further acknowledgements it must
%% receive (also including issuing acknowledgements for publications
%% still in-flight that it receives), after which it is known there
%% are no more messages in flight for B, thus all evidence that B was
%% ever part of the group can be safely removed from the canonical
%% group membership.
%%
%% Thus, for every message that a member sends, it includes with that
%% message its view version. When a member receives a message it will
%% update its view from the canonical copy, should its view be older
%% than the view version included in the message it has received.
%%
%% The state held by each member therefore includes the messages from
%% each publisher pending acknowledgement, the last publication seen
%% from that publisher, and the last acknowledgement from that
%% publisher. In the case of the member's own publications or
%% inherited members, this last-acknowledgement-seen state indicates
%% the last acknowledgement retired, rather than sent.
%%
%%
%% Proof sketch
%% ------------
%%
%% We need to prove that with the provided operational semantics, we
%% can never reach a state that is not well formed from a well-formed
%% starting state.
%%
%% Operational semantics (small step): straightforward message
%% sending, process monitoring, state updates.
%%
%% Well-formed state: dead members inherited by exactly one non-dead
%% member; for every entry in anyone's pending-acks, either (the
%% publication of the message is in-flight downstream from the member
%% and upstream from the publisher) or (the acknowledgement of the
%% message is in-flight downstream from the publisher and upstream
%% from the member).
%%
%% Proof by induction on the applicable operational semantics.
%%
%%
%% Related work
%% ------------
%%
%% The ring configuration and double traversal of messages around the
%% ring is similar (though developed independently) to the LCR
%% protocol by [Levy 2008]. However, LCR differs in several
%% ways.
Firstly, by using vector clocks, it enforces a total order of %% message delivery, which is unnecessary for our purposes. More %% significantly, it is built on top of a "group communication system" %% which performs the group management functions, taking %% responsibility away from the protocol as to how to cope with safely %% adding and removing members. When membership changes do occur, the %% protocol stipulates that every member must perform communication %% with every other member of the group, to ensure all outstanding %% deliveries complete, before the entire group transitions to the new %% view. This, in total, requires two sets of all-to-all synchronous %% communications. %% %% This is not only rather inefficient, but also does not explain what %% happens upon the failure of a member during this process. It does %% though entirely avoid the need for inheritance of responsibility of %% dead members that our protocol incorporates. %% %% In [Marandi et al 2010], a Paxos-based protocol is described. This %% work explicitly focuses on the efficiency of communication. LCR %% (and our protocol too) are more efficient, but at the cost of %% higher latency. The Ring-Paxos protocol is itself built on top of %% IP-multicast, which rules it out for many applications where %% point-to-point communication is all that can be required. They also %% have an excellent related work section which I really ought to %% read... %% %% %% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008. %% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast %% Protocol -behaviour(gen_server2). -export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/3]). %% For INSTR_MOD callbacks -export([call/3, cast/2, monitor/1, demonitor/1]). -ifndef(use_specs). -export([behaviour_info/1]). -endif. -export([table_definitions/0]). -define(GROUP_TABLE, gm_group). -define(MAX_BUFFER_SIZE, 100000000). %% 100MB -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). -define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). -record(state, { self, left, right, group_name, module, view, pub_count, members_state, callback_args, confirms, broadcast_buffer, broadcast_buffer_sz, broadcast_timer, txn_executor }). -record(gm_group, { name, version, members }). -record(view_member, { id, aliases, left, right }). -record(member, { pending_ack, last_pub, last_ack }). -define(TABLE, {?GROUP_TABLE, [{record_name, gm_group}, {attributes, record_info(fields, gm_group)}]}). -define(TABLE_MATCH, {match, #gm_group { _ = '_' }}). -define(TAG, '$gm'). -ifdef(use_specs). -export_type([group_name/0]). -type(group_name() :: any()). -type(txn_fun() :: fun((fun(() -> any())) -> any())). -spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}). -spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) -> rabbit_types:ok_pid_or_error()). -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(validate_members/2 :: (pid(), [pid()]) -> 'ok'). -spec(forget_group/1 :: (group_name()) -> 'ok'). 
%% The joined, members_changed and handle_msg callbacks can all return %% any of the following terms: %% %% 'ok' - the callback function returns normally %% %% {'stop', Reason} - the callback indicates the member should stop %% with reason Reason and should leave the group. %% %% {'become', Module, Args} - the callback indicates that the callback %% module should be changed to Module and that the callback functions %% should now be passed the arguments Args. This allows the callback %% module to be dynamically changed. %% Called when we've successfully joined the group. Supplied with Args %% provided in start_link, plus current group members. -callback joined(Args :: term(), Members :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the list of new members %% and the list of members previously known to us that have since %% died. Note that if a member joins and dies very quickly, it's %% possible that we will never see that member appear in either births %% or deaths. However we are guaranteed that (1) we will see a member %% joining either in the births here, or in the members passed to %% joined/2 before receiving any messages from it; and (2) we will not %% see members die that we have not seen born (or supplied in the %% members to joined/2). -callback members_changed(Args :: term(), Births :: [pid()], Deaths :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the sender, and the %% message. This does get called for messages injected by this member, %% however, in such cases, there is no special significance of this %% invocation: it does not indicate that the message has made it to %% any other members, let alone all other members. -callback handle_msg(Args :: term(), From :: pid(), Message :: term()) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Called on gm member termination as per rules in gen_server, with %% the Args provided in start_link plus the termination Reason. -callback handle_terminate(Args :: term(), Reason :: term()) -> ok | term(). -else. behaviour_info(callbacks) -> [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {handle_terminate, 2}]; behaviour_info(_Other) -> undefined. -endif. create_tables() -> create_tables([?TABLE]). create_tables([]) -> ok; create_tables([{Table, Attributes} | Tables]) -> case mnesia:create_table(Table, Attributes) of {atomic, ok} -> create_tables(Tables); {aborted, {already_exists, Table}} -> create_tables(Tables); Err -> Err end. table_definitions() -> {Name, Attributes} = ?TABLE, [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). leave(Server) -> gen_server2:cast(Server, leave). broadcast(Server, Msg) -> broadcast(Server, Msg, 0). broadcast(Server, Msg, SizeHint) -> gen_server2:cast(Server, {broadcast, Msg, SizeHint}). confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). info(Server) -> gen_server2:call(Server, info, infinity). validate_members(Server, Members) -> gen_server2:cast(Server, {validate_members, Members}). forget_group(GroupName) -> {atomic, ok} = mnesia:sync_transaction( fun () -> mnesia:delete({?GROUP_TABLE, GroupName}) end), ok. 
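
%% A minimal usage sketch (the module gm_demo, the group name
%% demo_group, the transaction function and all callback bodies below
%% are hypothetical; it also assumes mnesia is already running). The
%% callback module implements the gm behaviour and then starts a group
%% member with start_link/4:
%%
%%   -module(gm_demo).
%%   -behaviour(gm).
%%   -export([start/0, joined/2, members_changed/3, handle_msg/3,
%%            handle_terminate/2]).
%%
%%   start() ->
%%       ok = gm:create_tables(),
%%       %% The transaction function runs the supplied fun inside a
%%       %% mnesia transaction and unwraps the result.
%%       TxnFun = fun (Fun) ->
%%                        {atomic, Result} = mnesia:sync_transaction(Fun),
%%                        Result
%%                end,
%%       {ok, Pid} = gm:start_link(demo_group, ?MODULE, [], TxnFun),
%%       ok = gm:broadcast(Pid, hello),
%%       {ok, Pid}.
%%
%%   joined([], Members) ->
%%       io:format("joined; members: ~p~n", [Members]), ok.
%%   members_changed([], Births, Deaths) ->
%%       io:format("births: ~p deaths: ~p~n", [Births, Deaths]), ok.
%%   handle_msg([], From, Msg) ->
%%       io:format("msg ~p from ~p~n", [Msg, From]), ok.
%%   handle_terminate([], Reason) ->
%%       io:format("terminating: ~p~n", [Reason]), ok.
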
init([GroupName, Module, Args, TxnFun]) -> put(process_name, {?MODULE, GroupName}), {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), gen_server2:cast(self(), join), {ok, #state { self = Self, left = {Self, undefined}, right = {Self, undefined}, group_name = GroupName, module = Module, view = undefined, pub_count = -1, members_state = undefined, callback_args = Args, confirms = queue:new(), broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({confirmed_broadcast, _Msg}, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); handle_call({confirmed_broadcast, Msg}, _From, State = #state { self = Self, right = {Self, undefined}, module = Module, callback_args = Args }) -> handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} = internal_broadcast(Msg, 0, State), Confirms1 = queue:in({PubCount, From}, Confirms), handle_callback_result({Result, flush_broadcast_buffer( State1 #state { confirms = Confirms1 })}); handle_call(info, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); handle_call(info, _From, State = #state { group_name = GroupName, module = Module, view = View }) -> reply([{group_name, GroupName}, {module, Module}, {group_members, get_pids(alive_view_members(View))}], State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> reply(not_ready, State); handle_call({add_on_right, NewMember}, _From, State = #state { self = Self, group_name = GroupName, members_state = MembersState, txn_executor = TxnFun }) -> Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), View1 = group_to_view(Group), MembersState1 = remove_erased_members(MembersState, View1), ok = send_right(NewMember, View1, {catchup, Self, prepare_members_state(MembersState1)}), {Result, State1} = change_view(View1, State #state { members_state = MembersState1 }), handle_callback_result({Result, {ok, Group}, State1}). %% add_on_right causes a catchup to be sent immediately from the left, %% so we can never see this from the left neighbour. However, it's %% possible for the right neighbour to send us a check_neighbours %% immediately before that. We can't possibly handle it, but if we're %% in this state we know a catchup is coming imminently anyway. So %% just ignore it. 
handle_cast({?TAG, _ReqVer, check_neighbours}, State = #state { members_state = undefined }) -> noreply(State); handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, members_state = MembersState, group_name = GroupName }) -> {Result, State1} = case needs_view_update(ReqVer, View) of true -> View1 = group_to_view(dirty_read_group(GroupName)), MemberState1 = remove_erased_members(MembersState, View1), change_view(View1, State #state { members_state = MemberState1 }); false -> {ok, State} end, handle_callback_result( if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); handle_cast({broadcast, _Msg, _SizeHint}, State = #state { members_state = undefined }) -> noreply(State); handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, module = Module, callback_args = Args }) -> handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), State}); handle_cast({broadcast, Msg, SizeHint}, State) -> {Result, State1} = internal_broadcast(Msg, SizeHint, State), handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)}); handle_cast(join, State = #state { self = Self, group_name = GroupName, members_state = undefined, module = Module, callback_args = Args, txn_executor = TxnFun }) -> View = join_group(Self, GroupName, TxnFun), MembersState = case alive_view_members(View) of [Self] -> blank_member_state(); _ -> undefined end, State1 = check_neighbours(State #state { view = View, members_state = MembersState }), handle_callback_result( {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast({validate_members, OldMembers}, State = #state { view = View, module = Module, callback_args = Args }) -> NewMembers = get_pids(all_known_members(View)), Births = NewMembers -- OldMembers, Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> noreply(State); _ -> Result = Module:members_changed(Args, Births, Deaths), handle_callback_result({Result, State}) end; handle_cast(leave, State) -> {stop, normal, State}. handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); handle_info(timeout, State) -> noreply(flush_broadcast_buffer(State)); handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, right = Right, group_name = GroupName, confirms = Confirms, txn_executor = TxnFun }) -> Member = case {Left, Right} of {{Member1, MRef}, _} -> Member1; {_, {Member1, MRef}} -> Member1; _ -> undefined end, case {Member, Reason} of {undefined, _} -> noreply(State); {_, {shutdown, ring_shutdown}} -> noreply(State); _ -> %% In the event of a partial partition we could see another member %% go down and then remove them from Mnesia. While they can %% recover from this they'd have to restart the queue - not %% ideal. So let's sleep here briefly just in case this was caused %% by a partial partition; in which case by the time we record the %% member death in Mnesia we will probably be in a full %% partition and will not be assassinating another member. timer:sleep(100), View1 = group_to_view(record_dead_member_in_group( Member, GroupName, TxnFun)), handle_callback_result( case alive_view_members(View1) of [Self] -> maybe_erase_aliases( State #state { members_state = blank_member_state(), confirms = purge_confirms(Confirms) }, View1); _ -> change_view(View1, State) end) end. 
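
%% Every message exchanged between neighbours is tagged with the view
%% version held by the sender, so what travels around the ring looks
%% like this (a sketch with made-up values; member ids are
%% {Version, Pid} pairs):
%%
%%   {'$gm', ViewVer, check_neighbours}
%%   {'$gm', ViewVer, {catchup,  SenderId, MembersStateList}}
%%   {'$gm', ViewVer, {activity, SenderId, [{MemberId, Pubs, Acks}]}}
%%
%% where Pubs is a list of {PubNum, Msg} pairs and Acks a list of
%% PubNums, e.g.
%%
%%   {'$gm', 3, {activity, {1, BPid}, [{{0, APid}, [{7, hello}], [5, 6]}]}}
%%
%% The handle_cast clause above compares the received version with the
%% version of its cached view, and refreshes the view from mnesia
%% before interpreting the payload if the cached view is too old.
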
terminate(Reason, State = #state { module = Module, callback_args = Args }) -> flush_broadcast_buffer(State), Module:handle_terminate(Args, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. prioritise_info(flush, _Len, _State) -> 1; %% DOWN messages should not overtake initial catchups; if they do we %% will receive a DOWN we do not know what to do with. prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, #state { members_state = undefined }) -> 0; %% We should not prioritise DOWN messages from our left since %% otherwise the DOWN can overtake any last activity from the left, %% causing that activity to be lost. prioritise_info({'DOWN', _MRef, process, LeftPid, _Reason}, _Len, #state { left = {{_LeftVer, LeftPid}, _MRef2} }) -> 0; %% But prioritise all other DOWNs - we want to make sure we are not %% sending activity into the void for too long because our right is %% down but we don't know it. prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, _State) -> 1; prioritise_info(_, _Len, _State) -> 0. handle_msg(check_neighbours, State) -> %% no-op - it's already been done by the calling handle_cast {ok, State}; handle_msg({catchup, Left, MembersStateLeft}, State = #state { self = Self, left = {Left, _MRefL}, right = {Right, _MRefR}, view = View, members_state = undefined }) -> ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), MembersStateLeft1 = build_members_state(MembersStateLeft), {ok, State #state { members_state = MembersStateLeft1 }}; handle_msg({catchup, Left, MembersStateLeft}, State = #state { self = Self, left = {Left, _MRefL}, view = View, members_state = MembersState }) when MembersState =/= undefined -> MembersStateLeft1 = build_members_state(MembersStateLeft), AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ ?DICT:fetch_keys(MembersStateLeft1)), {MembersState1, Activity} = lists:foldl( fun (Id, MembersStateActivity) -> #member { pending_ack = PALeft, last_ack = LA } = find_member_or_blank(Id, MembersStateLeft1), with_member_acc( fun (#member { pending_ack = PA } = Member, Activity1) -> case is_member_alias(Id, Self, View) of true -> {_AcksInFlight, Pubs, _PA1} = find_prefix_common_suffix(PALeft, PA), {Member #member { last_ack = LA }, activity_cons(Id, pubs_from_queue(Pubs), [], Activity1)}; false -> {Acks, _Common, Pubs} = find_prefix_common_suffix(PA, PALeft), {Member, activity_cons(Id, pubs_from_queue(Pubs), acks_from_queue(Acks), Activity1)} end end, Id, MembersStateActivity) end, {MembersState, activity_nil()}, AllMembers), handle_msg({activity, Left, activity_finalise(Activity)}, State #state { members_state = MembersState1 }); handle_msg({catchup, _NotLeft, _MembersState}, State) -> {ok, State}; handle_msg({activity, Left, Activity}, State = #state { self = Self, left = {Left, _MRefL}, view = View, members_state = MembersState, confirms = Confirms }) when MembersState =/= undefined -> {MembersState1, {Confirms1, Activity1}} = lists:foldl( fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) -> with_member_acc( fun (Member = #member { pending_ack = PA, last_pub = LP, last_ack = LA }, {Confirms2, Activity2}) -> case is_member_alias(Id, Self, View) of true -> {ToAck, PA1} = find_common(queue_from_pubs(Pubs), PA, queue:new()), LA1 = last_ack(Acks, LA), AckNums = acks_from_queue(ToAck), Confirms3 = maybe_confirm( Self, Id, Confirms2, AckNums), {Member #member { pending_ack = PA1, last_ack = LA1 }, {Confirms3, activity_cons( Id, [], AckNums, Activity2)}}; false -> PA1 = apply_acks(Acks, join_pubs(PA, Pubs)), LA1 = 
last_ack(Acks, LA), LP1 = last_pub(Pubs, LP), {Member #member { pending_ack = PA1, last_pub = LP1, last_ack = LA1 }, {Confirms2, activity_cons(Id, Pubs, Acks, Activity2)}} end end, Id, MembersStateConfirmsActivity) end, {MembersState, {Confirms, activity_nil()}}, Activity), State1 = State #state { members_state = MembersState1, confirms = Confirms1 }, Activity3 = activity_finalise(Activity1), ok = maybe_send_activity(Activity3, State1), {Result, State2} = maybe_erase_aliases(State1, View), if_callback_success( Result, fun activity_true/3, fun activity_false/3, Activity3, State2); handle_msg({activity, _NotLeft, _Activity}, State) -> {ok, State}. noreply(State) -> {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. reply(Reply, State) -> {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. flush_timeout(#state{broadcast_buffer = []}) -> hibernate; flush_timeout(_) -> 0. ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> State; ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = TRef }) -> erlang:cancel_timer(TRef), State #state { broadcast_timer = undefined }; ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush), State #state { broadcast_timer = TRef }; ensure_broadcast_timer(State) -> State. internal_broadcast(Msg, SizeHint, State = #state { self = Self, pub_count = PubCount, module = Module, callback_args = Args, broadcast_buffer = Buffer, broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, {Module:handle_msg(Args, get_pid(Self), Msg), State #state { pub_count = PubCount1, broadcast_buffer = [{PubCount1, Msg} | Buffer], broadcast_buffer_sz = BufferSize + SizeHint}}. %% The Erlang distribution mechanism has an interesting quirk - it %% will kill the VM cold with "Absurdly large distribution output data %% buffer" if you attempt to send a message which serialises out to %% more than 2^31 bytes in size. It's therefore a very good idea to %% make sure that we don't exceed that size! %% %% Now, we could figure out the size of messages as they come in using %% size(term_to_binary(Msg)) or similar. The trouble is, that requires %% us to serialise the message only to throw the serialised form %% away. Hard to believe that's a sensible thing to do. So instead we %% accept a size hint from the application, via broadcast/3. This size %% hint can be the size of anything in the message which we expect %% could be large, and we just ignore the size of any small bits of %% the message term. Therefore MAX_BUFFER_SIZE is set somewhat %% conservatively at 100MB - but the buffer is only to allow us to %% buffer tiny messages anyway, so 100MB is plenty. maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> case Size > ?MAX_BUFFER_SIZE of true -> flush_broadcast_buffer(State); false -> State end. 
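
%% As an example of the size hint described above (illustrative names
%% only), a caller whose messages wrap a potentially large binary
%% payload might pass the payload size as the hint and ignore the
%% small fixed parts of the term:
%%
%%   Payload = term_to_binary(Event),
%%   ok = gm:broadcast(GM, {event, SeqNo, Payload}, byte_size(Payload))
%%
%% Callers that only ever broadcast small terms can simply use
%% broadcast/2, which supplies a size hint of 0.
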
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; flush_broadcast_buffer(State = #state { self = Self, members_state = MembersState, broadcast_buffer = Buffer, pub_count = PubCount }) -> [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount Pubs = lists:reverse(Buffer), Activity = activity_cons(Self, Pubs, [], activity_nil()), ok = maybe_send_activity(activity_finalise(Activity), State), MembersState1 = with_member( fun (Member = #member { pending_ack = PA }) -> PA1 = queue:join(PA, queue:from_list(Pubs)), Member #member { pending_ack = PA1, last_pub = PubCount } end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [], broadcast_buffer_sz = 0}. %% --------------------------------------------------------------------------- %% View construction and inspection %% --------------------------------------------------------------------------- needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. is_member_alias(Self, Self, _View) -> true; is_member_alias(Member, Self, View) -> ?SETS:is_element(Member, ((fetch_view_member(Self, View)) #view_member.aliases)). dead_member_id({dead, Member}) -> Member. store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> {Ver, ?DICT:store(Id, VMember, View)}. with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). blank_view(Ver) -> {Ver, ?DICT:new()}. alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( fun (Member, #view_member { aliases = Aliases }, Acc) -> ?SETS:to_list(Aliases) ++ [Member | Acc] end, [], View). group_to_view(#gm_group { members = Members, version = Ver }) -> Alive = lists:filter(fun is_member_alive/1, Members), [_|_] = Alive, %% ASSERTION - can't have all dead members add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members). link_view([Left, Middle, Right | Rest], View) -> case find_view_member(Middle, View) of error -> link_view( [Middle, Right | Rest], store_view_member(#view_member { id = Middle, aliases = ?SETS:new(), left = Left, right = Right }, View)); {ok, _} -> View end; link_view(_, View) -> View. add_aliases(View, Members) -> Members1 = ensure_alive_suffix(Members), {EmptyDeadSet, View1} = lists:foldl( fun (Member, {DeadAcc, ViewAcc}) -> case is_member_alive(Member) of true -> {?SETS:new(), with_view_member( fun (VMember = #view_member { aliases = Aliases }) -> VMember #view_member { aliases = ?SETS:union(Aliases, DeadAcc) } end, ViewAcc, Member)}; false -> {?SETS:add_element(dead_member_id(Member), DeadAcc), ViewAcc} end end, {?SETS:new(), View}, Members1), 0 = ?SETS:size(EmptyDeadSet), %% ASSERTION View1. ensure_alive_suffix(Members) -> queue:to_list(ensure_alive_suffix1(queue:from_list(Members))). ensure_alive_suffix1(MembersQ) -> {{value, Member}, MembersQ1} = queue:out_r(MembersQ), case is_member_alive(Member) of true -> MembersQ; false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1)) end. %% --------------------------------------------------------------------------- %% View modification %% --------------------------------------------------------------------------- join_group(Self, GroupName, TxnFun) -> join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun). 
join_group(Self, GroupName, {error, not_found}, TxnFun) -> join_group(Self, GroupName, prune_or_create_group(Self, GroupName, TxnFun), TxnFun); join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> group_to_view(Group); join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:member(Self, Members) of true -> group_to_view(Group); false -> case lists:filter(fun is_member_alive/1, Members) of [] -> join_group(Self, GroupName, prune_or_create_group(Self, GroupName, TxnFun), TxnFun); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), Handler = fun () -> join_group( Self, GroupName, record_dead_member_in_group( Left, GroupName, TxnFun), TxnFun) end, try case neighbour_call(Left, {add_on_right, Self}) of {ok, Group1} -> group_to_view(Group1); not_ready -> join_group(Self, GroupName, TxnFun) end catch exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> Handler(); exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> Handler() end end end. dirty_read_group(GroupName) -> case mnesia:dirty_read(?GROUP_TABLE, GroupName) of [] -> {error, not_found}; [Group] -> Group end. read_group(GroupName) -> case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> {error, not_found}; [Group] -> Group end. write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group. prune_or_create_group(Self, GroupName, TxnFun) -> TxnFun( fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], version = get_version(Self) }, case read_group(GroupName) of {error, not_found} -> write_group(GroupNew); Group = #gm_group { members = Members } -> case lists:any(fun is_member_alive/1, Members) of true -> Group; false -> write_group(GroupNew) end end end). record_dead_member_in_group(Member, GroupName, TxnFun) -> TxnFun( fun () -> Group = #gm_group { members = Members, version = Ver } = read_group(GroupName), case lists:splitwith( fun (Member1) -> Member1 =/= Member end, Members) of {_Members1, []} -> %% not found - already recorded dead Group; {Members1, [Member | Members2]} -> Members3 = Members1 ++ [{dead, Member} | Members2], write_group(Group #gm_group { members = Members3, version = Ver + 1 }) end end). record_new_member_in_group(NewMember, Left, GroupName, TxnFun) -> TxnFun( fun () -> Group = #gm_group { members = Members, version = Ver } = read_group(GroupName), {Prefix, [Left | Suffix]} = lists:splitwith(fun (M) -> M =/= Left end, Members), write_group(Group #gm_group { members = Prefix ++ [Left, NewMember | Suffix], version = Ver + 1 }) end). erase_members_in_group(Members, GroupName, TxnFun) -> DeadMembers = [{dead, Id} || Id <- Members], TxnFun( fun () -> Group = #gm_group { members = [_|_] = Members1, version = Ver } = read_group(GroupName), case Members1 -- DeadMembers of Members1 -> Group; Members2 -> write_group( Group #gm_group { members = Members2, version = Ver + 1 }) end end). 
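
%% To illustrate the group record these functions maintain (the values
%% below are made up; A, B and C stand for pids), a three-member group
%% might be stored as
%%
%%   #gm_group { name    = my_group,
%%               version = 4,
%%               members = [{0, A}, {1, B}, {3, C}] }
%%
%% If B dies, record_dead_member_in_group/3 marks it in place and
%% bumps the version:
%%
%%   #gm_group { name    = my_group,
%%               version = 5,
%%               members = [{0, A}, {dead, {1, B}}, {3, C}] }
%%
%% group_to_view/1 then produces a view in which C, the nearest alive
%% member downstream of B, holds {1, B} in its aliases and so inherits
%% B's in-flight messages. Once nothing remains in flight for B,
%% maybe_erase_aliases/2 calls erase_members_in_group/3, which drops
%% the {dead, {1, B}} entry and bumps the version once more.
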
maybe_erase_aliases(State = #state { self = Self, group_name = GroupName, members_state = MembersState, txn_executor = TxnFun }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} = ?SETS:fold( fun (Id, {ErasableAcc, MembersStateAcc} = Acc) -> #member { last_pub = LP, last_ack = LA } = find_member_or_blank(Id, MembersState), case can_erase_view_member(Self, Id, LA, LP) of true -> {[Id | ErasableAcc], erase_member(Id, MembersStateAcc)}; false -> Acc end end, {[], MembersState}, Aliases), View1 = case Erasable of [] -> View; _ -> group_to_view( erase_members_in_group(Erasable, GroupName, TxnFun)) end, change_view(View1, State #state { members_state = MembersState1 }). can_erase_view_member(Self, Self, _LA, _LP) -> false; can_erase_view_member(_Self, _Id, N, N) -> true; can_erase_view_member(_Self, _Id, _LA, _LP) -> false. neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg). neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity). %% --------------------------------------------------------------------------- %% View monitoring and maintanence %% --------------------------------------------------------------------------- ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> {Self, undefined}; ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> ok = neighbour_cast(RealNeighbour, {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> true = ?INSTR_MOD:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, ok = neighbour_cast(RealNeighbour, Msg), ok = case Neighbour of Self -> ok; _ -> neighbour_cast(Neighbour, Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor( Self, Self) -> undefined; maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, right = Right, view = View, broadcast_buffer = Buffer }) -> #view_member { left = VLeft, right = VRight } = fetch_view_member(Self, View), Ver = view_version(View), Left1 = ensure_neighbour(Ver, Self, Left, VLeft), Right1 = ensure_neighbour(Ver, Self, Right, VRight), Buffer1 = case Right1 of {Self, undefined} -> []; _ -> Buffer end, State1 = State #state { left = Left1, right = Right1, broadcast_buffer = Buffer1 }, ok = maybe_send_catchup(Right, State1), State1. maybe_send_catchup(Right, #state { right = Right }) -> ok; maybe_send_catchup(_Right, #state { self = Self, right = {Self, undefined} }) -> ok; maybe_send_catchup(_Right, #state { members_state = undefined }) -> ok; maybe_send_catchup(_Right, #state { self = Self, right = {Right, _MRef}, view = View, members_state = MembersState }) -> send_right(Right, View, {catchup, Self, prepare_members_state(MembersState)}). %% --------------------------------------------------------------------------- %% Catch_up delta detection %% --------------------------------------------------------------------------- find_prefix_common_suffix(A, B) -> {Prefix, A1} = find_prefix(A, B, queue:new()), {Common, Suffix} = find_common(A1, B, queue:new()), {Prefix, Common, Suffix}. %% Returns the elements of A that occur before the first element of B, %% plus the remainder of A. 
find_prefix(A, B, Prefix) -> case {queue:out(A), queue:out(B)} of {{{value, Val}, _A1}, {{value, Val}, _B1}} -> {Prefix, A}; {{empty, A1}, {{value, _A}, _B1}} -> {Prefix, A1}; {{{value, {NumA, _MsgA} = Val}, A1}, {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB -> find_prefix(A1, B, queue:in(Val, Prefix)); {_, {empty, _B1}} -> {A, Prefix} %% Prefix well be empty here end. %% A should be a prefix of B. Returns the commonality plus the %% remainder of B. find_common(A, B, Common) -> case {queue:out(A), queue:out(B)} of {{{value, Val}, A1}, {{value, Val}, B1}} -> find_common(A1, B1, queue:in(Val, Common)); {{empty, _A}, _} -> {Common, B} end. %% --------------------------------------------------------------------------- %% Members helpers %% --------------------------------------------------------------------------- with_member(Fun, Id, MembersState) -> store_member( Id, Fun(find_member_or_blank(Id, MembersState)), MembersState). with_member_acc(Fun, Id, {MembersState, Acc}) -> {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc), {store_member(Id, MemberState, MembersState), Acc1}. find_member_or_blank(Id, MembersState) -> case ?DICT:find(Id, MembersState) of {ok, Result} -> Result; error -> blank_member() end. erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case dirty_read_group(GroupName) of #gm_group { version = Version } -> Version; {error, not_found} -> ?VERSION_START end, self()}. remove_erased_members(MembersState, View) -> lists:foldl(fun (Id, MembersState1) -> store_member(Id, find_member_or_blank(Id, MembersState), MembersState1) end, blank_member_state(), all_known_members(View)). get_version({Version, _Pid}) -> Version. get_pid({_Version, Pid}) -> Pid. get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% --------------------------------------------------------------------------- %% Activity assembly %% --------------------------------------------------------------------------- activity_nil() -> queue:new(). activity_cons( _Id, [], [], Tail) -> Tail; activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; maybe_send_activity(Activity, #state { self = Self, right = {Right, _MRefR}, view = View }) -> send_right(Right, View, {activity, Self, Activity}). send_right(Right, View, Msg) -> ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> Result = lists:foldl( fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) -> lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) -> case Module2:handle_msg( Args2, get_pid(Id), Pub) of ok -> Acc; {become, Module3, Args3} -> {Args3, Module3, ok}; {stop, _Reason} = Error -> Error end; (_, Error = {stop, _Reason}) -> Error end, {Args1, Module1, ok}, Pubs); (_, Error = {stop, _Reason}) -> Error end, {Args, Module, ok}, Activity), case Result of {Args, Module, ok} -> ok; {Args1, Module1, ok} -> {become, Module1, Args1}; {stop, _Reason} = Error -> Error end. 
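
%% A worked example of the catch-up delta detection above (message
%% numbers are made up): suppose our pending-ack queue for some
%% publisher, and the one our new upstream neighbour sent us, are
%%
%%   PA     = [{1, x}, {2, y}, {3, z}]      (ours)
%%   PALeft = [{3, z}, {4, w}]              (upstream's)
%%
%% both held as queues. Then find_prefix_common_suffix(PA, PALeft)
%% returns, again as queues,
%%
%%   {[{1, x}, {2, y}],    %% prefix: retired upstream, so acks we missed
%%    [{3, z}],            %% common: still pending on both sides
%%    [{4, w}]}            %% suffix: publications we missed
%%
%% which is the split the catchup clause of handle_msg/2 uses to bring
%% a member back into step with its new upstream neighbour.
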
change_view(View, State = #state { view = View0, module = Module, callback_args = Args }) -> OldMembers = all_known_members(View0), NewMembers = all_known_members(View), Births = NewMembers -- OldMembers, Deaths = OldMembers -- NewMembers, Result = case {Births, Deaths} of {[], []} -> ok; _ -> Module:members_changed( Args, get_pids(Births), get_pids(Deaths)) end, {Result, check_neighbours(State #state { view = View })}. handle_callback_result({Result, State}) -> if_callback_success( Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State); handle_callback_result({Result, Reply, State}) -> if_callback_success( Result, fun reply_true/3, fun reply_false/3, Reply, State). no_reply_true (_Result, _Undefined, State) -> noreply(State). no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}. reply_true (_Result, Reply, State) -> reply(Reply, State). reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}. handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State). handle_msg_false(Result, _Msg, State) -> {Result, State}. activity_true(_Result, Activity, State = #state { module = Module, callback_args = Args }) -> {callback(Args, Module, Activity), State}. activity_false(Result, _Activity, State) -> {Result, State}. if_callback_success(ok, True, _False, Arg, State) -> True(ok, Arg, State); if_callback_success( {become, Module, Args} = Result, True, _False, Arg, State) -> True(Result, Arg, State #state { module = Module, callback_args = Args }); if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> False(Result, Arg, State). maybe_confirm(_Self, _Id, Confirms, []) -> Confirms; maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) -> case queue:out(Confirms) of {empty, _Confirms} -> Confirms; {{value, {PubNum, From}}, Confirms1} -> gen_server2:reply(From, ok), maybe_confirm(Self, Self, Confirms1, PubNums); {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum -> maybe_confirm(Self, Self, Confirms, PubNums) end; maybe_confirm(_Self, _Id, Confirms, _PubNums) -> Confirms. purge_confirms(Confirms) -> [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)], queue:new(). %% --------------------------------------------------------------------------- %% Msg transformation %% --------------------------------------------------------------------------- acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. pubs_from_queue(Q) -> queue:to_list(Q). queue_from_pubs(Pubs) -> queue:from_list(Pubs). apply_acks( [], Pubs) -> Pubs; apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). last_ack( [], LA) -> LA; last_ack(List, LA) -> LA1 = lists:last(List), true = LA1 > LA, %% ASSERTION LA1. last_pub( [], LP) -> LP; last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), true = PubNum > LP, %% ASSERTION PubNum. %% --------------------------------------------------------------------------- %% Uninstrumented versions call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). monitor(Pid) -> erlang:monitor(process, Pid). demonitor(MRef) -> erlang:demonitor(MRef).