113 lines
3.7 KiB
Ruby
113 lines
3.7 KiB
Ruby
require 'em-websocket'
|
|
require 'bugsnag'
|
|
|
|
module JamWebsockets
  # WebSocket front-end server.
  #
  # Boots an EventMachine reactor, starts the Router (which is handed the
  # RabbitMQ host/port), opens the WebSocket listener, and installs two
  # periodic maintenance timers: one that flags stale connections and one that
  # expires them and tells the router to clean up the affected clients.
  class Server
    # Builds the server's collaborators; nothing is started until #run.
    #
    # @param options [Hash] accepted for interface compatibility; not read here
    def initialize(options={})
      EM::WebSocket.close_timeout = 10 # the default of 60 is pretty intense
      @log = Logging.logger[self]
      @count = 0
      @router = Router.new
      # Looked up once so #sane_logging can temporarily change its level.
      # #sane_logging treats a nil value here as "nothing to quiet".
      @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base]
    end

    # Starts the reactor and everything that lives inside it.
    # EventMachine.run does not return until the reactor is stopped (see #stop).
    #
    # @param options [Hash] keys read by this method:
    #   :port                - port for the WebSocket listener
    #   :connect_time_stale  - staleness window (converted with to_i)
    #   :connect_time_expire - expiration window (converted with to_i)
    #   :rabbitmq_host       - host passed to the router
    #   :rabbitmq_port       - port passed to the router (converted with to_i)
    #   :calling_thread      - optional Thread that is woken once startup is done
    #   :emwebsocket_debug   - passed through as em-websocket's :debug option
    def run(options={})
      host = "0.0.0.0"
      port = options[:port]
      connect_time_stale = options[:connect_time_stale].to_i
      connect_time_expire = options[:connect_time_expire].to_i
      rabbitmq_host = options[:rabbitmq_host]
      rabbitmq_port = options[:rabbitmq_port].to_i
      calling_thread = options[:calling_thread]

      @log.info "starting server #{host}:#{port} staleness_time=#{connect_time_stale}; reconnect time = #{connect_time_expire}, rabbitmq=#{rabbitmq_host}:#{rabbitmq_port}"

      # Errors that reach the reactor are logged and reported to Bugsnag.
      EventMachine.error_handler do |e|
        @log.error "unhandled error #{e}"
        Bugsnag.notify(e)
      end

      EventMachine.run do
        @router.start(connect_time_stale, host: rabbitmq_host, port: rabbitmq_port) do
          # take stale off the expire limit because the call to stale will
          # touch the updated_at column, adding an extra stale limit to the expire time limit
          # expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire
          expire_time = connect_time_expire
          start_connection_expiration(expire_time)
          start_connection_flagger(connect_time_stale)
          start_websocket_listener(host, port, options[:emwebsocket_debug])
          calling_thread.wakeup if calling_thread
        end

        # if you don't do this, the app won't exit unless you kill -9
        at_exit do
          @log.info "cleaning up server"
          @router.cleanup
        end
      end
    end

    # Asks EventMachine to stop its event loop.
    def stop
      EventMachine.stop_event_loop
    end

    # Opens the WebSocket listener and hands every new socket to the router.
    #
    # @param listen_ip [String] address to bind to
    # @param port [Integer, String] port to listen on
    # @param emwebsocket_debug [Boolean, nil] forwarded as em-websocket's :debug
    def start_websocket_listener(listen_ip, port, emwebsocket_debug)
      EventMachine::WebSocket.run(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
        @log.info "new client #{ws}"
        @router.new_client(ws)
      end
      @log.debug("started websocket")
    end

    # Expires stale connections once immediately, then again on a periodic
    # timer whose interval is +stale_max_time+.
    #
    # @param stale_max_time [Integer] passed to #expire_stale_connections and
    #   used as the timer interval
    # @return [EventMachine::PeriodicTimer] the timer that was created
    def start_connection_expiration(stale_max_time)
      # one cleanup on startup
      expire_stale_connections(stale_max_time)

      EventMachine::PeriodicTimer.new(stale_max_time) do
        sane_logging { expire_stale_connections(stale_max_time) }
      end
    end

    # Asks the connection manager for stale client ids inside a transaction and
    # then has the router clean those clients up.
    #
    # @param stale_max_time [Integer] passed to
    #   ConnectionManager#stale_connection_client_ids
    def expire_stale_connections(stale_max_time)
      client_ids = []
      ConnectionManager.active_record_transaction do |connection_manager|
        client_ids = connection_manager.stale_connection_client_ids(stale_max_time)
      end
      # @log.debug("*** expire_stale_connections(#{stale_max_time}): client_ids = #{client_ids.inspect}")
      @router.cleanup_clients_with_ids(client_ids)
    end

    # Flags stale connections once immediately, then again on a periodic timer
    # that fires every half of +flag_max_time+.
    #
    # @param flag_max_time [Integer] passed to #flag_stale_connections; half of
    #   it is used as the timer interval
    # @return [EventMachine::PeriodicTimer] the timer that was created
    def start_connection_flagger(flag_max_time)
      # one cleanup on startup
      flag_stale_connections(flag_max_time)

      # Float division: integer division would truncate a window of 1 to an
      # interval of 0, and a zero-interval periodic timer re-fires continuously.
      EventMachine::PeriodicTimer.new(flag_max_time / 2.0) do
        sane_logging { flag_stale_connections(flag_max_time) }
      end
    end

    # Has the connection manager flag stale connections inside a transaction.
    #
    # @param flag_max_time [Integer] passed to
    #   ConnectionManager#flag_stale_connections
    def flag_stale_connections(flag_max_time)
      # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds")
      ConnectionManager.active_record_transaction do |connection_manager|
        connection_manager.flag_stale_connections(flag_max_time)
      end
    end

    # Runs the block with the ActiveRecord::Base logger set to :info, restoring
    # the previous level afterwards (even if the block raises).
    # used around repeated transactions that cause too much ActiveRecord::Base logging
    #
    # When no ActiveRecord::Base logger was found at construction time the block
    # is simply run as-is instead of failing on a nil logger.
    #
    # @return [Object] whatever the block returns
    def sane_logging(&blk)
      return blk.call unless @ar_base_logger

      original_level = @ar_base_logger.level
      begin
        @ar_base_logger.level = :info
        blk.call
      ensure
        @ar_base_logger.level = original_level
      end
    end
  end
end
|