#! /usr/bin/env ruby require 'open3' require 'optparse' require 'ostruct' require 'time' $options = OpenStruct.new $options.test_only = false $options.train_only = false $options.cores = 8 $options.min_csi = 1 #$options.data_points_cache = 'data_points_csi.cache' $options.data_points_cache = 'data_points3.cache' $options.mirrors = 1 $options.map_dir = "mapping" $random = false $log = false $size = false op = OptionParser.new do |opts| opts.banner = """Usage ./arrival_rate_sweep.rb [options] ...""" opts.on("-m", "--map_dir DIR", "Directory containing mapping dirs.") do |q| $options.map_dir = q end # only applicable for load runs; where -o == nil opts.on("-t", "--test_only", "Skip the first slice when generating data.") do |q| $options.test_only = q end # only applicable for load runs; where -o == nil opts.on("-r", "--train_only", "Only use the first slice") do |q| $options.train_only = q end # if this is present, saves query time information opts.on("-o", "--outdir DIR", "Query time output dir") do |q| $options.outdir = q end opts.on("-c", "--cores CORES", Integer, "Number of cores each machines should have") do |q| $options.cores = q end opts.on("-d", "--datapoints POINTS", Array, "Override for data points to use") do |q| $options.datapoints = q.map{|x| x.to_f} end opts.on("-s", "--min_csi NUM", Integer, "Number of central machines") do |s| $options.min_csi = s end opts.on("-x", "--pattern PATTERN", "Pattern matching for mapping files") do |s| $options.pattern = s.split(":") end opts.on("-z", "--mirrors NUM", Integer, "Number of mirrors for this system") do |s| $options.mirrors = s end opts.on("-a", "--allocations METHOD_STR", "Shard allocation methods to try.") do |a| if a =~ /r/ $random = true end if a =~ /l/ $log = true end if a =~ /s/ $size = true end end end op.parse! $is_load_run = $options.outdir == nil $ts = Time.now.iso8601 class Float def sigfig(digits) f = sprintf("%.#{digits}g", self).to_f return f end end # creates an array of arrival rates to sample from a starting arrival rate # point def get_data_points(start_point) # determine scale of steps step_size = 10**(Math.log10(start_point).floor).to_f if start_point/step_size < 5 step_size /= 2 if start_point/step_size < 6 step_size /= 2 end end num_steps = (start_point / step_size).floor curr_point = 0.01 arrivals = [curr_point] for i in 1..num_steps # if step size is 0.01 or smaller prevent 0.01 from being added if i*step_size <= 0.01 next end arrivals << i*step_size # if we don't have a lot points, add half steps at the tail end # so we can see the elbow point more clearly if num_steps < 10 && i > num_steps - 2 && (i+0.5)*step_size <= start_point arrivals << (i+0.5)*step_size end end arrivals = arrivals.map{|x| x.to_f.sigfig(2)} # add starting point if it hasn't been added already if start_point > arrivals[-1] arrivals << start_point end arrivals = arrivals.reverse half_size = arrivals[0]-arrivals[1] return arrivals,half_size.sigfig(2) ## decide on data points to plot #arrival = start_point #arrivals = [] #for x in 0...5 # # do first 5 points close to the edge # arrivals << arrival*(1-x*0.1) #end ## spread rest of the 5 points between 0.01 and latest arrival #if arrivals[-1] > 0.01 # remaining_intervals = (arrivals[-1] - 0.01)/5 # 4.times do # arrivals << arrivals[-1] - remaining_intervals # end # arrivals << 0.01 #end #arrivals = arrivals.map{|x| x.sigfig(2)} #return arrivals end def run_instance(num_machines, num_csi, arrival_rate, mapping, monitorLoad, queryListFile, is_exhaustive) $template = """numThreads=%s machineIds=%s numSelectMachines=%d mirrors=%d queryArrivalRate=%f shardMappingFile=%s queryListFile=%s monitorLoad=%s exhaustiveSearch=%s """ # parameters for java file #numThreads = ([$options.cores]*num_machines).join(",") #TODO fix this up #numThreads = ([8]*num_machines).join(",") numThreads = ([$options.cores]*num_machines).join(",") machineIds = (0...num_machines).to_a.join(",") numSelectMachines = num_csi shardMappingFile = mapping File.open("/tmp/sweep-#{$ts}.param", "w+") do |f| f.puts $template % [numThreads, machineIds, numSelectMachines, $options.mirrors, arrival_rate, shardMappingFile, queryListFile, monitorLoad.to_s, is_exhaustive.to_s] end cmd = "java -Xmx2G -cp ../loadsim/dist/loadsim.jar yubink.loadsim.SearchLoadModel /tmp/sweep-#{$ts}.param sweep" output = nil erroutput = nil Open3.popen3(cmd) do |stdin, stdout, stderr, wait_thr| output = stdout.read.split("\n") erroutput = stderr.read end return output, erroutput end def run(simdata, mapping) $min_csi = $options.min_csi num_csi = 0 num_machines = 0 mirrors = $options.mirrors is_exhaustive = false # if exhaustive search, set CSI to like 1 or something if simdata =~ /exh/ is_exhaustive = true num_csi = 1 md = mapping.match(/exh_(\d+)\.mapping/) num_machines = md[1].to_i num_csi = $min_csi else # otherwise, set from the mapping file name md = mapping.match(/_([\d,x]*)\.mapping/) shardNumStr = md[1].split(",") # parse the filename of mapping file into cores core_array = [] for part in shardNumStr trysplit = part.split("x") if trysplit.length > 1 core_array += [trysplit[0].to_i]*trysplit[1].to_i else core_array << trysplit[0].to_i end end num_machines = core_array.size # figure out how many machines require resource selection dbs # based on proportion for core in core_array #if core < $options.cores if core < 10 num_csi += 1 end end end if num_csi < $min_csi num_csi = $min_csi end # retrieve data points to run from cache file_pair = File.basename(simdata) + " " + File.basename(mapping) start_point = $data_points_cache[file_pair] puts file_pair # if there is no data start point cached, get an estimate if start_point == nil if simdata =~ /exh/ && simdata =~ /gov2/ if simdata =~ /mqt/ start_point = 0.8 else start_point = 3.2 end elsif simdata =~ /exh/ && simdata =~ /cw/ if simdata =~ /mqt/ start_point = 0.08 else start_point = 0.05 end elsif simdata =~ /cw/ && simdata =~ /mqt/ start_point = 4 if simdata =~ /taily/ start_point = 7 end elsif simdata =~ /cw/ && (simdata =~ /aol/ || simdata =~ /1month/ || simdata =~ /1week/) start_point = 2.5 if simdata =~ /taily/ start_point = 2 end elsif simdata =~ /gov2/ && simdata =~ /mqt/ start_point = 2 if simdata =~ /taily/ start_point = 3 end elsif simdata =~ /gov2/ && (simdata =~ /aol/ || simdata =~ /1month/ || simdata =~ /1week/) start_point = 8 end start_point *= num_machines/2 start_point *= $options.cores/8 end start_point *= mirrors if File.directory?(simdata) # if the given simdata is a dir, then do the parts parts = Dir.glob(File.join(simdata, "*.part")).sort # if test only, means to skip the first 1000 queries because they were # training queries; if train_only, use only the 1000, which are training # queries if $options.test_only parts = parts[1..-1] elsif $options.train_only parts = [parts[0]] end else # if it's an ordinary file, run experiment on whole thing parts = [simdata] end # keep track of base latency base_latency = 0 columns = [] arrivals = [] parts.each_with_index do |queryListFile, parts_idx| puts File.basename(queryListFile) points_gathered = 0 # make sure we hit the abort limit at least once so that the real boundary is found # for the first part abort_limit_found = true # only freshly generate arrivals points for the first part; reuse this for # all other parts if parts_idx == 0 output, erroutput = run_instance(num_machines, num_csi, 0.01, mapping, false, queryListFile, is_exhaustive) if !erroutput.empty? puts "java_error" $stderr.puts erroutput exit(1) end #print 0.01, "\t" #puts output[-1] # determine latency at near-unloaded state base_latency = output[-1].split[0].to_f puts "base_latency\t#{base_latency}" arrivals, spacing = get_data_points(start_point.to_f.sigfig(2)) abort_limit_found = false if $options.datapoints != nil abort_limit_found = true arrivals = $options.datapoints end end last_valid_arrival = nil while points_gathered < arrivals.size arrival = arrivals[points_gathered] #monitorLoad = (parts_idx == 0) && ($options.datapoints == nil) monitorLoad = false output, erroutput = run_instance(num_machines, num_csi, arrival, mapping, monitorLoad, queryListFile, is_exhaustive) curr_latency = output[-1].split[0].to_f # query arrival rate; per second print arrival.to_s + "\t" if erroutput =~ /Aborted/ if erroutput =~ /queue overloaded/ puts erroutput.split("\n")[0] elsif erroutput =~ /Machine (.*) search/ puts "search_overload_#{$1}" elsif erroutput =~ /Machine (.*) merge/ puts "merge_overload_#{$2}" end if $options.datapoints == nil # if this data pooint was aborted, try a new point arrivals, spacing = get_data_points((arrival-spacing).sigfig(2)) abort_limit_found = true else points_gathered += 1 end next elsif !erroutput.empty? puts "java_error" $stderr.puts erroutput exit(1) elsif parts_idx == 0 && $options.datapoints == nil && curr_latency > base_latency*2 puts "latency too high #{curr_latency}" # if this data pooint was aborted, try a new point if last_valid_arrival != nil arrivals, spacing = get_data_points(last_valid_arrival) else arrivals, spacing = get_data_points((arrival-spacing).sigfig(2)) end abort_limit_found = true next else # if we haven't found the upper boundary yet, try finding it first! if !abort_limit_found && points_gathered == 0 last_valid_arrival = arrival arrivals, spacing = get_data_points((arrival+spacing).sigfig(2)) puts "finding limit..." next end if $options.datapoints == nil && points_gathered == 0 # update the persistent data points cache $data_points_cache[file_pair] = arrival/mirrors # update variable for future parts files start_point = arrival end puts output[-1] points_gathered += 1 end # if we're saving the query times, save! if $options.outdir != nil column = [arrival] printing = false for line in output if line.strip == "Query times" printing = true next elsif line.strip.length == 0 break end if printing column << line.strip end end columns << column end end end # dump the update data points cache File.open($options.data_points_cache, "w+") do |f| for key, val in $data_points_cache f.puts "#{key}\t#{val}" end end if $options.outdir != nil # print the collected query time information base = File.basename(simdata, ".simdata") mapbase = File.basename(mapping, ".mapping") mirror_suffix = "" if mirrors > 1 mirror_suffix = mirrors.to_s + "." end outfile = base+"_"+mapbase+"_#{$options.cores}" if $options.min_csi > 1 outfile += "_#{$options.min_csi}csi" end if $options.mirrors > 1 outfile += "_#{$options.mirrors}mirror" end outfile = File.join($options.outdir, outfile+".qtimes") File.open(outfile, "w+") do |f| # check to see if all columns are even; if not, pad them max_cols = columns.reduce(0) {|max_size, curr| [max_size, curr.size].max} for col in columns # if a column is shorter, pad it with the mean of the column if col.size < max_cols avg = 0 for x in col avg += x.to_f end avg /= col.size (max_cols-col.size).times do |x| col << avg.to_s end end end # transpose data so each inner array is a row instead of a column out = columns.transpose for rows in out f.puts rows.join("\t") end end end puts end def get_mappings(simdata) # for random files in CW; taily also applies to gov2 though optimal_map = { "gov2mqt" => ["9,10x3"], "gov2aol" => ["9,10x3"], "cw-splitmqt" => ["4,10x3"], "cw-splitaol" => ["2,10x3"], "aol" => ["0,0,0,8", "0,4"], "1month" => [], "1week" => [], "mqt" => ["0,0,2,8", "0,5"], "cw" => ["5,10"], "gov2" => ["5,10", "0,10", "0,5"], } # strip file format bits and parse out info from file name dataset, queryset, alg, mach = File.basename(simdata).sub(/\..*$/, '').split("_") settings = [] cores = $options.cores if $options.pattern != nil settings = $options.pattern end files = [] if alg == "exh" files << File.join($options.map_dir, "exh_#{mach}.mapping") return files end # not exhaustive if settings.size == 0 settings = ['*'] end # month/week query sets don't require random assignment runs or size runs if queryset == "aol" || queryset == "mqt" if $random if alg == "ranks" # ranks random everything because of that CSI optimization graph for setting in settings #for setting in optimal_map[dataset+queryset] files += Dir.glob(File.join($options.map_dir, dataset+"_random_#{setting}.mapping")).sort end elsif alg == "taily" # taily only has optimal random for setting in settings #for setting in optimal_map[dataset+queryset] files += Dir.glob(File.join($options.map_dir, dataset+"_random_#{setting}.mapping")).sort end end end if $size if alg == "taily" # only do size files for aol/mqt; the 1week/month things don't need it if queryset == "aol" || queryset == "mqt" #for setting in optimal_map[dataset+queryset] for setting in settings files += Dir.glob(File.join($options.map_dir, dataset+"_size_#{setting}.mapping")).sort end end elsif alg == "ranks" # if ranks, then look up optimial values for the size mappings #for setting in optimal_map[dataset+queryset] for setting in settings files += Dir.glob(File.join($options.map_dir, dataset+"_size_#{setting}.mapping")).sort end end end end if $log for setting in settings #["*,10x3","10x4"] if queryset == "1week" || queryset == "1month" files += Dir.glob(File.join($options.map_dir, dataset+"_train_#{alg}_aol_#{setting}.mapping")).sort else files += Dir.glob(File.join($options.map_dir, dataset+"_train_#{alg}_#{queryset}_#{setting}.mapping")).sort end end end return files end for fname in ARGV $data_points_cache = Hash.new # read the cache that lists the correct datapoints to read for each # simdata/mapping combo, see if this is currently present File.open($options.data_points_cache) do |f| for line in f pair, start_point = line.split("\t") $data_points_cache[pair] = start_point.to_f end end for mapping in get_mappings(fname) run(fname, mapping) end #base = File.basename(fname, ".simdata") #dataset, queryset, alg = base.split("_") #files = Dir.glob(File.join($options.map_dir, dataset+"_train_#{alg}_#{queryset}_*.mapping")) #for rmap in files.sort # run(fname, rmap) #end #files = Dir.glob(File.join($options.map_dir, dataset+"_random_*.mapping")) #for rmap in files.sort # run(fname, rmap) #end #files = Dir.glob(File.join($options.map_dir, dataset+"_size_*.mapping")) #for rmap in files.sort # run(fname, rmap) #end end