summaryrefslogtreecommitdiff
path: root/storage/rocksdb/rdb_compact_filter.h
blob: 93767b23787a90cb11ab271eebd207a903793518 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/*
   Portions Copyright (c) 2016-Present, Facebook, Inc.
   Portions Copyright (c) 2012, Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#pragma once

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation  // gcc: Class implementation
#endif

/* C++ system header files */
#include <time.h>
#include <string>
#include <ctime>

/* RocksDB includes */
#include "rocksdb/compaction_filter.h"

/* MyRocks includes */
#include "./ha_rocksdb_proto.h"
#include "./rdb_datadic.h"

namespace myrocks {

/*
  RocksDB compaction filter used by MyRocks.

  Filter() is invoked for key/value pairs of the column family this filter
  was created for.  A record is removed from the compaction output when
    - its index is being dropped (is_drop_index_ongoing() returned true), or
    - its index has a non-zero TTL duration and the record's TTL timestamp
      plus that duration is not later than the cached snapshot timestamp.
  Per-index decisions are cached and only recomputed when the index id of
  the incoming key differs from the previous one.
*/
class Rdb_compact_filter : public rocksdb::CompactionFilter {
 public:
  Rdb_compact_filter(const Rdb_compact_filter &) = delete;
  Rdb_compact_filter &operator=(const Rdb_compact_filter &) = delete;

  explicit Rdb_compact_filter(uint32_t _cf_id) : m_cf_id(_cf_id) {}
  ~Rdb_compact_filter() override {
    // Increment stats by num expired at the end of compaction
    rdb_update_global_stats(ROWS_EXPIRED, m_num_expired);
  }

  // keys are passed in sorted order within the same sst.
  // V1 Filter is thread safe on our usage (creating from Factory).
  // Make sure to protect instance variables when switching to thread
  // unsafe in the future.
  //
  // Returns true when the record should be dropped.  The first 4 bytes of
  // `key` are decoded as the index id; `new_value` and `value_changed` are
  // never written (records are only kept or dropped, never rewritten).
  virtual bool Filter(int level, const rocksdb::Slice &key,
                      const rocksdb::Slice &existing_value,
                      std::string *new_value,
                      bool *value_changed) const override {
    DBUG_ASSERT(key.size() >= sizeof(uint32));

    GL_INDEX_ID gl_index_id;
    gl_index_id.cf_id = m_cf_id;
    gl_index_id.index_id = rdb_netbuf_to_uint32((const uchar *)key.data());
    DBUG_ASSERT(gl_index_id.index_id >= 1);

    // Only re-evaluate the per-index state when we move to a new index.
    if (gl_index_id != m_prev_index) {
      m_should_delete =
          rdb_get_dict_manager()->is_drop_index_ongoing(gl_index_id);

      if (!m_should_delete) {
        get_ttl_duration_and_offset(gl_index_id, &m_ttl_duration,
                                    &m_ttl_offset);

        if (m_ttl_duration != 0 && m_snapshot_timestamp == 0) {
          /*
            For efficiency reasons, we lazily call GetIntProperty to get the
            oldest snapshot time (occurs once per compaction).
          */
          rocksdb::DB *const rdb = rdb_get_rocksdb_db();
          if (!rdb->GetIntProperty(rocksdb::DB::Properties::kOldestSnapshotTime,
                                   &m_snapshot_timestamp) ||
              m_snapshot_timestamp == 0) {
            // No snapshot (or property unavailable): use the current time.
            m_snapshot_timestamp = static_cast<uint64_t>(std::time(nullptr));
          }

#ifndef DBUG_OFF
          int snapshot_ts = rdb_dbug_set_ttl_snapshot_ts();
          if (snapshot_ts) {
            m_snapshot_timestamp =
                static_cast<uint64_t>(std::time(nullptr)) + snapshot_ts;
          }
#endif
        }
      }

      m_prev_index = gl_index_id;
    }

    if (m_should_delete) {
      m_num_deleted++;
      return true;
    } else if (m_ttl_duration > 0 &&
               should_filter_ttl_rec(key, existing_value)) {
      m_num_expired++;
      return true;
    }

    return false;
  }

  virtual bool IgnoreSnapshots() const override { return true; }

  virtual const char *Name() const override { return "Rdb_compact_filter"; }

