require_relative '../common' require 'net/ssh/transport/session' require 'net/ssh/proxy/http' require 'logger' # mocha adds #verify to Object, which throws off the host-key-verifier part of # these tests. # can't use .include? because ruby18 uses strings and ruby19 uses symbols :/ Object.send(:undef_method, :verify) if Object.instance_methods.any? { |v| v.to_sym == :verify } module Transport class TestSession < NetSSHTest include Net::SSH::Transport::Constants TEST_HOST = "net.ssh.test" TEST_PORT = 22 def test_constructor_defaults assert_equal TEST_HOST, session.host assert_equal TEST_PORT, session.port assert_instance_of( Net::SSH::Verifiers::AcceptNewOrLocalTunnel, session.host_key_verifier ) end def test_verify_host_key_true_uses_accept_new_or_local_tunnel_verifier Kernel.expects(:warn).with( 'verify_host_key: true is deprecated, use :accept_new_or_local_tunnel' ) assert_instance_of( Net::SSH::Verifiers::AcceptNewOrLocalTunnel, session(verify_host_key: true).host_key_verifier ) end def test_verify_host_key_accept_new_or_local_tunnel_uses_accept_new_or_local_tunnel_verifier assert_instance_of( Net::SSH::Verifiers::AcceptNewOrLocalTunnel, session(verify_host_key: :accept_new_or_local_tunnel).host_key_verifier ) end def test_verify_host_key_nil_uses_accept_new_or_local_tunnel_verifier assert_instance_of( Net::SSH::Verifiers::AcceptNewOrLocalTunnel, session(verify_host_key: nil).host_key_verifier ) end def test_verify_host_key_very_uses_accept_new_verifier Kernel.expects(:warn).with('verify_host_key: :very is deprecated, use :accept_new') assert_instance_of( Net::SSH::Verifiers::AcceptNew, session(verify_host_key: :very).host_key_verifier ) end def test_verify_host_key_accept_new_uses_accept_new_verifier assert_instance_of( Net::SSH::Verifiers::AcceptNew, session(verify_host_key: :accept_new).host_key_verifier ) end def test_verify_host_key_secure_uses_always_verifier Kernel.expects(:warn).with('verify_host_key: :secure is deprecated, use :always') assert_instance_of( Net::SSH::Verifiers::Always, session(verify_host_key: :secure).host_key_verifier ) end def test_verify_host_key_false_uses_never_verifier Kernel.expects(:warn).with('verify_host_key: false is deprecated, use :never') assert_instance_of( Net::SSH::Verifiers::Never, session(verify_host_key: false).host_key_verifier ) end def test_verify_host_key_null_uses_never_verifier assert_instance_of( Net::SSH::Verifiers::Never, session(verify_host_key: :never).host_key_verifier ) end def test_unknown_verify_host_key_value_raises_exception_if_value_does_not_respond_to_verify assert_raises(ArgumentError) { session(verify_host_key: :bogus).host_key_verifier } end def test_verify_host_key_value_responding_to_verify_should_pass_muster object = stub("thingy", verify: true, verify_signature: true) assert_equal object, session(verify_host_key: object).host_key_verifier end def test_deprecated_host_key_verifier Kernel.expects(:warn).with('Warning: verifier without :verify_signature is deprecated') object = stub("thingy", verify: true) assert_not_nil session(verify_host_key: object).host_key_verifier end def test_host_as_string_should_return_host_and_ip_when_port_is_default session! socket.stubs(:peer_ip).returns("1.2.3.4") assert_equal "#{TEST_HOST},1.2.3.4", session.host_as_string end def test_host_as_string_should_return_host_and_ip_with_port_when_port_is_not_default session(port: 1234) # force session to be instantiated socket.stubs(:peer_ip).returns("1.2.3.4") assert_equal "[#{TEST_HOST}]:1234,[1.2.3.4]:1234", session.host_as_string end def test_host_as_string_should_return_only_host_when_host_is_ip session!(host: "1.2.3.4") socket.stubs(:peer_ip).returns("1.2.3.4") assert_equal "1.2.3.4", session.host_as_string end def test_host_as_string_should_return_only_host_and_port_when_host_is_ip_and_port_is_not_default session!(host: "1.2.3.4", port: 1234) socket.stubs(:peer_ip).returns("1.2.3.4") assert_equal "[1.2.3.4]:1234", session.host_as_string end def test_host_as_string_should_return_only_host_when_proxy_command_is_set session!(host: "1.2.3.4") socket.stubs(:peer_ip).returns(Net::SSH::Transport::PacketStream::PROXY_COMMAND_HOST_IP) assert_equal "1.2.3.4", session.host_as_string end def test_host_as_string_should_return_only_host_and_port_when_host_is_ip_and_port_is_not_default_and_proxy_command_is_set session!(host: "1.2.3.4", port: 1234) socket.stubs(:peer_ip).returns(Net::SSH::Transport::PacketStream::PROXY_COMMAND_HOST_IP) assert_equal "[1.2.3.4]:1234", session.host_as_string end def test_close_should_cleanup_and_close_socket session! socket.expects(:cleanup) socket.expects(:close) session.close end def test_service_request_should_return_buffer assert_equal "\005\000\000\000\004sftp", session.service_request('sftp').to_s end def test_rekey_when_kex_is_pending_should_do_nothing algorithms.stubs(pending?: true) algorithms.expects(:rekey!).never session.rekey! end def test_rekey_when_no_kex_is_pending_should_initiate_rekey_and_block_until_it_completes algorithms.stubs(pending?: false) algorithms.expects(:rekey!) session.expects(:wait).yields algorithms.expects(:initialized?).returns(true) session.rekey! end def test_rekey_as_needed_when_kex_is_pending_should_do_nothing session! algorithms.stubs(pending?: true) socket.expects(:if_needs_rekey?).never session.rekey_as_needed end def test_rekey_as_needed_when_no_kex_is_pending_and_no_rekey_is_needed_should_do_nothing session! algorithms.stubs(pending?: false) socket.stubs(if_needs_rekey?: false) session.expects(:rekey!).never session.rekey_as_needed end def test_rekey_as_needed_when_no_kex_is_pending_and_rekey_is_needed_should_initiate_rekey_and_block session! algorithms.stubs(pending?: false) socket.expects(:if_needs_rekey?).yields session.expects(:rekey!) session.rekey_as_needed end def test_peer_should_return_hash_of_info_about_peer session! socket.stubs(peer_ip: "1.2.3.4") assert_equal({ ip: "1.2.3.4", port: TEST_PORT, host: TEST_HOST, canonized: "net.ssh.test,1.2.3.4" }, session.peer) end def test_next_message_should_block_until_next_message_is_available session.expects(:poll_message).with(:block) session.next_message end def test_poll_message_should_query_next_packet_using_the_given_blocking_parameter session! socket.expects(:next_packet).with(:blocking_parameter, nil).returns(nil) session.poll_message(:blocking_parameter) end def test_poll_message_should_query_next_packet_using_the_timeout_option session!(timeout: 7) socket.expects(:next_packet).with(:nonblock, 7).returns(nil) session.poll_message end def test_poll_message_should_default_to_non_blocking session! socket.expects(:next_packet).with(:nonblock, nil).returns(nil) session.poll_message end def test_poll_message_should_silently_handle_disconnect_packets session! socket.expects(:next_packet).returns(P(:byte, DISCONNECT, :long, 1, :string, "testing", :string, "")) assert_raises(Net::SSH::Disconnect) { session.poll_message } end def test_poll_message_should_silently_handle_ignore_packets session! socket.expects(:next_packet).times(2).returns(P(:byte, IGNORE, :string, "test"), nil) assert_nil session.poll_message end def test_poll_message_should_silently_handle_unimplemented_packets session! socket.expects(:next_packet).times(2).returns(P(:byte, UNIMPLEMENTED, :long, 15), nil) assert_nil session.poll_message end def test_poll_message_should_silently_handle_debug_packets_with_always_display session! socket.expects(:next_packet).times(2).returns(P(:byte, DEBUG, :bool, true, :string, "testing", :string, ""), nil) assert_nil session.poll_message end def test_poll_message_should_silently_handle_debug_packets_without_always_display session! socket.expects(:next_packet).times(2).returns(P(:byte, DEBUG, :bool, false, :string, "testing", :string, ""), nil) assert_nil session.poll_message end def test_poll_message_should_silently_handle_kexinit_packets session! packet = P(:byte, KEXINIT, :raw, "lasdfalksdjfa;slkdfja;slkfjsdfaklsjdfa;df") socket.expects(:next_packet).times(2).returns(packet, nil) algorithms.expects(:accept_kexinit).with(packet) assert_nil session.poll_message end def test_poll_message_should_return_other_packets session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") socket.expects(:next_packet).returns(packet) assert_equal packet, session.poll_message end def test_poll_message_should_enqueue_packets_when_algorithm_disallows_packet session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") algorithms.stubs(:allow?).with(packet).returns(false) socket.expects(:next_packet).times(2).returns(packet, nil) assert_nil session.poll_message assert_equal [packet], session.queue end def test_poll_message_should_read_from_queue_when_next_in_queue_is_allowed_and_consume_queue_is_true session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") session.push(packet) socket.expects(:next_packet).never assert_equal packet, session.poll_message assert session.queue.empty? end def test_poll_message_should_not_read_from_queue_when_next_in_queue_is_not_allowed session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") algorithms.stubs(:allow?).with(packet).returns(false) session.push(packet) socket.expects(:next_packet).returns(nil) assert_nil session.poll_message assert_equal [packet], session.queue end def test_poll_message_should_not_read_from_queue_when_consume_queue_is_false session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") session.push(packet) socket.expects(:next_packet).returns(nil) assert_nil session.poll_message(:nonblock, false) assert_equal [packet], session.queue end def test_wait_with_block_should_return_immediately_if_block_returns_truth session.expects(:poll_message).never session.wait { true } end def test_wait_should_not_consume_queue_on_reads n = 0 session.expects(:poll_message).with(:nonblock, false).returns(nil) session.wait { (n += 1) > 1 } end def test_wait_without_block_should_return_after_first_read session.expects(:poll_message).returns(nil) session.wait end def test_wait_should_enqueue_packets session! p1 = P(:byte, SERVICE_REQUEST, :string, "test") p2 = P(:byte, SERVICE_ACCEPT, :string, "test") socket.expects(:next_packet).times(2).returns(p1, p2) n = 0 session.wait { (n += 1) > 2 } assert_equal [p1, p2], session.queue end def test_push_should_enqueue_packet packet = P(:byte, SERVICE_ACCEPT, :string, "test") session.push(packet) assert_equal [packet], session.queue end def test_send_message_should_delegate_to_socket session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") socket.expects(:send_packet).with(packet) session.send_message(packet) end def test_enqueue_message_should_delegate_to_socket session! packet = P(:byte, SERVICE_ACCEPT, :string, "test") socket.expects(:enqueue_packet).with(packet) session.enqueue_message(packet) end def test_configure_client_should_pass_options_to_socket_client_state session.configure_client compression: :standard assert_equal :standard, socket.client.compression end def test_configure_server_should_pass_options_to_socket_server_state session.configure_server compression: :standard assert_equal :standard, socket.server.compression end def test_hint_should_set_hint_on_socket assert !socket.hints[:authenticated] session.hint :authenticated assert socket.hints[:authenticated] end class TestLogger < Logger def initialize @strio = StringIO.new super(@strio) end def messages @strio.string end end def test_log_correct_debug_with_proxy logger = TestLogger.new proxy = Net::SSH::Proxy::HTTP.new("") session!(logger: logger, proxy: proxy) assert_match "establishing connection to #{TEST_HOST}:#{TEST_PORT} through proxy", logger.messages end def test_log_correct_debug_without_proxy logger = TestLogger.new session!(logger: logger) assert_match "establishing connection to #{TEST_HOST}:#{TEST_PORT}", logger.messages end private def socket @socket ||= stub("socket", hints: {}) end def server_version @server_version ||= stub("server_version") end def algorithms @algorithms ||= stub("algorithms", initialized?: true, allow?: true, start: true) end def session(options={}) @session ||= begin host = options.delete(:host) || TEST_HOST if (proxy = options[:proxy]) proxy.stubs("open").returns(socket) else Socket.stubs(:tcp).with(host, options[:port] || TEST_PORT, nil, nil, { connect_timeout: options[:timeout] }).returns(socket) end Net::SSH::Transport::ServerVersion.stubs(:new).returns(server_version) Net::SSH::Transport::Algorithms.stubs(:new).returns(algorithms) Net::SSH::Transport::Session.new(host, options) end end # a simple alias to make the tests more self-documenting. the bang # version makes it look more like the session is being instantiated alias session! session end end