Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Counter Batches and Reverse table scans #96

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions happybase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection
from .table import Table
from .batch import Batch
from .counter_batch import CounterBatch
from .pool import ConnectionPool, NoConnectionsAvailable

# TODO: properly handle errors defined in Thrift specification
48 changes: 48 additions & 0 deletions happybase/counter_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from happybase.hbase.ttypes import TIncrement
from collections import defaultdict


class CounterBatch(object):
def __init__(self, table, batch_size=None):
self.table = table
self.batch_size = batch_size
self.batch = defaultdict(int)
self.batch_count = 0

def counter_inc(self, row, column, value=1):
self.batch[(row, column)] += value
self.batch_count += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is incorrect. if the (row, column) tuple was already present this count will be wrong. i guess len(self.batch) is a better count ;)

self._check_send()

def counter_dec(self, row, column, value=1):
self.counter_inc(row, column, -value)

def send(self):
increment_rows = [
TIncrement(table=self.table.name, row=key[0], column=key[1], ammount=value)
for key, value in self.batch.iteritems()
]
self.table.connection.client.incrementRows(increment_rows)
self.batch.clear()
self.batch_count = 0

def _check_send(self):
if self.batch_size and (self.batch_count >= self.batch_size):
self.send()

#
# Context manager methods
#

def __enter__(self):
"""Called upon entering a ``with`` block"""
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Called upon exiting a ``with`` block"""
# TODO: Examine the exception and decide whether or not to send
# For now we always send
if exc_type is not None:
pass

self.send()
25 changes: 24 additions & 1 deletion happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .hbase.ttypes import TScan
from .util import thrift_type_to_dict, str_increment, OrderedDict
from .batch import Batch
from .counter_batch import CounterBatch

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -215,7 +216,7 @@ def cells(self, row, column, versions=None, timestamp=None,
def scan(self, row_start=None, row_stop=None, row_prefix=None,
columns=None, filter=None, timestamp=None,
include_timestamp=False, batch_size=1000, scan_batching=None,
limit=None, sorted_columns=False):
limit=None, sorted_columns=False, reversed=False):
"""Create a scanner for data in the table.

This method returns an iterable that can be used for looping over the
Expand Down Expand Up @@ -270,6 +271,9 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
* The `sorted_columns` argument is only available when using
HBase 0.96 (or up).

* The `reversed` option is only available when using HBase 0.98
(or up).

.. versionadded:: 0.8
`sorted_columns` argument

Expand All @@ -287,6 +291,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
:param bool scan_batching: server-side scan batching (optional)
:param int limit: max number of rows to return
:param bool sorted_columns: whether to return sorted columns
:param bool reversed: whether or not to reverse the row ordering

:return: generator yielding the rows matching the scan
:rtype: iterable of `(row_key, row_data)` tuples
Expand Down Expand Up @@ -369,6 +374,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
filterString=filter,
batchSize=scan_batching,
sortColumns=sorted_columns,
reversed=reversed,
)
scan_id = self.connection.client.scannerOpenWithScan(
self.name, scan, {})
Expand Down Expand Up @@ -497,6 +503,23 @@ def batch(self, timestamp=None, batch_size=None, transaction=False,
del kwargs['self']
return Batch(table=self, **kwargs)

def counter_batch(self, batch_size=None):
"""Create a new batch of counter operation for this table.

This method returns a new :py:class:`CounterBatch` instance that can be used
for mass counter manipulation.

If given, the `batch_size` argument specifies the maximum batch size
after which the batch should send the mutations to the server. By
default this is unbounded.

:param int batch_size: batch size (optional)

:return: CounterBatch instance
:rtype: :py:class:`CounterBatch`
"""
return CounterBatch(table=self, batch_size=batch_size)

#
# Atomic counters
#
Expand Down