summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Nikhil Benesch <nikhil.benesch@gmail.com> 2014-03-09 22:58:59 -0400
committer Claire McQuin <claire@getchef.com> 2014-05-13 15:21:16 -0700
commit c9426f9a0fd8e7943fd51b032999bde080c07af7 (patch)
tree aa4e53053adf300faa92430560454663428a2bb9
parent d465f8ca9937e871caa409988f5909b2b0f0f843 (diff)
download chef-c9426f9a0fd8e7943fd51b032999bde080c07af7.tar.gz
CHEF-4423: implement threaded cookbook synchronization
This commit parallelizes cookbook synchronization (downloads) with the threaded worker queue model used during cookbook upload.
-rw-r--r-- lib/chef/cookbook/synchronizer.rb 106
1 files changed, 64 insertions, 42 deletions
diff --git a/lib/chef/cookbook/synchronizer.rb b/lib/chef/cookbook/synchronizer.rb
index fc5d16617c..c855a4f708 100644
--- a/lib/chef/cookbook/synchronizer.rb
+++ b/lib/chef/cookbook/synchronizer.rb
@@ -1,4 +1,5 @@
require 'chef/client'
+require 'chef/util/threaded_job_queue'
require 'singleton'
class Chef
@@ -56,6 +57,8 @@ class Chef
# Synchronizes the locally cached copies of cookbooks with the files on the
# server.
class CookbookSynchronizer
+ CookbookFile = Struct.new(:cookbook, :segment, :manifest_record)
+
def initialize(cookbooks_by_name, events)
@eager_segments = Chef::CookbookVersion::COOKBOOK_SEGMENTS.dup
unless Chef::Config[:no_lazy_load]
@@ -87,6 +90,38 @@ class Chef
@cookbooks_by_name.key?(cookbook_name)
end
+ def files
+ @files ||= cookbooks.inject([]) do |memo, cookbook|
+ @eager_segments.each do |segment|
+ cookbook.manifest[segment].each do |manifest_record|
+ memo << CookbookFile.new(cookbook, segment, manifest_record)
+ end
+ end
+ memo
+ end
+ end
+
+ def files_by_cookbook
+ files.group_by { |file| file.cookbook }
+ end
+
+ def files_remaining_by_cookbook
+ @files_remaining_by_cookbook ||= begin
+ files_by_cookbook.inject({}) do |memo, (cookbook, files)|
+ memo[cookbook] = files.size
+ memo
+ end
+ end
+ end
+
+ def mark_file_synced(file)
+ files_remaining_by_cookbook[file.cookbook] -= 1
+
+ if files_remaining_by_cookbook[file.cookbook] == 0
+ @events.synchronized_cookbook(file.cookbook.name)
+ end
+ end
+
# Synchronizes all the cookbooks from the chef-server.
#)
# === Returns
@@ -97,14 +132,19 @@ class Chef
clear_obsoleted_cookbooks
- @events.cookbook_sync_start(cookbook_count)
+ queue = Chef::Util::ThreadedJobQueue.new
- # Synchronize each of the node's cookbooks, and add to the
- # valid_cache_entries hash.
- cookbooks.each do |cookbook|
- sync_cookbook(cookbook)
+ files.each do |file|
+ queue << lambda do |lock|
+ sync_file(file)
+ lock.synchronize { mark_file_synced(file) }
+ end
end
+ @events.cookbook_sync_start(cookbook_count)
+ queue.process(20)
+ update_cookbook_filenames
+
rescue Exception => e
@events.cookbook_sync_failed(cookbooks, e)
raise
@@ -129,61 +169,43 @@ class Chef
@events.cookbook_clean_complete
end
- # Sync the eagerly loaded files contained by +cookbook+
- #
- # === Arguments
- # cookbook<Chef::Cookbook>:: The cookbook to update
- # valid_cache_entries<Hash>:: Out-param; Added to this hash are the files that
- # were referred to by this cookbook
- def sync_cookbook(cookbook)
- Chef::Log.debug("Synchronizing cookbook #{cookbook.name} #{cookbook.version}")
-
- # files and templates are lazily loaded, and will be done later.
-
- @eager_segments.each do |segment|
- segment_filenames = Array.new
- cookbook.manifest[segment].each do |manifest_record|
-
- cache_filename = sync_file_in_cookbook(cookbook, manifest_record)
- # make the segment filenames a full path.
- full_path_cache_filename = cache.load(cache_filename, false)
- segment_filenames << full_path_cache_filename
- end
+ def update_cookbook_filenames
+ files_by_cookbook.each do |cookbook, cookbook_files|
+ files_by_segment = cookbook_files.group_by { |file| file.segment }
+ @eager_segments.each do |segment|
+ segment_files = files_by_segment[segment]
+ next unless segment_files
- # replace segment filenames with a full-path one.
- if segment.to_sym == :recipes
- cookbook.recipe_filenames = segment_filenames
- elsif segment.to_sym == :attributes
- cookbook.attribute_filenames = segment_filenames
- else
- cookbook.segment_filenames(segment).replace(segment_filenames)
+ filenames = segment_files.map { |file| file.manifest_record['path'] }
+ cookbook.replace_segment_filenames(segment, filenames)
end
end
- @events.synchronized_cookbook(cookbook.name)
end
# Sync an individual file if needed. If there is an up to date copy
- # locally, nothing is done.
+ # locally, nothing is done. Updates +file+'s manifest with the full path to
+ # the cached file.
#
# === Arguments
- # file_manifest::: A Hash of the form {"path" => 'relative/path', "url" => "location to fetch the file"}
+ # file<CookbookFile>
# === Returns
- # Path to the cached file as a String
- def sync_file_in_cookbook(cookbook, file_manifest)
- cache_filename = File.join("cookbooks", cookbook.name, file_manifest['path'])
+ # Full path to the cached file as a String
+ def sync_file(file)
+ cache_filename = File.join("cookbooks", file.cookbook.name, file.manifest_record['path'])
mark_cached_file_valid(cache_filename)
# If the checksums are different between on-disk (current) and on-server
# (remote, per manifest), do the update. This will also execute if there
# is no current checksum.
- if !cached_copy_up_to_date?(cache_filename, file_manifest['checksum'])
- download_file(file_manifest['url'], cache_filename)
- @events.updated_cookbook_file(cookbook.name, cache_filename)
+ if !cached_copy_up_to_date?(cache_filename, file.manifest_record['checksum'])
+ download_file(file.manifest_record['url'], cache_filename)
+ @events.updated_cookbook_file(file.cookbook.name, cache_filename)
else
Chef::Log.debug("Not storing #{cache_filename}, as the cache is up to date.")
end
- cache_filename
+ # Update the manifest with the full path to the cached file
+ file.manifest_record['path'] = cache.load(cache_filename, false)
end
def cached_copy_up_to_date?(local_path, expected_checksum)