summaryrefslogtreecommitdiff
path: root/happybase/connection.py
blob: e0eb549cda69da0e8c2c216e491587603cc88a9f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# coding: UTF-8

"""
HappyBase connection module.
"""

import logging

from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport
from thrift.protocol import TBinaryProtocol

from .hbase import Hbase
from .hbase.ttypes import ColumnDescriptor
from .table import Table
from .util import pep8_to_camel_case

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Compatibility levels accepted for the ``compat`` constructor argument.
COMPAT_MODES = ('0.90', '0.92', '0.94')

# Maps each accepted ``transport`` name to the Thrift transport class
# that wraps the socket.
THRIFT_TRANSPORTS = {
    'buffered': TBufferedTransport,
    'framed': TFramedTransport,
}

# Values used when the caller does not specify anything else.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 9090
DEFAULT_TRANSPORT = 'buffered'
DEFAULT_COMPAT = '0.94'


class Connection(object):
    """Connection to an HBase Thrift server.

    The `host` and `port` parameters specify the host name and TCP port
    of the HBase Thrift server to connect to. If omitted or ``None``,
    a connection to the default port on ``localhost`` is made. If
    specified, the `timeout` parameter specifies the socket timeout in
    milliseconds.

    If `autoconnect` is `True` (the default) the connection is made directly,
    otherwise :py:meth:`Connection.open` must be called explicitly before first
    use.

    The optional `table_prefix` and `table_prefix_separator` arguments specify
    a prefix and a separator string to be prepended to all table names, e.g.
    when :py:meth:`Connection.table` is invoked. For example, if `table_prefix`
    is ``myproject``, all tables will have names like ``myproject_XYZ``.

    The optional `compat` parameter sets the compatibility level for this
    connection. Older HBase versions have slightly different Thrift interfaces,
    and using the wrong protocol can lead to crashes caused by communication
    errors, so make sure to use the correct one. This value can be one of the
    strings ``0.90``, ``0.92``, or ``0.94`` (the default); choose the one
    that matches the HBase version of the server.

    The optional `transport` parameter specifies the Thrift transport mode to
    use. Supported values for this parameter are ``buffered`` (the default) and
    ``framed``. Make sure to choose the right one, since otherwise you might
    see non-obvious connection errors or program hangs when making
    a connection. HBase versions before 0.94 always use the buffered transport.
    Starting with HBase 0.94, the Thrift server optionally uses a framed
    transport, depending on the parameter passed to the ``hbase-daemon.sh start
    thrift`` command. The default ``-threadpool`` mode uses the buffered
    transport; the ``-hsha``, ``-nonblocking``, and ``-threadedselector`` modes
    use the framed transport.

    .. versionadded:: 0.5
       `timeout` parameter

    .. versionadded:: 0.4
       `table_prefix_separator` parameter

    .. versionadded:: 0.4
       support for framed Thrift transports

    :param str host: The host to connect to
    :param int port: The port to connect to
    :param int timeout: The socket timeout in milliseconds (optional)
    :param bool autoconnect: Whether the connection should be opened directly
    :param str table_prefix: Prefix used to construct table names (optional)
    :param str table_prefix_separator: Separator used for `table_prefix`
    :param str compat: Compatibility mode (optional)
    :param str transport: Thrift transport mode (optional)
    :raises ValueError: if `transport` or `compat` is not a supported value
    :raises TypeError: if `table_prefix` (when given) or
        `table_prefix_separator` is not a string
    """
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, timeout=None,
                 autoconnect=True, table_prefix=None,
                 table_prefix_separator='_', compat=DEFAULT_COMPAT,
                 transport=DEFAULT_TRANSPORT):

        # ``basestring`` only exists on Python 2. Fall back to ``str`` so
        # the type checks below do not fail with a NameError on Python 3.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        if transport not in THRIFT_TRANSPORTS:
            # Sorted, so that the message is the same on every run.
            raise ValueError("'transport' must be one of %s"
                             % ", ".join(sorted(THRIFT_TRANSPORTS)))

        if (table_prefix is not None
                and not isinstance(table_prefix, string_types)):
            raise TypeError("'table_prefix' must be a string")

        if not isinstance(table_prefix_separator, string_types):
            raise TypeError("'table_prefix_separator' must be a string")

        if compat not in COMPAT_MODES:
            raise ValueError("'compat' must be one of %s"
                             % ", ".join(COMPAT_MODES))

        # Allow host and port to be None, which may be easier for
        # applications wrapping a Connection instance.
        self.host = host or DEFAULT_HOST
        self.port = port or DEFAULT_PORT
        self.timeout = timeout
        self.table_prefix = table_prefix
        self.table_prefix_separator = table_prefix_separator
        self.compat = compat

        self._transport_class = THRIFT_TRANSPORTS[transport]
        self._refresh_thrift_client()

        if autoconnect:
            self.open()

        # Marker checked by __del__(); it is only set once the constructor
        # has run to completion.
        self._initialized = True

    def _refresh_thrift_client(self):
        """Refresh the Thrift socket, transport, and client.

        Builds a new socket for ``self.host`` and ``self.port`` (applying
        ``self.timeout`` when it is not ``None``), wraps it in the
        configured transport class, and stores the results in
        ``self.transport`` and ``self.client``.
        """
        socket = TSocket(self.host, self.port)
        if self.timeout is not None:
            socket.setTimeout(self.timeout)

        self.transport = self._transport_class(socket)
        protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
        self.client = Hbase.Client(protocol)

    def _table_name(self, name):
        """Construct a table name by optionally adding a table name prefix.

        :param str name: The table name without prefix
        :return: `name` itself if no table prefix is configured, otherwise
                 the prefix, the separator, and `name` joined together
        :rtype: str
        """
        if self.table_prefix is None:
            return name

        return self.table_prefix + self.table_prefix_separator + name

    def open(self):
        """Open the underlying transport to the HBase instance.

        This method opens the underlying Thrift transport (TCP connection).
        Nothing happens if the transport is already open.
        """
        if self.transport.isOpen():
            return

        logger.debug("Opening Thrift transport to %s:%d", self.host, self.port)
        self.transport.open()

    def close(self):
        """Close the underlying transport to the HBase instance.

        This method closes the underlying Thrift transport (TCP connection).
        Nothing happens if the transport is not open.
        """
        if not self.transport.isOpen():
            return

        if logger is not None:
            # If called from __del__(), module variables may no longer
            # exist.
            logger.debug(
                "Closing Thrift transport to %s:%d",
                self.host, self.port)

        self.transport.close()

    def __del__(self):
        # ``_initialized`` is missing when the constructor failed part-way;
        # in that case there is nothing to clean up here.
        if hasattr(self, '_initialized'):
            self.close()

    def table(self, name, use_prefix=True):
        """Return a table object.

        Returns a :py:class:`happybase.Table` instance for the table named
        `name`. This does not result in a round-trip to the server, and the
        table is not checked for existence.

        The optional `use_prefix` parameter specifies whether the table prefix
        (if any) is prepended to the specified `name`. Set this to `False` if
        you want to use a table that resides in another 'prefix namespace',
        e.g. a table from a 'friendly' application co-hosted on the same HBase
        instance. See the `table_prefix` parameter to the
        :py:class:`Connection` constructor for more information.

        :param str name: the name of the table
        :param bool use_prefix: whether to use the table prefix (if any)
        :return: Table instance
        :rtype: :py:class:`Table`
        """
        if use_prefix:
            name = self._table_name(name)
        return Table(name, self)

    #
    # Table administration and maintenance
    #

    def tables(self):
        """Return a list of table names available in this HBase instance.

        If a `table_prefix` was set for this :py:class:`Connection`, only
        tables that have the specified prefix will be listed, and the
        prefix (including the separator) is stripped from the returned
        names.

        :return: The table names
        :rtype: List of strings
        """
        names = self.client.getTableNames()

        # Filter using prefix, and strip prefix from names
        if self.table_prefix is not None:
            prefix = self._table_name('')
            offset = len(prefix)
            names = [n[offset:] for n in names if n.startswith(prefix)]

        return names

    def create_table(self, name, families):
        """Create a table.

        :param str name: The table name
        :param dict families: The name and options for each column family
        :raises TypeError: if `families` is not a dictionary
        :raises ValueError: if `families` is empty

        The `families` parameter is a dictionary mapping column family
        names to a dictionary containing the options for this column
        family, e.g.

        ::

            families = {
                'cf1': dict(max_versions=10),
                'cf2': dict(max_versions=1, block_cache_enabled=False),
                'cf3': dict(),  # use defaults
            }
            connection.create_table('mytable', families)

        A value of ``None`` is treated like an empty options dictionary.

        These options correspond to the ColumnDescriptor structure in
        the Thrift API, but note that the names should be provided in
        Python style, not in camel case notation, e.g. `time_to_live`,
        not `timeToLive`. The following options are supported:

        * ``max_versions`` (`int`)
        * ``compression`` (`str`)
        * ``in_memory`` (`bool`)
        * ``bloom_filter_type`` (`str`)
        * ``bloom_filter_vector_size`` (`int`)
        * ``bloom_filter_nb_hashes`` (`int`)
        * ``block_cache_enabled`` (`bool`)
        * ``time_to_live`` (`int`)
        """
        name = self._table_name(name)
        if not isinstance(families, dict):
            raise TypeError("'families' arg must be a dictionary")

        if not families:
            raise ValueError(
                "Cannot create table %r (no column families specified)"
                % name)

        column_descriptors = []
        # items() rather than the Python 2-only iteritems(), so this also
        # works on Python 3.
        for cf_name, options in families.items():
            if options is None:
                options = dict()

            kwargs = dict()
            for option_name, value in options.items():
                kwargs[pep8_to_camel_case(option_name)] = value

            # The column family name is passed with a trailing colon.
            if not cf_name.endswith(':'):
                cf_name += ':'
            kwargs['name'] = cf_name

            column_descriptors.append(ColumnDescriptor(**kwargs))

        self.client.createTable(name, column_descriptors)

    def delete_table(self, name, disable=False):
        """Delete the specified table.

        .. versionadded:: 0.5
           the `disable` parameter

        In HBase, a table always needs to be disabled before it can be deleted.
        If the `disable` parameter is `True`, this method first disables the
        table if it wasn't already and then deletes it.

        :param str name: The table name
        :param bool disable: Whether to first disable the table if needed
        """
        # is_table_enabled() and disable_table() add the table prefix
        # themselves, so they are given the unprefixed name.
        if disable and self.is_table_enabled(name):
            self.disable_table(name)

        name = self._table_name(name)
        self.client.deleteTable(name)

    def enable_table(self, name):
        """Enable the specified table.

        :param str name: The table name
        """
        name = self._table_name(name)
        self.client.enableTable(name)

    def disable_table(self, name):
        """Disable the specified table.

        :param str name: The table name
        """
        name = self._table_name(name)
        self.client.disableTable(name)

    def is_table_enabled(self, name):
        """Return whether the specified table is enabled.

        :param str name: The table name

        :return: whether the table is enabled
        :rtype: bool
        """
        name = self._table_name(name)
        return self.client.isTableEnabled(name)

    def compact_table(self, name, major=False):
        """Compact the specified table.

        :param str name: The table name
        :param bool major: Whether to perform a major compaction.
        """
        name = self._table_name(name)
        if major:
            self.client.majorCompact(name)
        else:
            self.client.compact(name)