require 'English'
require 'socket'
require 'em-websocket'
require 'bugsnag'

module JamWebsockets
  # Websocket gateway server.
  #
  # Owns a Router and, once #run is called, an EventMachine reactor with two
  # websocket listeners: one on the configured port and one "trusted" listener
  # on port + 1 whose clients are checked against a TrustCheck.
  class Server
    # @param options [Hash] currently unused by the constructor; kept so
    #   existing callers that pass options keep working.
    def initialize(options={})
      EM::WebSocket.close_timeout = 10 # the default of 60 is pretty intense
      @log = Logging.logger[self]
      @count = 0
      @router = Router.new
      # Logger whose level #sane_logging temporarily changes.
      @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
      # Time of the previous connection-check tick; set by
      # #start_connection_expiration and updated by #check_for_em_drift.
      @last_conn_check = nil
    end

    # Starts the EventMachine reactor, the router and the websocket listeners.
    # Does not return until the reactor stops (see #stop).
    #
    # @param options [Hash] option keys read here:
    #   :port (required; the trusted listener uses port + 1),
    #   :connect_time_stale_client, :connect_time_expire_client,
    #   :connect_time_stale_browser, :connect_time_expire_browser,
    #   :max_connections_per_user, :chat_enabled, :chat_blast,
    #   :gateway_name, :rabbitmq_host, :rabbitmq_port,
    #   :allow_dynamic_registration, :calling_thread, :cidr,
    #   :emwebsocket_debug. The whole hash is also handed to Stats.init.
    def run(options={})
      host = "0.0.0.0"
      port = options[:port]
      trust_port = port + 1

      connect_time_stale_client = options[:connect_time_stale_client].to_i
      connect_time_expire_client = options[:connect_time_expire_client].to_i
      connect_time_stale_browser = options[:connect_time_stale_browser].to_i
      connect_time_expire_browser = options[:connect_time_expire_browser].to_i
      max_connections_per_user = options[:max_connections_per_user].to_i
      # These flags default to true only when the option is nil; an explicit
      # false must be preserved, so `||` cannot be used here.
      chat_enabled = options[:chat_enabled].nil? ? true : options[:chat_enabled]
      chat_blast = options[:chat_blast].nil? ? true : options[:chat_blast]
      gateway_name = options[:gateway_name]
      rabbitmq_host = options[:rabbitmq_host]
      rabbitmq_port = options[:rabbitmq_port].to_i
      allow_dynamic_registration = options[:allow_dynamic_registration].nil? ? true : options[:allow_dynamic_registration]

      Stats.init(options)

      calling_thread = options[:calling_thread]
      trust_check = TrustCheck.new(trust_port, options[:cidr])

      @log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale_client}; reconnect time = #{connect_time_expire_client}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port} gateway_name=#{gateway_name}"

      EventMachine.error_handler do |e|
        puts "unhandled error #{e}"
        puts "unhandled error #{e.backtrace}"
        @log.error "unhandled error #{e}"
        @log.error "unhandled error #{e.backtrace}"

        # SHOULD WE JUST DIE HERE? This was seen in production, and the gateway stopped after this!!!
        begin
          Bugsnag.notify(e)
        rescue => bugsnag_e
          puts "unable to report to bugsnag #{bugsnag_e}"
          @log.error "unable to report to bugsnag #{bugsnag_e}"
        end
      end

      EventMachine.run do
        @router.start(connect_time_stale_client, connect_time_expire_client,
                      connect_time_stale_browser, connect_time_expire_browser,
                      host: rabbitmq_host,
                      port: rabbitmq_port,
                      max_connections_per_user: max_connections_per_user,
                      gateway: gateway_name,
                      allow_dynamic_registration: allow_dynamic_registration,
                      chat_enabled: chat_enabled,
                      chat_blast: chat_blast) do
          @router.init
          #start_connection_expiration
          #start_client_expiration
          #start_connection_flagger
          start_stats_dump
          start_websocket_listener(host, port, trust_port, trust_check, options[:emwebsocket_debug])
          # Wake the thread that launched us (if one was supplied) now that
          # the listeners have been started.
          calling_thread.wakeup if calling_thread
        end

        # if you don't do this, the app won't exit unless you kill -9
        at_exit do
          @log.info "cleaning up server"
          # $ERROR_INFO / $ERROR_POSITION are only defined once 'English' has
          # been required (see top of file); otherwise they are always nil and
          # these diagnostics would never be emitted.
          @log.info "Exit error: #{$ERROR_INFO}" if $ERROR_INFO
          @log.info "Exit position: #{$ERROR_POSITION}" if $ERROR_POSITION
          @router.cleanup
        end
      end
    end

    # Stops the EventMachine reactor started by #run.
    def stop
      EventMachine.stop_event_loop
    end

    # Records the largest observed gap between two consecutive connection-check
    # ticks on the router, then resets the reference time.
    #
    # NOTE(review): `timer` (the configured tick interval) is not used, so the
    # recorded value is the full elapsed time between ticks, not the deviation
    # from the interval. Confirm whether `drift - timer` was intended before
    # changing it, since @router.highest_drift consumers may rely on the
    # current value.
    #
    # @param timer [Numeric] tick interval in seconds (currently unused)
    def check_for_em_drift(timer)
      # if our timer check is a full second off, say what's up
      drift = Time.now - @last_conn_check
      @router.highest_drift = drift if drift > @router.highest_drift

      @last_conn_check = Time.now
    end

    # Starts both websocket listeners inside the running reactor.
    #
    # @param listen_ip [String] address both listeners bind to
    # @param port [Integer] port of the regular listener
    # @param trust_port [Integer] port of the trusted listener
    # @param trust_check [#trusted?] asked whether a peer ip may use trust_port
    # @param emwebsocket_debug [Object] forwarded as em-websocket's :debug option
    def start_websocket_listener(listen_ip, port, trust_port, trust_check, emwebsocket_debug)
      EventMachine::WebSocket.run(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
        #@log.info "new client #{ws}"
        @router.new_client(ws, false)
      end

      EventMachine::WebSocket.run(:host => listen_ip, :port => trust_port, :debug => emwebsocket_debug) do |ws|
        @log.info "new latency_tester client #{ws}"
        # verify this connection came in from a valid subnet, if specified
        ip = extract_ip(ws)
        if trust_check.trusted?(ip, trust_port)
          @router.new_client(ws, true)
        else
          @log.warn("untrusted client attempted to connect to #{listen_ip}:#{trust_port} from #{ip}")
          ws.close
        end
      end

      @log.debug("started websocket")
    end

    # Wipes all connections once, then checks connections every 2 seconds.
    def start_connection_expiration
      # one cleanup on startup
      @router.wipe_all_connections
      #@router.periodical_check_connections

      @last_conn_check = Time.now
      timer = 2
      EventMachine::PeriodicTimer.new(timer) do
        check_for_em_drift(timer)
        time_it('conn_expire') { safety_net { sane_logging { @router.periodical_check_connections } } }
      end
    end

    # Checks clients once immediately, then every 30 seconds.
    def start_client_expiration
      # one cleanup on startup
      @router.periodical_check_clients

      EventMachine::PeriodicTimer.new(30) do
        time_it('client_expire') { safety_net { sane_logging { @router.periodical_check_clients } } }
      end
    end

    # Flags connections once immediately, then every 2 seconds.
    def start_connection_flagger
      # one cleanup on startup
      @router.periodical_flag_connections

      EventMachine::PeriodicTimer.new(2) do
        time_it('conn_flagger') { safety_net { @router.periodical_flag_connections } } if false
        time_it('conn_flagger') { safety_net { sane_logging { @router.periodical_flag_connections } } }
      end
    end

    # Asks the router to dump its stats every 60 seconds.
    def start_stats_dump
      EventMachine::PeriodicTimer.new(60) do
        time_it('stats_dump') { safety_net { @router.periodical_stats_dump } }
      end
    end

    # this was added for this reason: https://jamkazam.atlassian.net/browse/VRFS-2425
    # if an unhandled exception occurs in PeriodicTimer, it just kills all future timers; doesn't kill the app.
    # not really what you want.
    # so, we log the failure so we know really bad stuff is happening, but we also move on.
    # (The Bugsnag notification is currently commented out below.)
    #
    # Runs the block and swallows any StandardError after logging it; returns
    # the block's value when it does not raise.
    def safety_net(&blk)
      begin
        blk.call
      rescue => e
        #Bugsnag.notify(e)
        @log.error("unhandled exception in EM Timer #{e}")
        puts "Error during processing: #{$!}"
        # Array() guards against a nil backtrace so that reporting the error
        # can never itself raise out of the safety net.
        puts "Backtrace:\n\t#{Array(e.backtrace).join("\n\t")}"
      end
    end

    # Runs the block, adds its duration (seconds) to @router.time_it_sums[cat]
    # and warns when a single run took longer than one second.
    #
    # @param cat [String] category key under which the time is accumulated
    def time_it(cat, &blk)
      # Monotonic clock: unaffected by wall-clock adjustments while measuring.
      started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      blk.call

      time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

      @router.time_it_sums[cat] = (@router.time_it_sums[cat] || 0) + time

      @log.warn("LONG TIME #{cat}: #{time}") if time > 1
    end

    # Runs the block with the ActiveRecord::Base logger set to :info, restoring
    # the previous level afterwards even if the block raises. Does nothing
    # special when no such logger was found at construction time.
    def sane_logging(&blk)
      # used around repeated transactions that cause too much ActiveRecord::Base logging
      # example is handling heartbeats
      begin
        original_level = @ar_base_logger.level if @ar_base_logger
        @ar_base_logger.level = :info if @ar_base_logger
        blk.call
      ensure
        @ar_base_logger.level = original_level if @ar_base_logger
      end
    end

    private

    # @param client [#get_peername] an EventMachine connection
    # @return [String] the peer's IP address
    def extract_ip(client)
      Socket.unpack_sockaddr_in(client.get_peername)[1]
    end
  end
end