summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2021-02-20 00:42:39 +0300
committerMichael Klishin <michael@clojurewerkz.org>2021-02-20 00:42:39 +0300
commita9dd03bdf55e4ca8ab77620f86afe518af29bb9f (patch)
tree4d2a07589c15cffdeeaca1a5a4207fe8473faccb
parent7af5802a0e5aeea143fc23552ea964209c7162a8 (diff)
downloadrabbitmq-server-git-mk-opt-out-of-separate-channel-use.tar.gz
Make it possible to use a single channel for message transfers and commandsmk-opt-out-of-separate-channel-use
For the rare case where federated exchange experience rapid binding changes. As of rabbitmq/rabbitmq-federation#97, such environments have a race condition between binding and message propagation.
-rw-r--r--deps/rabbitmq_federation/include/rabbit_federation.hrl4
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl23
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_parameters.erl7
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream.erl4
-rw-r--r--deps/rabbitmq_federation/test/exchange_SUITE.erl31
-rw-r--r--deps/rabbitmq_federation/test/rabbit_federation_test_util.erl9
6 files changed, 66 insertions, 12 deletions
diff --git a/deps/rabbitmq_federation/include/rabbit_federation.hrl b/deps/rabbitmq_federation/include/rabbit_federation.hrl
index 316ece7de9..9e36a1b7c1 100644
--- a/deps/rabbitmq_federation/include/rabbit_federation.hrl
+++ b/deps/rabbitmq_federation/include/rabbit_federation.hrl
@@ -19,7 +19,9 @@
ha_policy,
name,
bind_nowait,
- resource_cleanup_mode}).
+ resource_cleanup_mode,
+ channel_use_mode
+ }).
-record(upstream_params,
{uri,
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
index 7eefa62d1a..9c6109042b 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
@@ -430,11 +430,15 @@ key(#binding{key = Key, args = Args}) -> {Key, Args}.
go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
Unacked = rabbit_federation_link_util:unacked_new(),
-
log_link_startup_attempt(Upstream, DownXName),
rabbit_federation_link_util:start_conn_ch(
fun (Conn, Ch, DConn, DCh) ->
- {ok, CmdCh} = open_cmd_channel(Conn, Upstream, UParams, DownXName, S0),
+ {ok, CmdCh} =
+ case Upstream#upstream.channel_use_mode of
+ single -> reuse_command_channel(Ch, Upstream, DownXName);
+ multiple -> open_command_channel(Conn, Upstream, UParams, DownXName, S0);
+ _ -> open_command_channel(Conn, Upstream, UParams, DownXName, S0)
+ end,
erlang:monitor(process, CmdCh),
Props = pget(server_properties,
amqp_connection:info(Conn, [server_properties])),
@@ -480,11 +484,18 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
{noreply, State#state{internal_exchange_timer = TRef}}
end, Upstream, UParams, DownXName, S0).
-log_link_startup_attempt(OUpstream, DownXName) ->
- rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s'",
- [rabbit_misc:rs(DownXName), OUpstream#upstream.name]).
+log_link_startup_attempt(#upstream{name = Name, channel_use_mode = ChMode}, DownXName) ->
+ rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s', channel use mode: ~s",
+ [rabbit_misc:rs(DownXName), Name, ChMode]).
+
+%% If channel use mode is 'single', reuse the message transfer channel.
+%% Otherwise open a separate one.
+reuse_command_channel(MainCh, #upstream{name = UName}, DownXName) ->
+ rabbit_log_federation:debug("Will use a single channel for both schema operations and message transfer on links to upstream '~s' for downstream federated ~s",
+ [UName, rabbit_misc:rs(DownXName)]),
+ {ok, MainCh}.
-open_cmd_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) ->
+open_command_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) ->
rabbit_log_federation:debug("Will open a command channel to upstream '~s' for downstream federated ~s",
[UName, rabbit_misc:rs(DownXName)]),
case amqp_connection:open_channel(Conn) of
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
index 0b3934d112..c0751ae492 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
@@ -87,9 +87,12 @@ shared_validation() ->
{<<"trust-user-id">>, fun rabbit_parameter_validation:boolean/2, optional},
{<<"ack-mode">>, rabbit_parameter_validation:enum(
['no-ack', 'on-publish', 'on-confirm']), optional},
- {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(['default', 'never']), optional},
+ {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(
+ ['default', 'never']), optional},
{<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional},
- {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}].
+ {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional},
+ {<<"channel-use-mode">>, rabbit_parameter_validation:enum(
+ ['multiple', 'single']), optional}].
validate_uri(Name, Term) when is_binary(Term) ->
case rabbit_parameter_validation:binary(Name, Term) of
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
index a2f1c4bab3..ebb29067b0 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
@@ -139,7 +139,9 @@ from_upstream_or_set(US, Name, U, XorQ) ->
ha_policy = bget('ha-policy', US, U, none),
name = Name,
bind_nowait = bget('bind-nowait', US, U, false),
- resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>))}.
+ resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>)),
+ channel_use_mode = to_atom(bget('channel-use-mode', US, U, multiple))
+ }.
%%----------------------------------------------------------------------------
diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl
index 05aefda0cb..9ba91692c7 100644
--- a/deps/rabbitmq_federation/test/exchange_SUITE.erl
+++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl
@@ -29,6 +29,7 @@
all() ->
[
{group, without_automatic_setup},
+ {group, channel_use_mode_single},
{group, without_disambiguate},
{group, with_disambiguate}
].
@@ -76,6 +77,18 @@ groups() ->
upstream_has_no_federation
]}
]}
+ ]},
+ {channel_use_mode_single, [], [
+ simple,
+ multiple_upstreams,
+ multiple_upstreams_pattern,
+ multiple_uris,
+ multiple_downstreams,
+ e2e,
+ unbind_on_delete,
+ unbind_on_unbind,
+ unbind_gets_transmitted,
+ federate_unfederate
]}
].
@@ -93,6 +106,24 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+%% Some of the "regular" tests but in the single channel mode.
+init_per_group(channel_use_mode_single, Config) ->
+ SetupFederation = [
+ fun(Config) ->
+ rabbit_federation_test_util:setup_federation_with_upstream_params(Config, [
+ {<<"channel-use-mode">>, <<"single">>}
+ ])
+ end
+ ],
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_nodes_clustered, false}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ SetupFederation);
init_per_group(without_automatic_setup, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
index 471ba0ecee..a9e08dcb09 100644
--- a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
+++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
@@ -16,14 +16,19 @@
-import(rabbit_misc, [pget/2]).
setup_federation(Config) ->
+ setup_federation_with_upstream_params(Config, []).
+
+setup_federation_with_upstream_params(Config, ExtraParams) ->
rabbit_ct_broker_helpers:set_parameter(Config, 0,
<<"federation-upstream">>, <<"localhost">>, [
{<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)},
- {<<"consumer-tag">>, <<"fed.tag">>}]),
+ {<<"consumer-tag">>, <<"fed.tag">>}
+ ] ++ ExtraParams),
rabbit_ct_broker_helpers:set_parameter(Config, 0,
<<"federation-upstream">>, <<"local5673">>, [
- {<<"uri">>, <<"amqp://localhost:1">>}]),
+ {<<"uri">>, <<"amqp://localhost:1">>}
+ ] ++ ExtraParams),
rabbit_ct_broker_helpers:set_parameter(Config, 0,
<<"federation-upstream-set">>, <<"upstream">>, [