diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2019-11-22 10:13:14 -0500 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2019-11-22 10:13:14 -0500 |
commit | 37c8ace821578b0de42841298f6ad57c2a401049 (patch) | |
tree | f95ce97770fd600af6ae8c7e600cbc3c38ba5304 | |
parent | 0155bd884bd8c455cc4057811cf67035b04c427f (diff) | |
parent | 78eba8444c0ba50994c7b686251f3a7c21dc92ce (diff) | |
download | couchdb-37c8ace821578b0de42841298f6ad57c2a401049.tar.gz |
Add 'src/smoosh/' from commit '78eba8444c0ba50994c7b686251f3a7c21dc92ce'
git-subtree-dir: src/smoosh
git-subtree-mainline: 0155bd884bd8c455cc4057811cf67035b04c427f
git-subtree-split: 78eba8444c0ba50994c7b686251f3a7c21dc92ce
-rw-r--r-- | src/smoosh/README.md | 140 | ||||
-rw-r--r-- | src/smoosh/operator_guide.md | 396 | ||||
-rw-r--r-- | src/smoosh/src/smoosh.app.src | 29 | ||||
-rw-r--r-- | src/smoosh/src/smoosh.erl | 69 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_app.erl | 28 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_channel.erl | 306 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_priority_queue.erl | 86 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_server.erl | 594 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_sup.erl | 38 | ||||
-rw-r--r-- | src/smoosh/src/smoosh_utils.erl | 92 | ||||
-rw-r--r-- | src/smoosh/test/exunit/scheduling_window_test.exs | 81 | ||||
-rw-r--r-- | src/smoosh/test/exunit/test_helper.exs | 2 |
12 files changed, 1861 insertions, 0 deletions
diff --git a/src/smoosh/README.md b/src/smoosh/README.md new file mode 100644 index 000000000..9f9a48074 --- /dev/null +++ b/src/smoosh/README.md @@ -0,0 +1,140 @@ +Smoosh +====== + +Smoosh is CouchDB's auto-compaction daemon. It is notified when +databases and views are updated and may then elect to enqueue them for +compaction. + +API +--- + +All API functions are in smoosh.erl and only the exported functions in +this module should be called from outside of the smoosh application. + +Additionally, smoosh responds to config changes dynamically and these +changes are the principal means of interacting with smoosh. + +Top-Level Settings +------------------ + +The main settings one interacts with are: + +<dl> +<dt>db_channels<dd>A comma-separated list of channel names for +databases. +<dt>view_channels<dd>A comma-separated list of channel names for +views. +<dt>staleness<dd>The number of minutes that the (expensive) priority +calculation can be stale for before it is recalculated. Defaults to 5. +</dl> + +Sometimes it's necessary to use the following: + +<dl> +<dt>cleanup_index_files</dt><dd>Whether smoosh cleans up the files +for indexes that have been deleted. Defaults to false and probably +shouldn't be changed unless the cluster is running low on disk space, +and only after considering the ramifications.</dd> +<dt>wait_secs</dt><dd>The time a channel waits before starting compactions +to allow time to observe the system and make a smarter decision about what +to compact first. Hardly ever changed from the default. Default 30 (seconds). +</dd> +</dl> + +Channel Settings +---------------- + +A channel has several important settings that control runtime +behavior. + +<dl> +<dt>capacity<dd>The maximum number of items the channel can hold (lowest priority item is removed to make room for new items). Defaults to 9999. +<dt>concurrency<dd>The maximum number of jobs that can run concurrently. Defaults to 1. +<dt>max_priority<dd>The item must have a priority lower than this to be enqueued. Defaults to infinity. +<dt>max_size<dd>The item must be no larger than this many bytes in length to be enqueued. Defaults to infinity. +<dt>min_priority<dd>The item must have a priority at least this high to be enqueued. Defaults to 5.0 for ratio and 16 mb for slack. +<dt>min_changes<dd>The minimum number of changes since last compaction before the item will be enqueued. Defaults to 0. Currently only works for databases. +<dt>min_size<dd>The item must be at least this many bytes in length to be enqueued. Defaults to 1mb (1048576 bytes). +<dt>priority<dd>The method used to calculate priority. Can be ratio (calculated as disk_size/data_size) or slack (calculated as disk_size-data_size). Defaults to ratio. +</dl> + +Structure +--------- + +Smoosh consists of a central gen_server (smoosh_server) which manages +a number of subordinate smoosh_channel gen_servers. This is not +properly managed by OTP yet. + +Compaction Scheduling Algorithm +------------------------------- + +Smoosh decides whether to compact a database or view by evaluating the +item against the selection criteria of each _channel_ in the order +they are configured. By default there are two channels for databases +("ratio_dbs" and "slack_dbs"), and two channels for views ("ratio_views" +and "slack_views") + +Smoosh will enqueue the new item to the first channel that accepts +it. If none accept it, the item is not enqueued for compaction. + +Notes on the data_size value +---------------------------- + +Every database and view shard has a data_size value. In CouchDB this +accurately reflects the post-compaction file size. In DbCore, it is +the size of the file that we bill for. It excludes the b+tree and +database footer overhead. We also bill customers for the uncompressed +size of their documents, though we store them compressed on disk. +These two systems were developed independently (ours predates +CouchDB's) and DbCore only calculates the billing size value. + +Because of the way our data_size is currently calculated, it can +sometimes be necessary to enqueue databases and views with very low +ratios. Due to this, it is also currently impossible to tell how +optimally compacted a cluster is. + +Example config commands +----------------------- + +Change the set of database channels; + + config:set("smoosh", "db_channels", "small_dbs,medium_dbs,large_dbs"). + +Change the set of database channels on all live nodes in the cluster; + + rpc:multicall(config, set, ["smoosh", "db_channels", "small_dbs,medium_dbs,large_dbs"]). + +Change the concurrency of the ratio_dbs database channel to 2 + + config:set("smoosh.ratio_dbs", "concurrency", "2"). + +Change it on all live nodes in the cluster; + + rpc:multicall(config, set, ["smoosh.ratio_dbs", "concurrency", "2"]). + +Example API commands +-------------------- + +smoosh:status() + +This prints the state of each channel; how many jobs they are +currently running and how many jobs are enqueued (as well as the +lowest and highest priority of those enqueued items). The idea is to +provide, at a glance, sufficient insight into smoosh that an operator +can assess whether smoosh is adequately targeting the reclaimable +space in the cluster. In general, a healthy status output will have +items in the ratio_dbs and ratio_views channels. Owing to the default +settings, the slack_dbs and slack_views will almost certainly have +items in them. Historically, we've not found that the slack channels, +on their own, are particularly adept at keeping things well compacted. + +smoosh:enqueue_all_dbs(), smoosh:enqueue_all_views() + +These functions do just what they say but should not generally need to +be called, smoosh is supposed to be autonomous. Call them if you get +alerted to a disk space issue, they might well help. If they do, that +indicates a bug in smoosh as it should already have enqueued eligible +shards once they met the configured settings. + + + diff --git a/src/smoosh/operator_guide.md b/src/smoosh/operator_guide.md new file mode 100644 index 000000000..a0c981086 --- /dev/null +++ b/src/smoosh/operator_guide.md @@ -0,0 +1,396 @@ +# An operator's guide to smoosh + +Smoosh is the auto-compactor for the databases. It automatically selects and +processes the compacting of database shards on each node. + +## Smoosh Channels + +Smoosh works using the concept of channels. A channel is essentially a queue of pending +compactions. There are separate sets of channels for database and view compactions. Each +channel is assigned a configuration which defines whether a compaction ends up in +the channel's queue and how compactions are prioritised within that queue. + +Smoosh takes each channel and works through the compactions queued in each in priority +order. Each channel is processed concurrently, so the priority levels only matter within +a given channel. + +Finally, each channel has an assigned number of active compactions, which defines how +many compactions happen for that channel in parallel. For example, a cluster with +a lot of database churn but few views might require more active compactions to the +database channel(s). + +It's important to remember that a channel is local to a dbcore node, that is +each node maintains and processes an independent set of compactions. + +### Channel configuration options + +#### Channel types + +Each channel has a basic type for the algorithm it uses to select pending +compactions for its queue and how it prioritises them. + +The two queue types are: + +* **ratio**: this uses the ratio `total_bytes / user_bytes` as its driving +calculation. The result _X_ must be greater than some configurable value _Y_ for a +compaction to be added to the queue. Compactions are then prioritised for +higher values of _X_. + +* **slack**: this uses `total_bytes - user_bytes` as its driving calculation. +The result _X_ must be greater than some configurable value _Y_ for a compaction +to be added to the queue. Compactions are prioritised for higher values of _X_. + +In both cases, _Y_ is set using the `min_priority` configuration variable. The +calculation of _X_ is described in [Priority calculation](#priority-calculation), below. + +Both algorithms operate on two main measures: + +* **user_bytes**: this is the amount of data the user has in the file. It +doesn't include storage overhead: old revisions, on-disk btree structure and +so on. + +* **total_bytes**: the size of the file on disk. + +Channel type is set using the `priority` configuration setting. + +#### Further configuration options + +Beyond its basic type, there are several other configuration options which +can be applied to a queue. + +*All options MUST be set as strings.* See the [smoosh readme][srconfig] for +all settings and their defaults. + +#### Priority calculation + +The algorithm type and certain configuration options feed into the priority +calculation. + +The priority is calculated when a compaction is enqueued. As each channel +has a different configuration, each channel will end up with a different +priority value. The enqueue code checks each channel in turn to see whether the +compaction passes its configured priority threshold (`min_priority`). Once +a channel is found that can accept the compaction, the compaction is added +to that channel's queue and the enqueue process stops. Therefore the +ordering of channels has a bearing in what channel a compaction ends up in. + +If you want to follow this along, the call order is all in `smoosh_server`, +`enqueue_request -> find_channel -> get_priority`. + +The priority calculation is probably the easiest way to understand the effects +of configuration variables. It's defined in `smoosh_server#get_priority/3`, +currently [here][ss]. + +[ss]: https://github.com/apache/couchdb-smoosh/blob/master/src/smoosh_server.erl#L277 +[srconfig]: https://github.com/apache/couchdb-smoosh#channel-settings + +#### Background Detail + +`user_bytes` is called `data_size` in `db_info` blocks. It is the total of all bytes +that are used to store docs and their attachments. + +Since `.couch` files are append only, every update adds data to the file. When +you update a btree, a new leaf node is written and all the nodes back up the +root. In this update, old data is never overwritten and these parts of the +file are no longer live; this includes old btree nodes and document bodies. +Compaction takes this file and writes a new file that only contains live data. + +`total_data` is the number of bytes in the file as reported by `ls -al filename`. + +#### Flaws + +An important flaw in this calculation is that `total_data` takes into account +the compression of data on disk, whereas `user_bytes` does not. This can give +unexpected results to calculations, as the values are not directly comparable. + +However, it's the best measure we currently have. + +[Even more info](https://github.com/apache/couchdb-smoosh#notes-on-the-data_size-value). + + +### Defining a channel + +Defining a channel is done via normal dbcore configuration, with some +convention as to the parameter names. + +Channel configuration is defined using `smoosh.channel_name` top level config +options. Defining a channel is just setting the various options you want +for the channel, then bringing it into smoosh's sets of active channels by +adding it to either `db_channels` or `view_channels`. + +This means that smoosh channels can be defined either for a single node or +globally across a cluster, by setting the configuration either globally or +locally. In the example, we set up a new global channel. + +It's important to choose good channel names. There are some conventional ones: + +* `ratio_dbs`: a ratio channel for dbs, usually using the default settings. +* `slack_dbs`: a slack channel for dbs, usually using the default settings. +* `ratio_views`: a ratio channel for views, usually using the default settings. +* `slack_views`: a slack channel for views, usually using the default settings. + +These four are defined by default if there are no others set ([source][source1]). + +[source1]: https://github.com/apache/couchdb-smoosh/blob/master/src/smoosh_server.erl#L75 + +And some standard names for ones we often have to add: + +* `big_dbs`: a ratio channel for only enqueuing large database shards. What + _large_ means is very workload specific. + +Channels have certain defaults for their configuration, defined in the +[smoosh readme][srconfig]. It's only neccessary to set up how this channel +differs from those defaults. Below, we just need to set the `min_size` and +`concurrency` settings, and allow the `priority` to default to `ratio` +along with the other defaults. + +```bash +# Define the new channel +(couchdb@db1.foo.bar)3> s:set_config("smoosh.big_dbs", "min_size", "20000000000", global). +{[ok,ok,ok],[]} +(couchdb@db1.foo.bar)3> s:set_config("smoosh.big_dbs", "concurrency", "2", global). +{[ok,ok,ok],[]} + +# Add the channel to the db_channels set -- note we need to get the original +# value first so we can add the new one to the existing list! +(couchdb@db1.foo.bar)5> s:get_config("smoosh", "db_channels", global). +{[{'couchdb@db1.foo.bar',"ratio_dbs"}, +{'couchdb@db3.foo.bar',"ratio_dbs"}, +{'couchdb@db2.foo.bar',"ratio_dbs"}], +[]} +(couchdb@db1.foo.bar)6> s:set_config("smoosh", "db_channels", "ratio_dbs,big_dbs", global). +{[ok,ok,ok],[]} +``` + +### Viewing active channels + +```bash +(couchdb@db3.foo.bar)3> s:get_config("smoosh", "db_channels", global). +{[{'couchdb@db3.foo.bar',"ratio_dbs,big_dbs"}, + {'couchdb@db1.foo.bar',"ratio_dbs,big_dbs"}, + {'couchdb@db2.foo.bar',"ratio_dbs,big_dbs"}], + []} +(couchdb@db3.foo.bar)4> s:get_config("smoosh", "view_channels", global). +{[{'couchdb@db3.foo.bar',"ratio_views"}, + {'couchdb@db1.foo.bar',"ratio_views"}, + {'couchdb@db2.foo.bar',"ratio_views"}], + []} +``` + +### Removing a channel + +```bash +# Remove it from the active set +(couchdb@db1.foo.bar)5> s:get_config("smoosh", "db_channels", global). +{[{'couchdb@db1.foo.bar',"ratio_dbs,big_dbs"}, +{'couchdb@db3.foo.bar',"ratio_dbs,big_dbs"}, +{'couchdb@db2.foo.bar',"ratio_dbs,big_dbs"}], +[]} +(couchdb@db1.foo.bar)6> s:set_config("smoosh", "db_channels", "ratio_dbs", global). +{[ok,ok,ok],[]} + +# Delete the config -- you need to do each value +(couchdb@db1.foo.bar)3> rpc:multicall(config, delete, ["smoosh.big_dbs", "concurrency"]). +{[ok,ok,ok],[]} +(couchdb@db1.foo.bar)3> rpc:multicall(config, delete, ["smoosh.big_dbs", "min_size"]). +{[ok,ok,ok],[]} +``` + +### Getting channel configuration + +As far as I know, you have to get each setting separately: + +``` +(couchdb@db1.foo.bar)1> s:get_config("smoosh.big_dbs", "concurrency", global). +{[{'couchdb@db3.foo.bar',"2"}, + {'couchdb@db1.foo.bar',"2"}, + {'couchdb@db2.foo.bar',"2"}], + []} + +``` + +### Setting channel configuration + +The same as defining a channel, you just need to set the new value: + +``` +(couchdb@db1.foo.bar)2> s:set_config("smoosh.ratio_dbs", "concurrency", "1", global). +{[ok,ok,ok],[]} +``` + +It sometimes takes a little while to take affect. + + + +## Standard operating procedures + +There are a few standard things that operators often have to do when responding +to pages. + +In addition to the below, in some circumstances it's useful to define new +channels with certain properties (`big_dbs` is a common one) if smoosh isn't +selecting and prioritising compactions that well. + +### Checking smoosh's status + +You can see the queued items for each channel by going into `remsh` on a node +and using: + +``` +> smoosh:status(). +{ok,[{"ratio_dbs", + [{active,1}, + {starting,0}, + {waiting,[{size,522}, + {min,{5.001569007970237,{1378,394651,323864}}}, + {max,{981756.5441159063,{1380,370286,655752}}}]}]}, + {"slack_views", + [{active,1}, + {starting,0}, + {waiting,[{size,819}, + {min,{16839814,{1375,978920,326458}}}, + {max,{1541336279,{1380,370205,709896}}}]}]}, + {"slack_dbs", + [{active,1}, + {starting,0}, + {waiting,[{size,286}, + {min,{19004944,{1380,295245,887295}}}, + {max,{48770817098,{1380,370185,876596}}}]}]}, + {"ratio_views", + [{active,1}, + {starting,0}, + {waiting,[{size,639}, + {min,{5.0126340031149335,{1380,186581,445489}}}, + {max,{10275.555632057285,{1380,370411,421477}}}]}]}]} +``` + +This gives you the node-local status for each queue. + +Under each channel there is some information about the channel: + +* `active`: number of current compactions in the channel. +* `starting`: number of compactions starting-up. +* `waiting`: number of queued compactions. + * `min` and `max` give an idea of the queued jobs' effectiveness. The values + for these are obviously dependent on whether the queue is ratio or slack. + +For ratio queues, the default minimum for smoosh to enqueue a compaction is 5. In +the example above, we can guess that 981,756 is quite high. This could be a +small database, however, so it doesn't necessarily mean useful compactions +from the point of view of reclaiming disk space. + +For this example, we can see that there are quite a lot of queued compactions, +but we don't know which would be most effective to run to reclaim disk space. +It's also worth noting that the waiting queue sizes are only meaningful +related to other factors on the cluster (e.g., db number and size). + + +### Smoosh IOQ priority + +This is a global setting which affects all channels. Increasing it allows each +active compaction to (hopefully) proceed faster as the compaction work is of +a higher priority relative to other jobs. Decreasing it (hopefully) has the +converse effect. + +By this point you'll [know whether smoosh is backing up](#checking-smooshs-status). +If it's falling behind (big queues), try increasing compaction priority. + +Smoosh's IOQ priority is controlled via the `ioq` -> `compaction` queue. + +``` +> s:get_config("ioq", "compaction", global). +{[{'couchdb@db1.foo.bar',undefined}, + {'couchdb@db2.foo.bar',undefined}, + {'couchdb@db3.foo.bar',undefined}], + []} + +``` + +Priority by convention runs 0 to 1, though the priority can be any positive +number. The default for compaction is 0.01; pretty low. + +If it looks like smoosh has a bunch of work that it's not getting +through, priority can be increased. However, be careful that this +doesn't adversely impact the customer experience. If it will, and +it's urgent, at least drop them a warning. + +``` +> s:set_config("ioq", "compaction", "0.5", global). +{[ok,ok,ok],[]} +``` + +In general, this should be a temporary measure. For some clusters, +a change from the default may be required to help smoosh keep up +with particular workloads. + +### Granting specific channels more workers + +Giving smoosh a higher concurrency for a given channel can allow a backlog +in that channel to catch up. + +Again, some clusters run best with specific channels having more workers. + +From [assessing disk space](#assess-the-space-on-the-disk), you should +know whether the biggest offenders are db or view files. From this, +you can infer whether it's worth giving a specific smoosh channel a +higher concurrency. + +The current setting can be seen for a channel like so: + +``` +> s:get_config("smoosh.ratio_dbs", "concurrency", global). +{[{'couchdb@db1.foo.bar',undefined}, + {'couchdb@db2.foo.bar',undefined}, + {'couchdb@db3.foo.bar',undefined}], + []} +``` + +`undefined` means the default is used. + +If we knew that disk space for DBs was the major user of disk space, we might +want to increase a `_dbs` channel. Experience shows `ratio_dbs` is often best +but evaluate this based on the current status. + +If we want to increase the ratio_dbs setting: + +``` +> s:set_config("smoosh.ratio_dbs", "concurrency", "2", global). +{[ok,ok,ok],[]} +``` + +### Suspending smoosh + +If smoosh itself is causing issues, it's possible to suspend its operation. +This differs from either `application:stop(smoosh).` or setting all channel's +concurrency to zero because it both pauses on going compactions and maintains +the channel queues intact. + +If, for example, a node's compactions are causing disk space issues, smoosh +could be suspended while working out which channel is causing the problem. For +example, a big_dbs channel might be creating huge compaction-in-progress +files if there's not much in the shard to compact away. + +It's therefore useful to use when testing to see if smoosh is causing a +problem. + +``` +# suspend +smoosh:suspend(). + +# resume a suspended smoosh +smoosh:resume(). +``` + +Suspend is currently pretty literal: `erlang:suspend_process(Pid, [unless_suspending])` +is called for each compaction process in each channel. `resume_process` is called +for resume. + +### Restarting Smoosh + +Restarting Smoosh is a long shot and is a brute force approach in the hope that +when Smoosh rescans the DBs that it makes the right decisions. If required to take +this step contact rnewson or davisp so that they can inspect Smoosh and see the bug. + +``` +> exit(whereis(smoosh_server), kill), smoosh:enqueue_all_dbs(), smoosh:enqueue_all_views(). +``` diff --git a/src/smoosh/src/smoosh.app.src b/src/smoosh/src/smoosh.app.src new file mode 100644 index 000000000..a6cdb7f5e --- /dev/null +++ b/src/smoosh/src/smoosh.app.src @@ -0,0 +1,29 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, smoosh, + [ + {description, "Auto-compaction daemon"}, + {vsn, git}, + {registered, [smoosh_server]}, + {applications, [ + kernel, + stdlib, + couch_log, + config, + couch_event, + couch, + mem3 + ]}, + {mod, { smoosh_app, []}}, + {env, []} + ]}. diff --git a/src/smoosh/src/smoosh.erl b/src/smoosh/src/smoosh.erl new file mode 100644 index 000000000..676e7faad --- /dev/null +++ b/src/smoosh/src/smoosh.erl @@ -0,0 +1,69 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +-export([suspend/0, resume/0, enqueue/1, status/0]). +-export([enqueue_all_dbs/0, enqueue_all_dbs/1, enqueue_all_views/0]). + +suspend() -> + smoosh_server:suspend(). + +resume() -> + smoosh_server:resume(). + +enqueue(Object) -> + smoosh_server:enqueue(Object). + +sync_enqueue(Object) -> + smoosh_server:sync_enqueue(Object). + +sync_enqueue(Object, Timeout) -> + smoosh_server:sync_enqueue(Object, Timeout). + +status() -> + smoosh_server:status(). + +enqueue_all_dbs() -> + fold_local_shards(fun(#shard{name=Name}, _Acc) -> + sync_enqueue(Name) end, ok). + +enqueue_all_dbs(Timeout) -> + fold_local_shards(fun(#shard{name=Name}, _Acc) -> + sync_enqueue(Name, Timeout) end, ok). + +enqueue_all_views() -> + fold_local_shards(fun(#shard{name=Name}, _Acc) -> + catch enqueue_views(Name) end, ok). + +fold_local_shards(Fun, Acc0) -> + mem3:fold_shards(fun(Shard, Acc1) -> + case node() == Shard#shard.node of + true -> + Fun(Shard, Acc1); + false -> + Acc1 + end + end, Acc0). + +enqueue_views(ShardName) -> + DbName = mem3:dbname(ShardName), + {ok, DDocs} = fabric:design_docs(DbName), + [sync_enqueue({ShardName, id(DDoc)}) || DDoc <- DDocs]. + +id(#doc{id=Id}) -> + Id; +id({Props}) -> + couch_util:get_value(<<"_id">>, Props). diff --git a/src/smoosh/src/smoosh_app.erl b/src/smoosh/src/smoosh_app.erl new file mode 100644 index 000000000..eba3579fe --- /dev/null +++ b/src/smoosh/src/smoosh_app.erl @@ -0,0 +1,28 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + smoosh_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl new file mode 100644 index 000000000..d8a8d14a9 --- /dev/null +++ b/src/smoosh/src/smoosh_channel.erl @@ -0,0 +1,306 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh_channel). +-behaviour(gen_server). +-vsn(1). +-include_lib("couch/include/couch_db.hrl"). + +% public api. +-export([start_link/1, close/1, suspend/1, resume/1, get_status/1]). +-export([enqueue/3, last_updated/2, flush/1]). + +% gen_server api. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). + +% records. + +-record(state, { + active=[], + name, + waiting=smoosh_priority_queue:new(), + paused=true, + starting=[] +}). + +% public functions. + +start_link(Name) -> + gen_server:start_link(?MODULE, Name, []). + +suspend(ServerRef) -> + gen_server:call(ServerRef, suspend). + +resume(ServerRef) -> + gen_server:call(ServerRef, resume). + +enqueue(ServerRef, Object, Priority) -> + gen_server:cast(ServerRef, {enqueue, Object, Priority}). + +last_updated(ServerRef, Object) -> + gen_server:call(ServerRef, {last_updated, Object}). + +get_status(ServerRef) -> + gen_server:call(ServerRef, status). + +close(ServerRef) -> + gen_server:call(ServerRef, close). + +flush(ServerRef) -> + gen_server:call(ServerRef, flush). + +% gen_server functions. + +init(Name) -> + schedule_unpause(), + erlang:send_after(60 * 1000, self(), check_window), + {ok, #state{name=Name}}. + +handle_call({last_updated, Object}, _From, State0) -> + {ok, State} = code_change(nil, State0, nil), + LastUpdated = smoosh_priority_queue:last_updated(Object, State#state.waiting), + {reply, LastUpdated, State}; + +handle_call(suspend, _From, State0) -> + {ok, State} = code_change(nil, State0, nil), + #state{active = Active} = State, + [catch erlang:suspend_process(Pid, [unless_suspending]) + || {_,Pid} <- Active], + {reply, ok, State#state{paused=true}}; + +handle_call(resume, _From, State0) -> + {ok, State} = code_change(nil, State0, nil), + #state{active = Active} = State, + [catch erlang:resume_process(Pid) || {_,Pid} <- Active], + {reply, ok, State#state{paused=false}}; + +handle_call(status, _From, State0) -> + {ok, State} = code_change(nil, State0, nil), + {reply, {ok, [ + {active, length(State#state.active)}, + {starting, length(State#state.starting)}, + {waiting, smoosh_priority_queue:info(State#state.waiting)} + ]}, State}; + +handle_call(close, _From, State0) -> + {ok, State} = code_change(nil, State0, nil), + {stop, normal, ok, State}; + +handle_call(flush, _From, State0) -> + {ok, State} = code_change(nil, State0, nil), + {reply, ok, State#state{waiting=smoosh_priority_queue:new()}}. + +handle_cast({enqueue, _Object, 0}, State0) -> + {ok, State} = code_change(nil, State0, nil), + {noreply, State}; +handle_cast({enqueue, Object, Priority}, State0) -> + {ok, State} = code_change(nil, State0, nil), + {noreply, maybe_start_compaction(add_to_queue(Object, Priority, State))}. + +% We accept noproc here due to possibly having monitored a restarted compaction +% pid after it finished. +handle_info({'DOWN', Ref, _, Job, Reason}, State0) when Reason == normal; + Reason == noproc -> + {ok, State} = code_change(nil, State0, nil), + #state{active=Active, starting=Starting} = State, + {noreply, maybe_start_compaction( + State#state{active=lists:keydelete(Job, 2, Active), + starting=lists:keydelete(Ref, 1, Starting)})}; + +handle_info({'DOWN', Ref, _, Job, Reason}, State0) -> + {ok, State} = code_change(nil, State0, nil), + #state{active=Active0, starting=Starting0} = State, + case lists:keytake(Job, 2, Active0) of + {value, {Key, _Pid}, Active1} -> + couch_log:warning("exit for compaction of ~p: ~p", [ + smoosh_utils:stringify(Key), Reason]), + {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]), + {noreply, maybe_start_compaction(State#state{active=Active1})}; + false -> + case lists:keytake(Ref, 1, Starting0) of + {value, {_, Key}, Starting1} -> + couch_log:warning("failed to start compaction of ~p: ~p", [ + smoosh_utils:stringify(Key), Reason]), + {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]), + {noreply, maybe_start_compaction(State#state{starting=Starting1})}; + false -> + {noreply, State} + end + end; + +handle_info({Ref, {ok, Pid}}, State0) when is_reference(Ref) -> + {ok, State} = code_change(nil, State0, nil), + case lists:keytake(Ref, 1, State#state.starting) of + {value, {_, Key}, Starting1} -> + couch_log:notice("~s: Started compaction for ~s", + [State#state.name, smoosh_utils:stringify(Key)]), + erlang:monitor(process, Pid), + erlang:demonitor(Ref, [flush]), + {noreply, State#state{active=[{Key, Pid}|State#state.active], + starting=Starting1}}; + false -> + {noreply, State} + end; + +handle_info(check_window, State0) -> + {ok, State} = code_change(nil, State0, nil), + #state{paused = Paused, name = Name} = State, + StrictWindow = smoosh_utils:get(Name, "strict_window", "false"), + FinalState = case {not Paused, smoosh_utils:in_allowed_window(Name)} of + {false, false} -> + % already in desired state + State; + {true, true} -> + % already in desired state + State; + {false, true} -> + % resume is always safe even if we did not previously suspend + {reply, ok, NewState} = handle_call(resume, nil, State), + NewState; + {true, false} -> + if StrictWindow =:= "true" -> + {reply, ok, NewState} = handle_call(suspend, nil, State), + NewState; + true -> + State#state{paused=true} + end + end, + erlang:send_after(60 * 1000, self(), check_window), + {noreply, FinalState}; + +handle_info(pause, State0) -> + {ok, State} = code_change(nil, State0, nil), + {noreply, State#state{paused=true}}; +handle_info(unpause, State0) -> + {ok, State} = code_change(nil, State0, nil), + {noreply, maybe_start_compaction(State#state{paused=false})}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, #state{}=State, _Extra) -> + {ok, State}. + +% private functions. + +add_to_queue(Key, Priority, State) -> + #state{active=Active,waiting=Q} = State, + case lists:keymember(Key, 1, Active) of + true -> + State; + false -> + Capacity = list_to_integer(smoosh_utils:get(State#state.name, "capacity", "9999")), + couch_log:notice( + "~s: adding ~p to internal compactor queue with priority ~p", + [State#state.name, Key, Priority]), + State#state{ + waiting=smoosh_priority_queue:in(Key, Priority, Priority, Capacity, Q) + } + end. + +maybe_start_compaction(#state{paused=true}=State) -> + State; +maybe_start_compaction(State) -> + Concurrency = list_to_integer(smoosh_utils:get(State#state.name, + "concurrency", "1")), + if length(State#state.active) + length(State#state.starting) < Concurrency -> + case smoosh_priority_queue:out(State#state.waiting) of + false -> + State; + {Key, Priority, Q} -> + try + State2 = case start_compact(State, Key) of + false -> + State; + State1 -> + couch_log:notice( + "~s: Starting compaction for ~s (priority ~p)", + [State#state.name, smoosh_utils:stringify(Key), Priority]), + State1 + end, + maybe_start_compaction(State2#state{waiting=Q}) + catch Class:Exception -> + couch_log:notice("~s: ~p ~p for ~s", + [State#state.name, Class, Exception, + smoosh_utils:stringify(Key)]), + maybe_start_compaction(State#state{waiting=Q}) + end + end; + true -> + State + end. + +start_compact(State, {schema, DbName, GroupId}) -> + case smoosh_utils:ignore_db({DbName, GroupId}) of + false -> + {ok, Pid} = couch_md_index_manager:get_group_pid(DbName, + GroupId), + Ref = erlang:monitor(process, Pid), + Pid ! {'$gen_call', {self(), Ref}, compact}, + State#state{starting=[{Ref, {schema, DbName, + GroupId}} | State#state.starting]}; + _ -> + false + end; + +start_compact(State, DbName) when is_list(DbName) -> + start_compact(State, ?l2b(DbName)); +start_compact(State, DbName) when is_binary(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + try start_compact(State, Db) after couch_db:close(Db) end; +start_compact(State, {Shard,GroupId}) -> + case smoosh_utils:ignore_db({Shard, GroupId}) of + false -> + DbName = mem3:dbname(Shard), + {ok, Pid} = couch_index_server:get_index( + couch_mrview_index, Shard, GroupId), + spawn(fun() -> cleanup_index_files(DbName, Shard) end), + Ref = erlang:monitor(process, Pid), + Pid ! {'$gen_call', {self(), Ref}, compact}, + State#state{starting=[{Ref, {Shard, GroupId}}|State#state.starting]}; + _ -> + false + end; +start_compact(State, Db) -> + case smoosh_utils:ignore_db(Db) of + false -> + DbPid = couch_db:get_pid(Db), + Key = couch_db:name(Db), + case couch_db:get_compactor_pid(Db) of + nil -> + Ref = erlang:monitor(process, DbPid), + DbPid ! {'$gen_call', {self(), Ref}, start_compact}, + State#state{starting=[{Ref, Key}|State#state.starting]}; + % database is still compacting so we can just monitor the existing + % compaction pid + CPid -> + couch_log:notice("Db ~s continuing compaction", + [smoosh_utils:stringify(Key)]), + erlang:monitor(process, CPid), + State#state{active=[{Key, CPid}|State#state.active]} + end; + _ -> + false + end. + +schedule_unpause() -> + WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")), + erlang:send_after(WaitSecs * 1000, self(), unpause). + +cleanup_index_files(DbName, _Shard) -> + case config:get("smoosh", "cleanup_index_files", "false") of + "true" -> + fabric:cleanup_index_files(DbName); + _ -> + ok + end. diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl new file mode 100644 index 000000000..6376103d9 --- /dev/null +++ b/src/smoosh/src/smoosh_priority_queue.erl @@ -0,0 +1,86 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh_priority_queue). + +-export([new/0, last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]). + +-record(priority_queue, { + dict=dict:new(), + tree=gb_trees:empty() +}). + +new() -> + #priority_queue{}. + +last_updated(Key, #priority_queue{dict=Dict}) -> + case dict:find(Key, Dict) of + {ok, {_Priority, {LastUpdatedMTime, _MInt}}} -> + LastUpdatedMTime; + error -> + false + end. + +is_key(Key, #priority_queue{dict=Dict}) -> + dict:is_key(Key, Dict). + +in(Key, Value, Priority, Q) -> + in(Key, Value, Priority, infinity, Q). + +in(Key, Value, Priority, Capacity, #priority_queue{dict=Dict, tree=Tree}) -> + Tree1 = case dict:find(Key, Dict) of + {ok, TreeKey} -> + gb_trees:delete_any(TreeKey, Tree); + error -> + Tree + end, + Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])}, + TreeKey1 = {Priority, Now}, + Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1), + Dict1 = dict:store(Key, TreeKey1, Dict), + truncate(Capacity, #priority_queue{dict=Dict1, tree=Tree2}). + +out(#priority_queue{dict=Dict,tree=Tree}) -> + case gb_trees:is_empty(Tree) of + true -> + false; + false -> + {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree), + Dict1 = dict:erase(Key, Dict), + {Key, Value, #priority_queue{dict=Dict1, tree=Tree1}} + end. + +size(#priority_queue{tree=Tree}) -> + gb_trees:size(Tree). + +info(#priority_queue{tree=Tree}=Q) -> + [{size, ?MODULE:size(Q)}| + case gb_trees:is_empty(Tree) of + true -> + []; + false -> + {Min, _, _} = gb_trees:take_smallest(Tree), + {Max, _, _} = gb_trees:take_largest(Tree), + [{min, Min}, {max, Max}] + end]. + +truncate(infinity, Q) -> + Q; +truncate(Capacity, Q) when Capacity > 0 -> + truncate(Capacity, ?MODULE:size(Q), Q). + +truncate(Capacity, Size, Q) when Size =< Capacity -> + Q; +truncate(Capacity, Size, #priority_queue{dict=Dict, tree=Tree}) when Size > 0 -> + {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree), + Q1 = #priority_queue{dict=dict:erase(Key, Dict), tree=Tree1}, + truncate(Capacity, ?MODULE:size(Q1), Q1). diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl new file mode 100644 index 000000000..43f4bd8e2 --- /dev/null +++ b/src/smoosh/src/smoosh_server.erl @@ -0,0 +1,594 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh_server). +-behaviour(gen_server). +-vsn(4). +-behaviour(config_listener). +-include_lib("couch/include/couch_db.hrl"). + +% public api. +-export([ + start_link/0, + suspend/0, + resume/0, + enqueue/1, + sync_enqueue/1, + sync_enqueue/2, + handle_db_event/3, + status/0 +]). + +-define(SECONDS_PER_MINUTE, 60). + +% gen_server api. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). + +% config_listener api +-export([handle_config_change/5, handle_config_terminate/3]). + +% exported but for internal use. +-export([enqueue_request/2]). + +-ifdef(TEST). +-define(RELISTEN_DELAY, 50). +-else. +-define(RELISTEN_DELAY, 5000). +-endif. + +% private records. + +-record(state, { + db_channels=[], + view_channels=[], + schema_channels=[], + tab, + event_listener, + waiting=dict:new() +}). + +-record(channel, { + name, + pid +}). + +% public functions. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +suspend() -> + gen_server:call(?MODULE, suspend). + +resume() -> + gen_server:call(?MODULE, resume). + +status() -> + gen_server:call(?MODULE, status). + +enqueue(Object) -> + gen_server:cast(?MODULE, {enqueue, Object}). + +sync_enqueue(Object) -> + gen_server:call(?MODULE, {enqueue, Object}). + +sync_enqueue(Object, Timeout) -> + gen_server:call(?MODULE, {enqueue, Object}, Timeout). + +handle_db_event(DbName, local_updated, St) -> + smoosh_server:enqueue(DbName), + {ok, St}; +handle_db_event(DbName, updated, St) -> + smoosh_server:enqueue(DbName), + {ok, St}; +handle_db_event(DbName, {index_commit, IdxName}, St) -> + smoosh_server:enqueue({DbName, IdxName}), + {ok, St}; +handle_db_event(DbName, {schema_updated, DDocId}, St) -> + smoosh_server:enqueue({schema, DbName, DDocId}), + {ok, St}; +handle_db_event(_DbName, _Event, St) -> + {ok, St}. + +% gen_server functions. + +init([]) -> + process_flag(trap_exit, true), + ok = config:listen_for_changes(?MODULE, nil), + {ok, Pid} = start_event_listener(), + DbChannels = smoosh_utils:split( + config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs")), + ViewChannels = smoosh_utils:split( + config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views")), + SchemaChannels = smoosh_utils:split(config:get("smoosh", + "schema_channels", "ratio_schemas,slack_schemas")), + Tab = ets:new(channels, [{keypos, #channel.name}]), + {ok, create_missing_channels(#state{ + db_channels=DbChannels, + view_channels=ViewChannels, + schema_channels=SchemaChannels, + event_listener=Pid, + tab=Tab + })}. + +handle_config_change("smoosh", "db_channels", L, _, _) -> + {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})}; +handle_config_change("smoosh", "view_channels", L, _, _) -> + {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})}; +handle_config_change("smoosh", "schema_channels", L, _, _) -> + {ok, gen_server:cast(?MODULE, {new_schema_channels, smoosh_utils:split(L)})}; +handle_config_change(_, _, _, _, _) -> + {ok, nil}. + +handle_config_terminate(_Server, stop, _State) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after(?RELISTEN_DELAY, + whereis(?MODULE), restart_config_listener). + +handle_call(status, _From, State) -> + Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab), + {reply, {ok, Acc}, State}; + +handle_call({enqueue, Object}, _From, State) -> + {noreply, NewState} = handle_cast({enqueue, Object}, State), + {reply, ok, NewState}; + +handle_call(suspend, _From, State) -> + ets:foldl(fun(#channel{name=Name, pid=P}, _) -> + couch_log:notice("Suspending ~p", [Name]), + smoosh_channel:suspend(P) end, 0, + State#state.tab), + {reply, ok, State}; + +handle_call(resume, _From, State) -> + ets:foldl(fun(#channel{name=Name, pid=P}, _) -> + couch_log:notice("Resuming ~p", [Name]), + smoosh_channel:resume(P) end, 0, + State#state.tab), + {reply, ok, State}. + +handle_cast({new_db_channels, Channels}, State) -> + [smoosh_channel:close(channel_pid(State#state.tab, C)) || + C <- State#state.db_channels -- Channels], + {noreply, create_missing_channels(State#state{db_channels=Channels})}; + +handle_cast({new_view_channels, Channels}, State) -> + [smoosh_channel:close(channel_pid(State#state.tab, C)) || + C <- State#state.view_channels -- Channels], + {noreply, create_missing_channels(State#state{view_channels=Channels})}; + +handle_cast({new_schema_channels, Channels}, State) -> + [smoosh_channel:close(channel_pid(State#state.tab, C)) || + C <- State#state.schema_channels -- Channels], + {noreply, create_missing_channels(State#state{view_channels=Channels})}; + +handle_cast({enqueue, Object}, State) -> + #state{waiting=Waiting}=State, + case dict:is_key(Object, Waiting) of + true -> + {noreply, State}; + false -> + {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]), + {noreply, State#state{waiting=dict:store(Object, Ref, Waiting)}} + end. + +handle_info({'EXIT', Pid, Reason}, #state{event_listener=Pid}=State) -> + couch_log:notice("update notifier died ~p", [Reason]), + {ok, Pid1} = start_event_listener(), + {noreply, State#state{event_listener=Pid1}}; +handle_info({'EXIT', Pid, Reason}, State) -> + couch_log:notice("~p ~p died ~p", [?MODULE, Pid, Reason]), + case ets:match_object(State#state.tab, #channel{pid=Pid, _='_'}) of + [#channel{name=Name}] -> + ets:delete(State#state.tab, Name); + _ -> + ok + end, + {noreply, create_missing_channels(State)}; + +handle_info({'DOWN', Ref, _, _, _}, State) -> + Waiting = dict:filter(fun(_Key, Value) -> Value =/= Ref end, + State#state.waiting), + {noreply, State#state{waiting=Waiting}}; + +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + ets:foldl(fun(#channel{pid=P}, _) -> smoosh_channel:close(P) end, 0, + State#state.tab), + ok. + +code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab, + EventListener, Waiting}, _Extra) -> + {ok, #state{db_channels=DbChannels, view_channels=ViewChannels, + schema_channels=[], tab=Tab, event_listener = EventListener, + waiting=Waiting}}; +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +% private functions. + +get_channel_status(#channel{name=Name, pid=P}, Acc0) when is_pid(P) -> + try gen_server:call(P, status) of + {ok, Status} -> + [{Name, Status} | Acc0]; + _ -> + Acc0 + catch _:_ -> + Acc0 + end; +get_channel_status(_, Acc0) -> + Acc0. + +start_event_listener() -> + couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]). + +enqueue_request(State, Object) -> + try + case find_channel(State, Object) of + false -> + ok; + {ok, Pid, Priority} -> + smoosh_channel:enqueue(Pid, Object, Priority) + end + catch Class:Exception -> + Stack = erlang:get_stacktrace(), + couch_log:notice("~s: ~p ~p for ~s : ~p", + [?MODULE, Class, Exception, + smoosh_utils:stringify(Object), Stack]) + end. + +find_channel(#state{}=State, {schema, DbName, GroupId}) -> + find_channel(State#state.tab, State#state.schema_channels, {schema, DbName, GroupId}); +find_channel(#state{}=State, {Shard, GroupId}) -> + find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId}); +find_channel(#state{}=State, DbName) -> + find_channel(State#state.tab, State#state.db_channels, DbName). + +find_channel(_Tab, [], _Object) -> + false; +find_channel(Tab, [Channel|Rest], Object) -> + Pid = channel_pid(Tab, Channel), + LastUpdated = smoosh_channel:last_updated(Pid, Object), + StalenessInSec = config:get_integer("smoosh", "staleness", 5) + * ?SECONDS_PER_MINUTE, + Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native), + Now = erlang:monotonic_time(), + case LastUpdated =:= false orelse Now - LastUpdated > Staleness of + true -> + case smoosh_utils:ignore_db(Object) of + true -> + find_channel(Tab, Rest, Object); + _ -> + case get_priority(Channel, Object) of + 0 -> + find_channel(Tab, Rest, Object); + Priority -> + {ok, Pid, Priority} + end + end; + false -> + find_channel(Tab, Rest, Object) + end. + +channel_pid(Tab, Channel) -> + [#channel{pid=Pid}] = ets:lookup(Tab, Channel), + Pid. + +create_missing_channels(State) -> + create_missing_channels(State#state.tab, State#state.db_channels), + create_missing_channels(State#state.tab, State#state.view_channels), + create_missing_channels(State#state.tab, State#state.schema_channels), + State. + +create_missing_channels(_Tab, []) -> + ok; +create_missing_channels(Tab, [Channel|Rest]) -> + case ets:lookup(Tab, Channel) of + [] -> + {ok, Pid} = smoosh_channel:start_link(Channel), + true = ets:insert(Tab, [#channel{name=Channel, pid=Pid}]); + _ -> + ok + end, + create_missing_channels(Tab, Rest). + +get_priority(Channel, {Shard, GroupId}) -> + case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of + {ok, Pid} -> + try + {ok, ViewInfo} = couch_index:get_info(Pid), + {SizeInfo} = couch_util:get_value(sizes, ViewInfo), + DiskSize = couch_util:get_value(file, SizeInfo), + ActiveSize = couch_util:get_value(active, SizeInfo), + NeedsUpgrade = needs_upgrade(ViewInfo), + get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade) + catch + exit:{timeout, _} -> + 0 + end; + {not_found, _Reason} -> + 0; + {error, Reason} -> + couch_log:warning("Failed to get group_pid for ~p ~p ~p: ~p", + [Channel, Shard, GroupId, Reason]), + 0 + end; + +get_priority(Channel, {schema, DbName, DDocId}) -> + case couch_md_index_manager:get_group_pid(DbName, DDocId) of + {ok, Pid} -> + {ok, SchemaInfo} = couch_md_index:get_info(Pid), + DiskSize = couch_util:get_value(disk_size, SchemaInfo), + DataSize = couch_util:get_value(data_size, SchemaInfo), + get_priority(Channel, DiskSize, DataSize, false); + {error, Reason} -> + couch_log:warning("Failed to get group_pid for ~p ~p ~p: ~p", + [Channel, DbName, DDocId, Reason]), + 0 + end; + +get_priority(Channel, DbName) when is_list(DbName) -> + get_priority(Channel, ?l2b(DbName)); +get_priority(Channel, DbName) when is_binary(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + try get_priority(Channel, Db) after couch_db:close(Db) end; +get_priority(Channel, Db) -> + {ok, DocInfo} = couch_db:get_db_info(Db), + {SizeInfo} = couch_util:get_value(sizes, DocInfo), + DiskSize = couch_util:get_value(file, SizeInfo), + ActiveSize = couch_util:get_value(active, SizeInfo), + NeedsUpgrade = needs_upgrade(DocInfo), + case db_changed(Channel, DocInfo) of + true -> get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade); + false -> 0 + end. + +get_priority(Channel, DiskSize, DataSize, NeedsUpgrade) -> + Priority = get_priority(Channel), + MinSize = to_number(Channel, "min_size", "1048576"), + MaxSize = to_number(Channel, "max_size", "infinity"), + DefaultMinPriority = case Priority of "slack" -> "16777216"; _ -> "5.0" end, + MinPriority = to_number(Channel, "min_priority", DefaultMinPriority), + MaxPriority = to_number(Channel, "max_priority", "infinity"), + if Priority =:= "upgrade", NeedsUpgrade -> + 1; + DiskSize =< MinSize -> + 0; + DiskSize > MaxSize -> + 0; + DataSize =:= 0 -> + MinPriority; + Priority =:= "ratio", DiskSize/DataSize =< MinPriority -> + 0; + Priority =:= "ratio", DiskSize/DataSize > MaxPriority -> + 0; + Priority =:= "ratio" -> + DiskSize/DataSize; + Priority =:= "slack", DiskSize-DataSize =< MinPriority -> + 0; + Priority =:= "slack", DiskSize-DataSize > MaxPriority -> + 0; + Priority =:= "slack" -> + DiskSize-DataSize; + true -> + 0 + end. + +db_changed(Channel, Info) -> + case couch_util:get_value(compacted_seq, Info) of + undefined -> + true; + CompactedSeq -> + MinChanges = list_to_integer( + smoosh_utils:get(Channel, "min_changes", "0")), + UpdateSeq = couch_util:get_value(update_seq, Info), + UpdateSeq - CompactedSeq >= MinChanges + end. + +to_number(Channel, Name, Default) -> + case smoosh_utils:get(Channel, Name, Default) of + "infinity" -> infinity; + Value -> + try + list_to_float(Value) + catch error:badarg -> + list_to_integer(Value) + end + end. + +get_priority("ratio_dbs") -> + "ratio"; +get_priority("ratio_views") -> + "ratio"; +get_priority("ratio_schemas") -> + "ratio"; +get_priority("slack_dbs") -> + "slack"; +get_priority("slack_views") -> + "slack"; +get_priority("slack_schemas") -> + "slack"; +get_priority("upgrade_dbs") -> + "upgrade"; +get_priority("upgrade_views") -> + "upgrade"; +get_priority(Channel) -> + smoosh_utils:get(Channel, "priority", "ratio"). + +needs_upgrade(Props) -> + DiskVersion = couch_util:get_value(disk_format_version, Props), + case couch_util:get_value(engine, Props) of + couch_bt_engine -> + (couch_bt_engine_header:latest(DiskVersion) =:= false); + _ -> + false + end. + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + + +setup() -> + meck:new([config, couch_index, couch_index_server], [passthrough]), + Pid = list_to_pid("<0.0.0>"), + meck:expect(couch_index_server, get_index, 3, {ok, Pid}), + meck:expect(config, get, fun(_, _, Default) -> Default end), + Shard = <<"shards/00000000-1fffffff/test.1529510412">>, + GroupId = <<"_design/ddoc">>, + {ok, Shard, GroupId}. + + +teardown(_) -> + meck:unload(). + +config_change_test_() -> + { + "Test config updates", + { + foreach, + fun() -> test_util:start_couch([smoosh]) end, + fun test_util:stop_couch/1, + [ + fun t_restart_config_listener/1 + ] + } +}. + +get_priority_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_ratio_view/1, + fun t_slack_view/1, + fun t_no_data_view/1, + fun t_below_min_priority_view/1, + fun t_below_min_size_view/1, + fun t_timeout_view/1, + fun t_missing_view/1, + fun t_invalid_view/1 + ] +}. + +t_restart_config_listener(_) -> + ?_test(begin + ConfigMonitor = config_listener_mon(), + ?assert(is_process_alive(ConfigMonitor)), + test_util:stop_sync(ConfigMonitor), + ?assertNot(is_process_alive(ConfigMonitor)), + NewConfigMonitor = test_util:wait(fun() -> + case config_listener_mon() of + undefined -> wait; + Pid -> Pid + end + end), + ?assert(is_process_alive(NewConfigMonitor)) + end). + +t_ratio_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index, get_info, fun(_) -> + {ok, [{sizes, {[{file, 5242880}, {active, 524288}]}}]} + end), + ?assertEqual(10.0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_slack_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index, get_info, fun(_) -> + {ok, [{sizes, {[{file, 33554432}, {active, 16777215}]}}]} + end), + ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(16777217, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_no_data_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index, get_info, fun(_) -> + {ok, [{sizes, {[{file, 5242880}, {active, 0}]}}]} + end), + ?assertEqual(5.0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(16777216, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(5.0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_below_min_priority_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index, get_info, fun(_) -> + {ok, [{sizes, {[{file, 5242880}, {active, 1048576}]}}]} + end), + ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_below_min_size_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index, get_info, fun(_) -> + {ok, [{sizes, {[{file, 1048576}, {active, 512000}]}}]} + end), + ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_timeout_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index, get_info, fun(_) -> + exit({timeout, get_info}) + end), + ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_missing_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index_server, get_index, 3, {not_found, missing}), + ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +t_invalid_view({ok, Shard, GroupId}) -> + ?_test(begin + meck:expect(couch_index_server, get_index, 3, {error, undef}), + ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), + ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})) + end). + +config_listener_mon() -> + IsConfigMonitor = fun(P) -> + [M | _] = string:tokens(couch_debug:process_name(P), ":"), + M =:= "config_listener_mon" + end, + [{_, MonitoredBy}] = process_info(whereis(?MODULE), [monitored_by]), + case lists:filter(IsConfigMonitor, MonitoredBy) of + [Pid] -> Pid; + [] -> undefined + end. + +-endif. diff --git a/src/smoosh/src/smoosh_sup.erl b/src/smoosh/src/smoosh_sup.erl new file mode 100644 index 000000000..158498cd5 --- /dev/null +++ b/src/smoosh/src/smoosh_sup.erl @@ -0,0 +1,38 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, [?CHILD(smoosh_server, worker)]} }. diff --git a/src/smoosh/src/smoosh_utils.erl b/src/smoosh/src/smoosh_utils.erl new file mode 100644 index 000000000..b433de033 --- /dev/null +++ b/src/smoosh/src/smoosh_utils.erl @@ -0,0 +1,92 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(smoosh_utils). +-include_lib("couch/include/couch_db.hrl"). + +-export([get/2, get/3, group_pid/1, split/1, stringify/1, ignore_db/1]). +-export([ + in_allowed_window/1 +]). + +group_pid({Shard, GroupId}) -> + case couch_view_group:open_db_group(Shard, GroupId) of + {ok, Group} -> + try + gen_server:call(couch_view, {get_group_server, Shard, Group}) + catch _:Error -> + {error, Error} + end; + Else -> + Else + end. + +get(Channel, Key) -> + ?MODULE:get(Channel, Key, undefined). + +get(Channel, Key, Default) -> + config:get("smoosh." ++ Channel, Key, Default). + +split(CSV) -> + re:split(CSV, "\\s*,\\s*", [{return,list}, trim]). + +stringify({DbName, GroupId}) -> + io_lib:format("~s ~s", [DbName, GroupId]); +stringify({schema, DbName, GroupId}) -> + io_lib:format("schema: ~s ~s", [DbName, GroupId]); +stringify(DbName) -> + io_lib:format("~s", [DbName]). + +ignore_db({DbName, _GroupName}) -> + ignore_db(DbName); +ignore_db(DbName) when is_binary(DbName)-> + ignore_db(?b2l(DbName)); +ignore_db(DbName) when is_list(DbName) -> + case config:get("smoosh.ignore", DbName, false) of + "true" -> + true; + _ -> + false + end; +ignore_db(Db) -> + ignore_db(couch_db:name(Db)). + +in_allowed_window(Channel) -> + From = parse_time(get(Channel, "from"), {00, 00}), + To = parse_time(get(Channel, "to"), {24, 00}), + in_allowed_window(From, To). + +in_allowed_window(From, To) -> + {HH, MM, _} = erlang:time(), + case From < To of + true -> + ({HH, MM} >= From) andalso ({HH, MM} < To); + false -> + ({HH, MM} >= From) orelse ({HH, MM} < To) + end. + + +parse_time(undefined, Default) -> + Default; +parse_time(String, Default) -> + case string:tokens(String, ":") of + [HH, MM] -> + try + {list_to_integer(HH), list_to_integer(MM)} + catch error:badarg -> + couch_log:error("Malformed compaction schedule configuration: ~s", [String]), + Default + end; + _Else -> + couch_log:error("Malformed compaction schedule configuration: ~s", [String]), + Default + end. diff --git a/src/smoosh/test/exunit/scheduling_window_test.exs b/src/smoosh/test/exunit/scheduling_window_test.exs new file mode 100644 index 000000000..7fa6c23a7 --- /dev/null +++ b/src/smoosh/test/exunit/scheduling_window_test.exs @@ -0,0 +1,81 @@ +defmodule SmooshSchedulingWindowTest do + use Couch.Test.ExUnit.Case + + alias Couch.Test.Setup + + setup_all(context) do + test_ctx = :test_util.start_couch([]) + + on_exit(fn -> + :config.delete('smoosh.test_channel', 'from') + :config.delete('smoosh.test_channel', 'to') + :test_util.stop_couch(test_ctx) + end) + + context + end + + test "in_allowed_window returns true by default", _context do + assert :smoosh_utils.in_allowed_window('nonexistent_channel') == true + end + + test "in_allowed_window ignores bad input", _context do + :config.set('smoosh.test_channel', 'from', 'midnight', false) + :config.set('smoosh.test_channel', 'to', 'infinity', false) + assert :smoosh_utils.in_allowed_window('test_channel') == true + end + + test "in_allowed_window returns false when now < from < to", _context do + now = DateTime.utc_now() + from = DateTime.add(now, 18_000) + to = DateTime.add(now, 36_000) + :config.set('smoosh.test_channel', 'from', '#{from.hour}:#{from.minute}', false) + :config.set('smoosh.test_channel', 'to', '#{to.hour}:#{to.minute}', false) + assert :smoosh_utils.in_allowed_window('test_channel') == false + end + + test "in_allowed_window returns true when from < now < to", _context do + now = DateTime.utc_now() + from = DateTime.add(now, -18_000) + to = DateTime.add(now, 18_000) + :config.set('smoosh.test_channel', 'from', '#{from.hour}:#{from.minute}', false) + :config.set('smoosh.test_channel', 'to', '#{to.hour}:#{to.minute}', false) + assert :smoosh_utils.in_allowed_window('test_channel') == true + end + + test "in_allowed_window returns false when from < to < now", _context do + now = DateTime.utc_now() + from = DateTime.add(now, -36_000) + to = DateTime.add(now, -18_000) + :config.set('smoosh.test_channel', 'from', '#{from.hour}:#{from.minute}', false) + :config.set('smoosh.test_channel', 'to', '#{to.hour}:#{to.minute}', false) + assert :smoosh_utils.in_allowed_window('test_channel') == false + end + + test "in_allowed_window returns true when to < from < now", _context do + now = DateTime.utc_now() + from = DateTime.add(now, -18_000) + to = DateTime.add(now, -36_000) + :config.set('smoosh.test_channel', 'from', '#{from.hour}:#{from.minute}', false) + :config.set('smoosh.test_channel', 'to', '#{to.hour}:#{to.minute}', false) + assert :smoosh_utils.in_allowed_window('test_channel') == true + end + + test "in_allowed_window returns false when to < now < from", _context do + now = DateTime.utc_now() + from = DateTime.add(now, 18_000) + to = DateTime.add(now, -18_000) + :config.set('smoosh.test_channel', 'from', '#{from.hour}:#{from.minute}', false) + :config.set('smoosh.test_channel', 'to', '#{to.hour}:#{to.minute}', false) + assert :smoosh_utils.in_allowed_window('test_channel') == false + end + + test "in_allowed_window returns true when now < to < from", _context do + now = DateTime.utc_now() + from = DateTime.add(now, 36_000) + to = DateTime.add(now, 18_000) + :config.set('smoosh.test_channel', 'from', '#{from.hour}:#{from.minute}', false) + :config.set('smoosh.test_channel', 'to', '#{to.hour}:#{to.minute}', false) + assert :smoosh_utils.in_allowed_window('test_channel') == true + end +end diff --git a/src/smoosh/test/exunit/test_helper.exs b/src/smoosh/test/exunit/test_helper.exs new file mode 100644 index 000000000..314050085 --- /dev/null +++ b/src/smoosh/test/exunit/test_helper.exs @@ -0,0 +1,2 @@ +ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) +ExUnit.start() |