  /*
    Look up the TTL settings of an index.

    @param gl_index_id   (cf_id, index_id) of the index
    @param ttl_duration  [out] TTL duration of the index, or 0 when the
                         filter must not expire records of this index (TTL
                         disabled, system column family, or index
                         information unavailable)
    @param ttl_offset    [out] offset of the TTL timestamp inside the record
                         value; 0 unless the index has TTL_FLAG set
  */
  void get_ttl_duration_and_offset(const GL_INDEX_ID &gl_index_id,
                                   uint64 *ttl_duration,
                                   uint32 *ttl_offset) const {
    DBUG_ASSERT(ttl_duration != nullptr);
    DBUG_ASSERT(ttl_offset != nullptr);

    // Never carry over an offset computed for a previously seen index.
    *ttl_offset = 0;

    /*
      If TTL is disabled set ttl_duration to 0.  This prevents the compaction
      filter from dropping expired records.
    */
    if (!rdb_is_ttl_enabled()) {
      *ttl_duration = 0;
      return;
    }

    /*
      If key is part of system column family, it's definitely not a TTL key.
    */
    rocksdb::ColumnFamilyHandle *s_cf = rdb_get_dict_manager()->get_system_cf();
    if (s_cf == nullptr || gl_index_id.cf_id == s_cf->GetID()) {
      *ttl_duration = 0;
      return;
    }

    struct Rdb_index_info index_info;
    if (!rdb_get_dict_manager()->get_index_info(gl_index_id, &index_info)) {
      // NO_LINT_DEBUG
      sql_print_error(
          "RocksDB: Could not get index information "
          "for Index Number (%u,%u)",
          gl_index_id.cf_id, gl_index_id.index_id);
      /*
        index_info was not filled in, so its fields must not be trusted.
        Do not expire anything from an index whose TTL settings are unknown.
      */
      *ttl_duration = 0;
      return;
    }

#ifndef DBUG_OFF
    if (rdb_dbug_set_ttl_ignore_pk() &&
        index_info.m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY) {
      *ttl_duration = 0;
      return;
    }
#endif

    *ttl_duration = index_info.m_ttl_duration;
    if (Rdb_key_def::has_index_flag(index_info.m_index_flags,
                                    Rdb_key_def::TTL_FLAG)) {
      *ttl_offset = Rdb_key_def::calculate_index_flag_offset(
          index_info.m_index_flags, Rdb_key_def::TTL_FLAG);
    }
  }

  /*
    Decide whether a record of the current TTL index has expired.

    The TTL timestamp is read as a uint64 at m_ttl_offset inside
    existing_value; a value that cannot be decoded is logged (hex dump) and
    the process is aborted.  `key` is not used.

    @return true if the record's TTL timestamp plus m_ttl_duration is not
            later than m_snapshot_timestamp
  */
  bool should_filter_ttl_rec(const rocksdb::Slice &key,
                             const rocksdb::Slice &existing_value) const {
    uint64 ttl_timestamp;
    Rdb_string_reader reader(&existing_value);
    if (!reader.read(m_ttl_offset) || reader.read_uint64(&ttl_timestamp)) {
      const std::string buf = rdb_hexdump(
          existing_value.data(), existing_value.size(), RDB_MAX_HEXDUMP_LEN);
      // NO_LINT_DEBUG
      sql_print_error(
          "Decoding ttl from PK value failed in compaction filter, "
          "for index (%u,%u), val: %s",
          m_prev_index.cf_id, m_prev_index.index_id, buf.c_str());
      abort();
    }

    /*
      Filter out the record only if it is older than the oldest snapshot
      timestamp.  This prevents any rows from expiring in the middle of
      long-running transactions.

      This evaluates ttl_timestamp + m_ttl_duration <= m_snapshot_timestamp
      without forming the sum: with a very large TTL timestamp or duration
      the unsigned sum would wrap around and make a live row look expired,
      permanently deleting it.
    */
    if (ttl_timestamp > m_snapshot_timestamp) {
      return false;
    }
    return m_snapshot_timestamp - ttl_timestamp >= m_ttl_duration;
  }

 private:
  // Column family for this compaction filter
  const uint32_t m_cf_id;
  // Index id of the previous record
  mutable GL_INDEX_ID m_prev_index = {0, 0};
  // Number of rows dropped because their index is being dropped
  // (counted here but not reported by the destructor)
  mutable uint64 m_num_deleted = 0;
  // Number of rows expired for the TTL index
  mutable uint64 m_num_expired = 0;
  // Current index id should be deleted or not (should be deleted if true)
  mutable bool m_should_delete = false;
  // TTL duration for the current index if TTL is enabled
  mutable uint64 m_ttl_duration = 0;
  // TTL offset for all records in the current index
  mutable uint32 m_ttl_offset = 0;
  // Oldest snapshot timestamp, fetched lazily when the first TTL index is seen
  mutable uint64_t m_snapshot_timestamp = 0;
};

/*
  CompactionFilterFactory handed to RocksDB.  Every call to
  CreateCompactionFilter() returns a new Rdb_compact_filter bound to the
  column family id carried in the supplied context.
*/
class Rdb_compact_filter_factory : public rocksdb::CompactionFilterFactory {
 public:
  Rdb_compact_filter_factory() = default;
  ~Rdb_compact_filter_factory() = default;

  // The factory is not copyable.
  Rdb_compact_filter_factory(const Rdb_compact_filter_factory &) = delete;
  Rdb_compact_filter_factory &operator=(const Rdb_compact_filter_factory &) =
      delete;

  // Identifier reported to RocksDB for this factory.
  const char *Name() const override { return "Rdb_compact_filter_factory"; }

  // Builds a fresh filter for context.column_family_id; ownership of the
  // returned filter passes to the caller.
  std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
      const rocksdb::CompactionFilter::Context &context) override {
    const uint32_t cf_id = context.column_family_id;
    std::unique_ptr<rocksdb::CompactionFilter> filter(
        new Rdb_compact_filter(cf_id));
    return filter;
  }
};

}  // namespace myrocks