130 lines
3.3 KiB
Ruby
130 lines
3.3 KiB
Ruby
|
|
require 'influxdb'
|
|
|
|
# monkey patch InfluxDB client to clear the queue when asked to stop
|
|
=begin
|
|
module InfluxDB
|
|
class Client
|
|
def stop!
|
|
@queue.clear if @queue
|
|
@stopped = true
|
|
end
|
|
end
|
|
end
|
|
=end
|
|
|
|
module InfluxDB
|
|
class Worker
|
|
def spawn_threads!
|
|
NUM_WORKER_THREADS.times do |thread_num|
|
|
log :debug, "Spawning background worker thread #{thread_num}."
|
|
|
|
Thread.new do
|
|
Thread.current[:influxdb] = self.object_id
|
|
|
|
at_exit do
|
|
log :debug, "Thread exiting, bailing out (not flushing queue)"
|
|
end
|
|
|
|
while !client.stopped?
|
|
self.check_background_queue(thread_num)
|
|
sleep rand(SLEEP_INTERVAL)
|
|
end
|
|
end
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
end
|
|
|
|
module JamRuby
|
|
class Stats
|
|
|
|
class << self
|
|
attr_accessor :client, :host, :ignore
|
|
@@log = Logging.logger[JamRuby::Stats]
|
|
end
|
|
|
|
def self.destroy!
|
|
if @client
|
|
#@client.queue.clear if @client.queue
|
|
@client.stop!
|
|
end
|
|
end
|
|
|
|
def self.init(options)
|
|
|
|
influxdb_database = options[:influxdb_database]
|
|
influxdb_username = options[:influxdb_username]
|
|
influxdb_password = options[:influxdb_password]
|
|
influxdb_hosts = options[:influxdb_hosts]
|
|
influxdb_port = options[:influxdb_port]
|
|
influxdb_async = options[:influxdb_async].nil? ? true : options[:influxdb_async]
|
|
|
|
# WHEN WE TRY TO BRING INFLUX BACK, TAKE THIS OUT
|
|
influx_db_right_now = false
|
|
|
|
if influx_db_right_now && influxdb_database && influxdb_database.length > 0
|
|
@client = InfluxDB::Client.new influxdb_database,
|
|
username: influxdb_username,
|
|
password: influxdb_password,
|
|
time_precision: 's',
|
|
hosts: influxdb_hosts,
|
|
port: influxdb_port,
|
|
async:influxdb_async,
|
|
retry: -1
|
|
@host = `hostname`.strip
|
|
else
|
|
self.ignore = true
|
|
@@log.debug("stats client not initiated")
|
|
end
|
|
|
|
end
|
|
|
|
def self.write(name, data)
|
|
return if self.ignore # doing any writes in a test environment cause annoying puts to occur
|
|
|
|
if @client && data && data.length > 0
|
|
if data.has_key?('values') || data.has_key?(:values)
|
|
@client.write_point(name, data)
|
|
data['timestamp'] = Time.now.to_i
|
|
|
|
tags = data['tags']
|
|
key = 'tags' if tags
|
|
tags ||= data[:tags]
|
|
key = :tags if key.nil?
|
|
tags ||= {}
|
|
key = :tags if key.nil?
|
|
|
|
tags['host'] = @host
|
|
data[key] = tags
|
|
else
|
|
tags = {}
|
|
values = {}
|
|
for k,v in data
|
|
if v.is_a?(String)
|
|
tags[k] = v
|
|
else
|
|
values[k] = v
|
|
end
|
|
end
|
|
data = {tags: tags, values: values}
|
|
end
|
|
|
|
@client.write_point(name, data)
|
|
end
|
|
end
|
|
|
|
def self.timer(name)
|
|
start = Time.now
|
|
begin
|
|
yield
|
|
Stats.write(name, result: 'success', duration: Time.now - start)
|
|
rescue Exception => e
|
|
Stats.write(name, result: 'failure', duration: Time.now - start, error: e.to_s)
|
|
raise e
|
|
end
|
|
end
|
|
end
|
|
end |