summaryrefslogtreecommitdiff
path: root/storage/ndb/src/common/transporter/Transporter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/common/transporter/Transporter.cpp')
-rw-r--r--storage/ndb/src/common/transporter/Transporter.cpp208
1 files changed, 208 insertions, 0 deletions
diff --git a/storage/ndb/src/common/transporter/Transporter.cpp b/storage/ndb/src/common/transporter/Transporter.cpp
new file mode 100644
index 00000000000..124ed5f7241
--- /dev/null
+++ b/storage/ndb/src/common/transporter/Transporter.cpp
@@ -0,0 +1,208 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <TransporterRegistry.hpp>
+#include <TransporterCallback.hpp>
+#include "Transporter.hpp"
+#include "TransporterInternalDefinitions.hpp"
+#include <NdbSleep.h>
+#include <SocketAuthenticator.hpp>
+#include <InputStream.hpp>
+#include <OutputStream.hpp>
+
+#include <EventLogger.hpp>
+extern EventLogger g_eventLogger;
+
+Transporter::Transporter(TransporterRegistry &t_reg,
+ TransporterType _type,
+ const char *lHostName,
+ const char *rHostName,
+ int s_port,
+ bool _isMgmConnection,
+ NodeId lNodeId,
+ NodeId rNodeId,
+ NodeId serverNodeId,
+ int _byteorder,
+ bool _compression, bool _checksum, bool _signalId)
+ : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
+ isServer(lNodeId==serverNodeId), isMgmConnection(_isMgmConnection),
+ m_packer(_signalId, _checksum),
+ m_type(_type),
+ m_transporter_registry(t_reg)
+{
+ DBUG_ENTER("Transporter::Transporter");
+ if (rHostName && strlen(rHostName) > 0){
+ strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
+ Ndb_getInAddr(&remoteHostAddress, rHostName);
+ }
+ else
+ {
+ if (!isServer) {
+ ndbout << "Unable to setup transporter. Node " << rNodeId
+ << " must have hostname. Update configuration." << endl;
+ exit(-1);
+ }
+ remoteHostName[0]= 0;
+ }
+ strncpy(localHostName, lHostName, sizeof(localHostName));
+
+ if (strlen(lHostName) > 0)
+ Ndb_getInAddr(&localHostAddress, lHostName);
+
+ DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
+ remoteNodeId, localNodeId, isServer,
+ remoteHostName, localHostName,
+ s_port));
+
+ byteOrder = _byteorder;
+ compressionUsed = _compression;
+ checksumUsed = _checksum;
+ signalIdUsed = _signalId;
+
+ m_connected = false;
+ m_timeOutMillis = 1000;
+
+ if(s_port<0)
+ s_port= -s_port; // was dynamic
+
+ if (isServer)
+ m_socket_client= 0;
+ else
+ m_socket_client= new SocketClient(remoteHostName, s_port,
+ new SocketAuthSimple("ndbd",
+ "ndbd passwd"));
+ DBUG_VOID_RETURN;
+}
+
+Transporter::~Transporter(){
+ if (m_socket_client)
+ delete m_socket_client;
+}
+
+bool
+Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
+ // all initial negotiation is done in TransporterRegistry::connect_server
+ DBUG_ENTER("Transporter::connect_server");
+
+ if(m_connected)
+ {
+ DBUG_RETURN(true); // TODO assert(0);
+ }
+
+ bool res = connect_server_impl(sockfd);
+ if(res){
+ m_connected = true;
+ m_errorCount = 0;
+ }
+
+ DBUG_RETURN(res);
+}
+
+bool
+Transporter::connect_client() {
+ NDB_SOCKET_TYPE sockfd;
+
+ if(m_connected)
+ return true;
+
+ if(isMgmConnection)
+ sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
+ else
+ sockfd= m_socket_client->connect();
+
+ return connect_client(sockfd);
+}
+
+bool
+Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
+
+ if(m_connected)
+ return true;
+
+ if (sockfd == NDB_INVALID_SOCKET)
+ return false;
+
+ DBUG_ENTER("Transporter::connect_client");
+
+ DBUG_PRINT("info",("port %d isMgmConnection=%d",m_s_port,isMgmConnection));
+
+ SocketOutputStream s_output(sockfd);
+ SocketInputStream s_input(sockfd);
+
+ // send info about own id
+ // send info about own transporter type
+
+ s_output.println("%d %d", localNodeId, m_type);
+ // get remote id
+ int nodeId, remote_transporter_type= -1;
+
+ char buf[256];
+ if (s_input.gets(buf, 256) == 0) {
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
+ int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
+ switch (r) {
+ case 2:
+ break;
+ case 1:
+ // we're running version prior to 4.1.9
+ // ok, but with no checks on transporter configuration compatability
+ break;
+ default:
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+
+ DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
+ nodeId, remote_transporter_type));
+
+ if (remote_transporter_type != -1)
+ {
+ if (remote_transporter_type != m_type)
+ {
+ DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
+ m_type, remote_transporter_type));
+ NDB_CLOSE_SOCKET(sockfd);
+ g_eventLogger.error("Incompatible configuration: transporter type "
+ "mismatch with node %d", nodeId);
+ DBUG_RETURN(false);
+ }
+ }
+ else if (m_type == tt_SHM_TRANSPORTER)
+ {
+ g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
+ }
+
+ bool res = connect_client_impl(sockfd);
+ if(res){
+ m_connected = true;
+ m_errorCount = 0;
+ }
+ DBUG_RETURN(res);
+}
+
+void
+Transporter::doDisconnect() {
+
+ if(!m_connected)
+ return; //assert(0); TODO will fail
+
+ m_connected= false;
+ disconnectImpl();
+}