summaryrefslogtreecommitdiff
path: root/sql/sql_window.cc
diff options
context:
space:
mode:
authorVicențiu Ciorbaru <vicentiu@mariadb.org>2016-09-22 18:26:55 +0200
committerVicențiu Ciorbaru <vicentiu@mariadb.org>2016-09-24 15:12:34 +0200
commit53cf265b3b6be949a19294661cb3e0ce25d9c712 (patch)
tree4d80b82ecc26a88e94c3afb5056fac9b742023ea /sql/sql_window.cc
parent29b227c33565596f903cc6ef5aa2d8a76324e28c (diff)
downloadmariadb-git-53cf265b3b6be949a19294661cb3e0ce25d9c712.tar.gz
Implement LEAD and LAG and NTH_VALUE functions
Refactour out (into a copy for now) the logic of Item_sum_hybrid, to allow for multiple arguments. It does not contain the comparator members. The result is the class Item_sum_hybrid_simple. LEAD and LAG make use of this Item to store previous rows in a chache. It also helps in specifying the field type. Currently LEAD/LAG do not support default values. NTH_VALUE behaves identical to LEAD and LAG, except that the starting position cursor is placed on the top of the frame instead of the current row.
Diffstat (limited to 'sql/sql_window.cc')
-rw-r--r--sql/sql_window.cc162
1 files changed, 120 insertions, 42 deletions
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index 15045924e83..1a111e52c75 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -273,7 +273,7 @@ int compare_order_lists(SQL_I_List<ORDER> *part_list1,
return CMP_GT_C;
if (elem2)
return CMP_LT_C;
- return CMP_EQ;
+ return CMP_EQ;
}
@@ -686,7 +686,17 @@ public:
if ((res= Table_read_cursor::next()) ||
(res= fetch()))
+ {
+ /* TODO(cvicentiu) This does not consider table read failures.
+ Perhaps assuming end of table like this is fine in that case. */
+
+ /* This row is the final row in the table. To maintain semantics
+ that cursors always point to the last valid row, move back one step,
+ but mark end_of_partition as true. */
+ Table_read_cursor::prev();
+ end_of_partition= true;
return res;
+ }
if (bound_tracker.compare_with_cache())
{
@@ -1886,19 +1896,25 @@ private:
/* A cursor that follows a target cursor. Each time a new row is added,
the window functions are cleared and only have the row at which the target
is point at added to them.
+
+ The window functions are cleared if the bounds or the position cursors are
+ outside computational bounds.
*/
class Frame_positional_cursor : public Frame_cursor
{
public:
Frame_positional_cursor(const Frame_cursor &position_cursor) :
- position_cursor(position_cursor), bound(NULL), offset(NULL),
+ position_cursor(position_cursor), top_bound(NULL),
+ bottom_bound(NULL), offset(NULL), overflowed(false),
negative_offset(false) {}
Frame_positional_cursor(const Frame_cursor &position_cursor,
- const Frame_cursor &bound,
+ const Frame_cursor &top_bound,
+ const Frame_cursor &bottom_bound,
Item &offset,
bool negative_offset) :
- position_cursor(position_cursor), bound(&bound), offset(&offset),
+ position_cursor(position_cursor), top_bound(&top_bound),
+ bottom_bound(&bottom_bound), offset(&offset),
negative_offset(negative_offset) {}
void init(READ_RECORD *info)
@@ -1908,35 +1924,26 @@ class Frame_positional_cursor : public Frame_cursor
void pre_next_partition(ha_rows rownum)
{
- clear_sum_functions();
+ /* The offset is dependant on the current row values. We can only get
+ * it here accurately. When fetching other rows, it changes. */
+ save_offset_value();
}
void next_partition(ha_rows rownum)
{
- ha_rows position= get_current_position();
- if (position_is_within_bounds(position))
- {
- cursor.move_to(position);
- cursor.fetch();
- add_value_to_items();
- }
+ save_positional_value();
}
void pre_next_row()
{
+ /* The offset is dependant on the current row values. We can only get
+ * it here accurately. When fetching other rows, it changes. */
+ save_offset_value();
}
void next_row()
{
- ha_rows position= get_current_position();
- if (!position_is_within_bounds(position))
- clear_sum_functions();
- else
- {
- cursor.move_to(position_cursor.get_curr_rownum());
- cursor.fetch();
- add_value_to_items();
- }
+ save_positional_value();
}
ha_rows get_curr_rownum() const
@@ -1947,30 +1954,25 @@ class Frame_positional_cursor : public Frame_cursor
private:
/* Check if a our position is within bounds.
* The position is passed as a parameter to avoid recalculating it. */
- bool position_is_within_bounds(ha_rows position)
+ bool position_is_within_bounds()
{
if (!offset)
return !position_cursor.is_outside_computation_bounds();
+ if (overflowed)
+ return false;
+
/* No valid bound to compare to. */
if (position_cursor.is_outside_computation_bounds() ||
- bound->is_outside_computation_bounds())
+ top_bound->is_outside_computation_bounds() ||
+ bottom_bound->is_outside_computation_bounds())
return false;
- if (negative_offset)
- {
- if (position_cursor.get_curr_rownum() < position)
- return false; /* Overflow below 0. */
- if (position < bound->get_curr_rownum()) /* We are over the bound. */
- return false;
- }
- else
- {
- if (position_cursor.get_curr_rownum() > position)
- return false; /* Overflow over MAX_HA_ROWS. */
- if (position > bound->get_curr_rownum()) /* We are over the bound. */
- return false;
- }
+ /* We are over the bound. */
+ if (position < top_bound->get_curr_rownum())
+ return false;
+ if (position > bottom_bound->get_curr_rownum())
+ return false;
return true;
}
@@ -1978,18 +1980,55 @@ private:
/* Get the current position, accounting for the offset value, if present.
NOTE: This function does not check over/underflow.
*/
- ha_rows get_current_position()
+ void get_current_position()
{
- ha_rows position = position_cursor.get_curr_rownum();
+ position = position_cursor.get_curr_rownum();
+ overflowed= false;
if (offset)
- position += offset->val_int() * (negative_offset ? -1 : 1);
- return position;
+ {
+ if (offset_value < 0 &&
+ position + offset_value > position)
+ {
+ overflowed= true;
+ }
+ if (offset_value > 0 &&
+ position + offset_value < position)
+ {
+ overflowed= true;
+ }
+ position += offset_value;
+ }
+ }
+
+ void save_offset_value()
+ {
+ if (offset)
+ offset_value= offset->val_int() * (negative_offset ? -1 : 1);
+ else
+ offset_value= 0;
+ }
+
+ void save_positional_value()
+ {
+ get_current_position();
+ if (!position_is_within_bounds())
+ clear_sum_functions();
+ else
+ {
+ cursor.move_to(position);
+ cursor.fetch();
+ add_value_to_items();
+ }
}
const Frame_cursor &position_cursor;
- const Frame_cursor *bound;
+ const Frame_cursor *top_bound;
+ const Frame_cursor *bottom_bound;
Item *offset;
Table_read_cursor cursor;
+ ha_rows position;
+ longlong offset_value;
+ bool overflowed;
bool negative_offset;
};
@@ -2107,6 +2146,7 @@ void add_special_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
{
Window_spec *spec= window_func->window_spec;
Item_sum *item_sum= window_func->window_func();
+ DBUG_PRINT("info", ("Get arg count: %d", item_sum->get_arg_count()));
Frame_cursor *fc;
switch (item_sum->sum_func())
{
@@ -2135,6 +2175,44 @@ void add_special_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
fc->add_sum_func(item_sum);
cursor_manager->add_cursor(fc);
break;
+ case Item_sum::LEAD_FUNC:
+ case Item_sum::LAG_FUNC:
+ {
+ Frame_cursor *bottom_bound= new Frame_unbounded_following(thd,
+ spec->partition_list,
+ spec->order_list);
+ Frame_cursor *top_bound= new Frame_unbounded_preceding(thd,
+ spec->partition_list,
+ spec->order_list);
+ Frame_cursor *current_row_pos= new Frame_rows_current_row_bottom;
+ cursor_manager->add_cursor(bottom_bound);
+ cursor_manager->add_cursor(top_bound);
+ cursor_manager->add_cursor(current_row_pos);
+ DBUG_ASSERT(item_sum->fixed);
+ bool negative_offset= item_sum->sum_func() == Item_sum::LAG_FUNC;
+ fc= new Frame_positional_cursor(*current_row_pos,
+ *top_bound, *bottom_bound,
+ *item_sum->get_arg(1),
+ negative_offset);
+ fc->add_sum_func(item_sum);
+ cursor_manager->add_cursor(fc);
+ break;
+ }
+ case Item_sum::NTH_VALUE_FUNC:
+ {
+ Frame_cursor *bottom_bound= get_frame_cursor(thd, spec, false);
+ Frame_cursor *top_bound= get_frame_cursor(thd, spec, true);
+ cursor_manager->add_cursor(bottom_bound);
+ cursor_manager->add_cursor(top_bound);
+ DBUG_ASSERT(item_sum->fixed);
+ fc= new Frame_positional_cursor(*top_bound,
+ *top_bound, *bottom_bound,
+ *item_sum->get_arg(1),
+ false);
+ fc->add_sum_func(item_sum);
+ cursor_manager->add_cursor(fc);
+ break;
+ }
default:
fc= new Frame_unbounded_preceding(
thd, spec->partition_list, spec->order_list);