diff --git a/web/app/assets/javascripts/JamServer.js b/web/app/assets/javascripts/JamServer.js index ee570a513..55dd7cc00 100644 --- a/web/app/assets/javascripts/JamServer.js +++ b/web/app/assets/javascripts/JamServer.js @@ -23,6 +23,7 @@ var mode = null; // heartbeat + var startHeartbeatTimeout = null; var heartbeatInterval = null; var heartbeatMS = null; var connection_expire_time = null; @@ -104,6 +105,12 @@ heartbeatInterval = null; } + // stop the heartbeat start delay from happening + if (startHeartbeatTimeout != null) { + clearTimeout(startHeartbeatTimeout); + startHeartbeatTimeout = null; + } + // stop checking for heartbeat acks if (heartbeatAckCheckInterval != null) { clearTimeout(heartbeatAckCheckInterval); @@ -236,9 +243,41 @@ heartbeatMS = payload.heartbeat_interval * 1000; connection_expire_time = payload.connection_expire_time * 1000; logger.info("loggedIn(): clientId=" + app.clientId + " heartbeat=" + payload.heartbeat_interval + "s expire_time=" + payload.connection_expire_time + 's'); - heartbeatInterval = context.setInterval(_heartbeat, heartbeatMS); - heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000); - lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat + + // add some randomness to help move heartbeats apart from each other + + // send 1st heartbeat somewhere between 0 - 0.5 of the connection expire time + var randomStartTime = connection_expire_time * (Math.random() / 2) + + if (startHeartbeatTimeout) { + logger.warn("start heartbeat timeout is active; should be null") + clearTimeout(startHeartbeatTimeout) + } + + if (heartbeatInterval != null) { + logger.warn("heartbeatInterval is active; should be null") + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } + + if (heartbeatAckCheckInterval != null) { + logger.warn("heartbeatAckCheckInterval is active; should be null") + clearInterval(heartbeatAckCheckInterval); + heartbeatAckCheckInterval = null; + } + + startHeartbeatTimeout = setTimeout(function() { + if(server.connected) { + heartbeatInterval = context.setInterval(_heartbeat, heartbeatMS); + heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000); + lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat + } + }, randomStartTime) + + logger.info("starting heartbeat timer in " + randomStartTime/1000 + 's') + + + connectDeferred.resolve(); $self.triggerHandler(EVENTS.CONNECTION_UP) @@ -295,6 +334,11 @@ logger.debug(payload.error_code + ": no longer reconnecting") server.noReconnect = true; // stop trying to log in!! } + else if (payload.error_code == 'no_reconnect') { + logger.debug(payload.error_code + ": no longer reconnecting") + server.noReconnect = true; // stop trying to log in!! + context.JK.Banner.showAlert("Misbehaved Client", "Please restart your application in order to continue using JamKazam.") + } } /////////////////// @@ -384,7 +428,7 @@ } function formatDelaySecs(secs) { - return $('' + secs + ' ' + (secs == 1 ? ' second.s' : 'seconds.') + ''); + return $('' + secs + ' ' + (secs == 1 ? ' second.s' : 'seconds.') + ''); } function setCountdown($parent) { diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb index 0757db0b8..fd5202450 100644 --- a/websocket-gateway/lib/jam_websockets/router.rb +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -27,11 +27,17 @@ module JamWebsockets :heartbeat_interval_browser, :connect_time_expire_browser, :connect_time_stale_browser, + :maximum_minutely_heartbeat_rate_browser, + :maximum_minutely_heartbeat_rate_client, :max_connections_per_user, :gateway_name, :client_lookup, :time_it_sums, - :profile_it_sums + :profile_it_sums, + :highest_drift, + :heartbeat_tracker + :temp_ban + def initialize() @log = Logging.logger[self] @@ -53,11 +59,15 @@ module JamWebsockets @heartbeat_interval_browser= nil @connect_time_expire_browser= nil @connect_time_stale_browser= nil + @maximum_minutely_heartbeat_rate_browser = nil + @maximum_minutely_heartbeat_rate_client = nil @gateway_name = nil @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base] @message_stats = {} @time_it_sums = {} @profile_it_sums = {} + @heartbeat_tracker = {} + @temp_ban = {} @login_success_count = 0 @login_fail_count = 0 @@ -66,6 +76,8 @@ module JamWebsockets @user_message_counts = {} @largest_message = nil @largest_message_user = nil + @highest_drift = 0 + end def start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, options={:host => "localhost", :port => 5672, :max_connections_per_user => 10, :gateway => 'default', :allow_dynamic_registration => true}, &block) @@ -82,6 +94,11 @@ module JamWebsockets @gateway_name = options[:gateway] @allow_dynamic_registration = options[:allow_dynamic_registration] + # determine the maximum amount of heartbeats we should get per user + @maximum_minutely_heartbeat_rate_client = ((@heartbeat_interval_client / 60.0) * 2).ceil + 3 + @maximum_minutely_heartbeat_rate_browser = ((@heartbeat_interval_browser / 60.0) * 2).ceil + 3 + + @log.info("maxmium minutely timer #{maximum_minutely_heartbeat_rate_client}") begin @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port]) @amqp_connection_manager.connect do |channel| @@ -850,6 +867,36 @@ module JamWebsockets end end + def add_to_ban(user, reason) + user_ban = @temp_ban[user.id] + + if user_ban.nil? + user_ban = {} + @temp_ban[user.id] = user_ban + end + + # allow user back in, after 10 minutes + user_ban[:allow] = Time.now + 600 + + @log.info("user #{user} banned for 10 minutes. reason #{reason}") + end + + def runaway_heartbeat(heartbeat, context) + heartbeat_count = @heartbeat_tracker[context.user.id] || 0 + heartbeat_count += 1 + @heartbeat_tracker[context.user.id] = heartbeat_count + + if heartbeat_count > (context.client_type == 'browser' ? @maximum_minutely_heartbeat_rate_browser : @maximum_minutely_heartbeat_rate_client) + @log.warn("user #{context.user} sending too many heartbeats: #{heartbeat_count}") if heartbeat_count % 100 == 0 + + add_to_ban(context.user, 'too many heartbeats') + raise SessionError.new('too many heartbeats', 'empty_login') + else + false + end + + end + def handle_heartbeat(heartbeat, heartbeat_message_id, client) unless context = @clients[client] profile_it('heartbeat_context_gone') { @@ -858,6 +905,10 @@ module JamWebsockets raise SessionError, 'context state is gone. please reconnect.' } else + if runaway_heartbeat(heartbeat, context) + return + end + connection = nil profile_it('heartbeat_find_conn') { connection = Connection.find_by_client_id(context.client.client_id) @@ -961,6 +1012,13 @@ module JamWebsockets @log.debug "no user found with token #{token}" return nil else + + # check against temp ban list + if @temp_ban[user.id] + @log.debug("user #{user} is still banned; rejecting login") + raise SessionError.new('login rejected temporarily', 'empty_login') + end + @log.debug "#{user} login via token" return user end @@ -971,6 +1029,12 @@ module JamWebsockets # attempt login with username and password user = User.find_by_email(username) + # check against temp ban list + if !user.nil? && @temp_ban[user.id] + @log.debug("user #{user} is still banned; rejecting login") + raise SessionError.new('login rejected temporarily', 'empty_login') + end + if !user.nil? && user.valid_password?(password) @log.debug "#{user} login via password" return user @@ -1157,13 +1221,17 @@ module JamWebsockets end def periodical_stats_dump + # assume 60 seconds per status dump stats = @message_stats.sort_by{|k,v| -v} stats.map { |i| i[1] = (i[1] / 60.0).round(2) } @log.info("msg/s: " + stats.map { |i| i.join('=>') }.join(', ')) - @log.info("largest msg from #{@largest_message_user}: #{@largest_message.length}b") + @log.info("largest msg from #{@largest_message_user}: #{@largest_message ? @largest_message.length : 0}b") + if @highest_drift > 1 + @log.info("highest drift: #{@highest_drift - 2}") + end total_time = 0 time_sums = @time_it_sums.sort_by{|k,v| -v} @@ -1184,11 +1252,15 @@ module JamWebsockets profile_sums = @profile_it_sums.sort_by{|k,v| -v} profile_sums.each do | cat, cat_time | @log.info("profiled #{cat} used time: #{cat_time}") - end - + @temp_ban.each do |user_id, data| + if Time.now > data[:allow] + @log.info("user #{user_id} allowed back in") + @temp_ban.delete(user_id) + end + end # stuff in extra stats into the @message_stats and send it all off @message_stats['gateway_name'] = @gateway_name @@ -1197,6 +1269,10 @@ module JamWebsockets @message_stats['connected'] = @connected_count @message_stats['disconnected'] = @disconnected_count @message_stats['largest_msg'] = @largest_message ? @largest_message.length : 0 + @message_stats['highest_drift'] = @highest_drift - 2 # 2 comes from the server's 2 second timer for the drift check + @message_stats['total_time'] = total_time + @message_stats['banned_users'] = @temp_ban.length + Stats.write('gateway.stats', @message_stats) @@ -1210,7 +1286,8 @@ module JamWebsockets @largest_message = nil @largest_message_user = nil @time_it_sums = {} - + @highest_drift = 0 + @heartbeat_tracker = {} end def cleanup_clients_with_ids(expired_connections) diff --git a/websocket-gateway/lib/jam_websockets/server.rb b/websocket-gateway/lib/jam_websockets/server.rb index 0d194a4b2..25322dc98 100644 --- a/websocket-gateway/lib/jam_websockets/server.rb +++ b/websocket-gateway/lib/jam_websockets/server.rb @@ -67,9 +67,8 @@ module JamWebsockets def check_for_em_drift(timer) # if our timer check is a full second off, say what's up - if Time.now - @last_conn_check > timer + 1 - @log.error("significant drift! Should be 2 seconds. Instead was: #{Time.now - @last_conn_check}") - end + drift = Time.now - @last_conn_check + @router.highest_drift = drift if drift > @router.highest_drift @last_conn_check = Time.now end @@ -142,6 +141,9 @@ module JamWebsockets rescue => e Bugsnag.notify(e) @log.error("unhandled exception in EM Timer #{e}") + puts "Error during processing: #{$!}" + puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}" + end end