summaryrefslogtreecommitdiff
path: root/chromium/sync/internal_api/public/engine/model_safe_worker.cc
blob: 7179ed52b69a60a186eb1da95dd25d9e5a3e31cc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "sync/internal_api/public/engine/model_safe_worker.h"

#include "base/bind.h"
#include "base/json/json_writer.h"
#include "base/memory/scoped_ptr.h"
#include "base/values.h"

namespace syncer {

base::DictionaryValue* ModelSafeRoutingInfoToValue(
    const ModelSafeRoutingInfo& routing_info) {
  base::DictionaryValue* dict = new base::DictionaryValue();
  for (ModelSafeRoutingInfo::const_iterator it = routing_info.begin();
       it != routing_info.end(); ++it) {
    dict->SetString(ModelTypeToString(it->first),
                    ModelSafeGroupToString(it->second));
  }
  return dict;
}

std::string ModelSafeRoutingInfoToString(
    const ModelSafeRoutingInfo& routing_info) {
  scoped_ptr<base::DictionaryValue> dict(
      ModelSafeRoutingInfoToValue(routing_info));
  std::string json;
  base::JSONWriter::Write(dict.get(), &json);
  return json;
}

ModelTypeInvalidationMap ModelSafeRoutingInfoToInvalidationMap(
    const ModelSafeRoutingInfo& routes,
    const std::string& payload) {
  ModelTypeInvalidationMap invalidation_map;
  for (ModelSafeRoutingInfo::const_iterator i = routes.begin();
       i != routes.end(); ++i) {
    invalidation_map[i->first].payload = payload;
  }
  return invalidation_map;
}

ModelTypeSet GetRoutingInfoTypes(const ModelSafeRoutingInfo& routing_info) {
  ModelTypeSet types;
  for (ModelSafeRoutingInfo::const_iterator it = routing_info.begin();
       it != routing_info.end(); ++it) {
    types.Put(it->first);
  }
  return types;
}

ModelSafeGroup GetGroupForModelType(const ModelType type,
                                    const ModelSafeRoutingInfo& routes) {
  ModelSafeRoutingInfo::const_iterator it = routes.find(type);
  if (it == routes.end()) {
    if (type != UNSPECIFIED && type != TOP_LEVEL_FOLDER)
      DVLOG(1) << "Entry does not belong to active ModelSafeGroup!";
    return GROUP_PASSIVE;
  }
  return it->second;
}

std::string ModelSafeGroupToString(ModelSafeGroup group) {
  switch (group) {
    case GROUP_UI:
      return "GROUP_UI";
    case GROUP_DB:
      return "GROUP_DB";
    case GROUP_FILE:
      return "GROUP_FILE";
    case GROUP_HISTORY:
      return "GROUP_HISTORY";
    case GROUP_PASSIVE:
      return "GROUP_PASSIVE";
    case GROUP_PASSWORD:
      return "GROUP_PASSWORD";
    default:
      NOTREACHED();
      return "INVALID";
  }
}

ModelSafeWorker::ModelSafeWorker(WorkerLoopDestructionObserver* observer)
    : stopped_(false),
      work_done_or_stopped_(false, false),
      observer_(observer),
      working_loop_(NULL),
      working_loop_set_wait_(true, false) {}

ModelSafeWorker::~ModelSafeWorker() {}

void ModelSafeWorker::RequestStop() {
  base::AutoLock al(stopped_lock_);

  // Set stop flag but don't signal work_done_or_stopped_ to unblock sync loop
  // because the worker may be working and depending on sync command object
  // living on sync thread. his prevents any *further* tasks from being posted
  // to worker threads (see DoWorkAndWaitUntilDone below), but note that one
  // may already be posted.
  stopped_ = true;
}

SyncerError ModelSafeWorker::DoWorkAndWaitUntilDone(const WorkCallback& work) {
  {
    base::AutoLock al(stopped_lock_);
    if (stopped_)
      return CANNOT_DO_WORK;

    CHECK(!work_done_or_stopped_.IsSignaled());
  }

  return DoWorkAndWaitUntilDoneImpl(work);
}

bool ModelSafeWorker::IsStopped() {
  base::AutoLock al(stopped_lock_);
  return stopped_;
}

void ModelSafeWorker::WillDestroyCurrentMessageLoop() {
  {
    base::AutoLock al(stopped_lock_);
    stopped_ = true;

    // Must signal to unblock syncer if it's waiting for a posted task to
    // finish. At this point, all pending tasks posted to the loop have been
    // destroyed (see MessageLoop::~MessageLoop). So syncer will be blocked
    // indefinitely without signaling here.
    work_done_or_stopped_.Signal();

    DVLOG(1) << ModelSafeGroupToString(GetModelSafeGroup())
        << " worker stops on destruction of its working thread.";
  }

  if (observer_)
    observer_->OnWorkerLoopDestroyed(GetModelSafeGroup());
}

void ModelSafeWorker::SetWorkingLoopToCurrent() {
  DCHECK(!working_loop_);
  working_loop_ = base::MessageLoop::current();
  working_loop_set_wait_.Signal();
}

void ModelSafeWorker::UnregisterForLoopDestruction(
    base::Callback<void(ModelSafeGroup)> unregister_done_callback) {
  // Ok to wait until |working_loop_| is set because this is called on sync
  // loop.
  working_loop_set_wait_.Wait();

  // Should be called on sync loop.
  DCHECK_NE(base::MessageLoop::current(), working_loop_);
  DCHECK(working_loop_);
  working_loop_->PostTask(
      FROM_HERE,
      base::Bind(&ModelSafeWorker::UnregisterForLoopDestructionAsync,
                 this, unregister_done_callback));
}

void ModelSafeWorker::UnregisterForLoopDestructionAsync(
    base::Callback<void(ModelSafeGroup)> unregister_done_callback) {
  DCHECK(stopped_);
  DCHECK_EQ(base::MessageLoop::current(), working_loop_);
  base::MessageLoop::current()->RemoveDestructionObserver(this);
  unregister_done_callback.Run(GetModelSafeGroup());
}

}  // namespace syncer