summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2023-03-12 13:13:49 +0400
committerGitHub <noreply@github.com>2023-03-12 13:13:49 +0400
commit09c8a05e373c798d93c8e5224f42afca16e32919 (patch)
treea39c8237cfa8d2d1a71f62ba1924abff875560a4
parent6d4ab6818c057b1ef7d3c917064288a5125e3160 (diff)
parent64d4e4f63a9ab4d52e64390516b4e30d860aad97 (diff)
downloadrabbitmq-server-git-09c8a05e373c798d93c8e5224f42afca16e32919.tar.gz
Merge pull request #7594 from rabbitmq/fix-3-12-beta-1-mqtt-cluster-creation
MQTT: Fix 3.12-beta.1 cluster creation
-rw-r--r--deps/rabbitmq_mqtt/src/mqtt_node.erl24
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl4
2 files changed, 19 insertions, 9 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_node.erl b/deps/rabbitmq_mqtt/src/mqtt_node.erl
index 4f08209d69..bd3eca1a7f 100644
--- a/deps/rabbitmq_mqtt/src/mqtt_node.erl
+++ b/deps/rabbitmq_mqtt/src/mqtt_node.erl
@@ -59,7 +59,7 @@ start(Delay, AttemptsLeft) ->
%% Trigger an election.
%% This is required when we start a node for the first time.
%% Using default timeout because it supposed to reply fast.
- rabbit_log:info("MQTT: discovered ~tp cluster peers that support client ID tracking", [length(Peers)]),
+ rabbit_log:info("MQTT: discovered cluster peers that support client ID tracking: ~p", [Peers]),
ok = start_server(),
_ = join_peers(NodeId, Peers),
ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT)
@@ -86,7 +86,8 @@ start_server() ->
log_init_args => #{uid => UId},
tick_timeout => Timeout,
machine => {module, mqtt_machine, #{}}
- },
+ },
+ rabbit_log:info("MQTT: starting Ra server with initial members: ~p", [Nodes]),
ra:start_server(?RA_SYSTEM, Conf).
trigger_election() ->
@@ -147,14 +148,23 @@ delete(_) ->
ok;
_ ->
rabbit_log:info("Deleting Ra cluster ~s ...", [?ID_NAME]),
- try ra:delete_cluster(RaNodes, ?RA_OPERATION_TIMEOUT) of
+ try ra:delete_cluster(RaNodes, 15_000) of
{ok, _Leader} ->
rabbit_log:info("Successfully deleted Ra cluster ~s", [?ID_NAME]),
ok;
- {error, _} = Err ->
- rabbit_log:info("Failed to delete Ra cluster ~s: ~p", [?ID_NAME, Err]),
- Err
- catch exit:{{shutdown, delete}, _Stacktrace} ->
+ {error, Reason} ->
+ rabbit_log:info("Failed to delete Ra cluster ~s: ~p", [?ID_NAME, Reason]),
+ ServerId = server_id(),
+ case ra:force_delete_server(?RA_SYSTEM, ServerId) of
+ ok ->
+ rabbit_log:info("Successfully force deleted Ra server ~p", [ServerId]),
+ ok;
+ Error ->
+ rabbit_log:error("Failed to force delete Ra server ~p: ~p",
+ [ServerId, Error]),
+ {error, Error}
+ end
+ catch exit:{{shutdown, delete}, _StackTrace} ->
rabbit_log:info("Ra cluster ~s already being deleted", [?ID_NAME]),
ok
end
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 18411706e3..8be17132a2 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -186,7 +186,7 @@ process_connect(
send_conn_ack(?CONNACK_ACCEPT, SessPresent, ProtoVerAtom, SendFun),
{ok, State};
{error, ReturnErrCode} = Err
- when is_number(ReturnErrCode) ->
+ when is_integer(ReturnErrCode) ->
%% If a server sends a CONNACK packet containing a non-zero return
%% code it MUST set Session Present to 0 [MQTT-3.2.2-4].
SessPresent = false,
@@ -1637,7 +1637,7 @@ mailbox_soft_limit_exceeded() ->
is_socket_busy(Socket) ->
case rabbit_net:getstat(Socket, [send_pend]) of
{ok, [{send_pend, NumBytes}]}
- when is_number(NumBytes) andalso NumBytes > 0 ->
+ when is_integer(NumBytes) andalso NumBytes > 0 ->
true;
_ ->
false