diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2021-02-20 00:42:39 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2021-02-20 00:42:39 +0300 |
commit | a9dd03bdf55e4ca8ab77620f86afe518af29bb9f (patch) | |
tree | 4d2a07589c15cffdeeaca1a5a4207fe8473faccb | |
parent | 7af5802a0e5aeea143fc23552ea964209c7162a8 (diff) | |
download | rabbitmq-server-git-mk-opt-out-of-separate-channel-use.tar.gz |
Make it possible to use a single channel for message transfers and commandsmk-opt-out-of-separate-channel-use
For the rare case where federated exchange experience rapid
binding changes. As of rabbitmq/rabbitmq-federation#97,
such environments have a race condition between binding
and message propagation.
6 files changed, 66 insertions, 12 deletions
diff --git a/deps/rabbitmq_federation/include/rabbit_federation.hrl b/deps/rabbitmq_federation/include/rabbit_federation.hrl index 316ece7de9..9e36a1b7c1 100644 --- a/deps/rabbitmq_federation/include/rabbit_federation.hrl +++ b/deps/rabbitmq_federation/include/rabbit_federation.hrl @@ -19,7 +19,9 @@ ha_policy, name, bind_nowait, - resource_cleanup_mode}). + resource_cleanup_mode, + channel_use_mode + }). -record(upstream_params, {uri, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl index 7eefa62d1a..9c6109042b 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl @@ -430,11 +430,15 @@ key(#binding{key = Key, args = Args}) -> {Key, Args}. go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> Unacked = rabbit_federation_link_util:unacked_new(), - log_link_startup_attempt(Upstream, DownXName), rabbit_federation_link_util:start_conn_ch( fun (Conn, Ch, DConn, DCh) -> - {ok, CmdCh} = open_cmd_channel(Conn, Upstream, UParams, DownXName, S0), + {ok, CmdCh} = + case Upstream#upstream.channel_use_mode of + single -> reuse_command_channel(Ch, Upstream, DownXName); + multiple -> open_command_channel(Conn, Upstream, UParams, DownXName, S0); + _ -> open_command_channel(Conn, Upstream, UParams, DownXName, S0) + end, erlang:monitor(process, CmdCh), Props = pget(server_properties, amqp_connection:info(Conn, [server_properties])), @@ -480,11 +484,18 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> {noreply, State#state{internal_exchange_timer = TRef}} end, Upstream, UParams, DownXName, S0). -log_link_startup_attempt(OUpstream, DownXName) -> - rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s'", - [rabbit_misc:rs(DownXName), OUpstream#upstream.name]). +log_link_startup_attempt(#upstream{name = Name, channel_use_mode = ChMode}, DownXName) -> + rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s', channel use mode: ~s", + [rabbit_misc:rs(DownXName), Name, ChMode]). + +%% If channel use mode is 'single', reuse the message transfer channel. +%% Otherwise open a separate one. +reuse_command_channel(MainCh, #upstream{name = UName}, DownXName) -> + rabbit_log_federation:debug("Will use a single channel for both schema operations and message transfer on links to upstream '~s' for downstream federated ~s", + [UName, rabbit_misc:rs(DownXName)]), + {ok, MainCh}. -open_cmd_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) -> +open_command_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) -> rabbit_log_federation:debug("Will open a command channel to upstream '~s' for downstream federated ~s", [UName, rabbit_misc:rs(DownXName)]), case amqp_connection:open_channel(Conn) of diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl index 0b3934d112..c0751ae492 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl @@ -87,9 +87,12 @@ shared_validation() -> {<<"trust-user-id">>, fun rabbit_parameter_validation:boolean/2, optional}, {<<"ack-mode">>, rabbit_parameter_validation:enum( ['no-ack', 'on-publish', 'on-confirm']), optional}, - {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(['default', 'never']), optional}, + {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum( + ['default', 'never']), optional}, {<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}]. + {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}, + {<<"channel-use-mode">>, rabbit_parameter_validation:enum( + ['multiple', 'single']), optional}]. validate_uri(Name, Term) when is_binary(Term) -> case rabbit_parameter_validation:binary(Name, Term) of diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index a2f1c4bab3..ebb29067b0 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -139,7 +139,9 @@ from_upstream_or_set(US, Name, U, XorQ) -> ha_policy = bget('ha-policy', US, U, none), name = Name, bind_nowait = bget('bind-nowait', US, U, false), - resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>))}. + resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>)), + channel_use_mode = to_atom(bget('channel-use-mode', US, U, multiple)) + }. %%---------------------------------------------------------------------------- diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 05aefda0cb..9ba91692c7 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -29,6 +29,7 @@ all() -> [ {group, without_automatic_setup}, + {group, channel_use_mode_single}, {group, without_disambiguate}, {group, with_disambiguate} ]. @@ -76,6 +77,18 @@ groups() -> upstream_has_no_federation ]} ]} + ]}, + {channel_use_mode_single, [], [ + simple, + multiple_upstreams, + multiple_upstreams_pattern, + multiple_uris, + multiple_downstreams, + e2e, + unbind_on_delete, + unbind_on_unbind, + unbind_gets_transmitted, + federate_unfederate ]} ]. @@ -93,6 +106,24 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +%% Some of the "regular" tests but in the single channel mode. +init_per_group(channel_use_mode_single, Config) -> + SetupFederation = [ + fun(Config) -> + rabbit_federation_test_util:setup_federation_with_upstream_params(Config, [ + {<<"channel-use-mode">>, <<"single">>} + ]) + end + ], + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_nodes_clustered, false} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + SetupFederation); init_per_group(without_automatic_setup, Config) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl index 471ba0ecee..a9e08dcb09 100644 --- a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl +++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl @@ -16,14 +16,19 @@ -import(rabbit_misc, [pget/2]). setup_federation(Config) -> + setup_federation_with_upstream_params(Config, []). + +setup_federation_with_upstream_params(Config, ExtraParams) -> rabbit_ct_broker_helpers:set_parameter(Config, 0, <<"federation-upstream">>, <<"localhost">>, [ {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)}, - {<<"consumer-tag">>, <<"fed.tag">>}]), + {<<"consumer-tag">>, <<"fed.tag">>} + ] ++ ExtraParams), rabbit_ct_broker_helpers:set_parameter(Config, 0, <<"federation-upstream">>, <<"local5673">>, [ - {<<"uri">>, <<"amqp://localhost:1">>}]), + {<<"uri">>, <<"amqp://localhost:1">>} + ] ++ ExtraParams), rabbit_ct_broker_helpers:set_parameter(Config, 0, <<"federation-upstream-set">>, <<"upstream">>, [ |