From ad266e5b808e3e5faa7338fe4f50bc5953bf5d71 Mon Sep 17 00:00:00 2001 From: Seth Call Date: Wed, 30 Apr 2014 15:29:10 -0500 Subject: [PATCH] * VRFS-1663 (diagnostics), VRFS-1657 (configurable timer for heartbeats), VRFS-1653 (websocket connection cleanup) --- db/up/connection_stale_expire.sql | 4 +- ruby/lib/jam_ruby/connection_manager.rb | 28 ++++--- ruby/lib/jam_ruby/models/connection.rb | 6 +- ruby/lib/jam_ruby/models/feed.rb | 2 +- ruby/lib/jam_ruby/models/user.rb | 8 +- .../jam_ruby/resque/icecast_config_writer.rb | 2 +- .../resque/scheduled/icecast_source_check.rb | 2 +- ruby/spec/jam_ruby/connection_manager_spec.rb | 6 +- ruby/spec/jam_ruby/models/user_spec.rb | 6 +- web/app/assets/javascripts/JamServer.js | 58 +++++++++------ web/app/views/clients/index.html.erb | 1 - web/config/application.rb | 12 ++- web/config/environments/development.rb | 6 +- web/config/initializers/eventmachine.rb | 6 +- web/spec/spec_helper.rb | 6 +- web/spec/support/utilities.rb | 4 +- websocket-gateway/config/application.yml | 6 +- .../lib/jam_websockets/client_context.rb | 2 +- .../lib/jam_websockets/router.rb | 74 ++++++++++++++----- .../lib/jam_websockets/server.rb | 29 ++++---- 20 files changed, 162 insertions(+), 106 deletions(-) diff --git a/db/up/connection_stale_expire.sql b/db/up/connection_stale_expire.sql index 57f34a1f1..a98953f1b 100644 --- a/db/up/connection_stale_expire.sql +++ b/db/up/connection_stale_expire.sql @@ -1,2 +1,2 @@ -ALTER TABLE connections ADD COLUMN stale_time INTEGER NOT NULL DEFAULT 20; -ALTER TABLE connections ADD COLUMN expire_time INTEGER NOT NULL DEFAULT 30; \ No newline at end of file +ALTER TABLE connections ADD COLUMN stale_time INTEGER NOT NULL DEFAULT 40; +ALTER TABLE connections ADD COLUMN expire_time INTEGER NOT NULL DEFAULT 60; \ No newline at end of file diff --git a/ruby/lib/jam_ruby/connection_manager.rb b/ruby/lib/jam_ruby/connection_manager.rb index dd96b6765..1f39755c6 100644 --- a/ruby/lib/jam_ruby/connection_manager.rb +++ b/ruby/lib/jam_ruby/connection_manager.rb @@ -44,7 +44,7 @@ module JamRuby end # reclaim the existing connection, if ip_address is not nil then perhaps a new address as well - def reconnect(conn, reconnect_music_session_id, ip_address) + def reconnect(conn, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time) music_session_id = nil reconnected = false @@ -54,7 +54,7 @@ module JamRuby joined_session_at_expression = 'NULL' unless reconnect_music_session_id.nil? music_session_id_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN music_session_id ELSE NULL END)" - joined_session_at_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN NOW() ELSE NULL END)" + joined_session_at_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN NOW() at time zone 'utc' ELSE NULL END)" end if ip_address and !ip_address.eql?(conn.ip_address) @@ -101,7 +101,7 @@ module JamRuby end sql =< "JamRuby::MusicSession" has_many :tracks, :class_name => "JamRuby::Track", :inverse_of => :connection, :foreign_key => 'connection_id', :dependent => :delete_all - validates :as_musician, :inclusion => {:in => [true, false]} - validates :client_type, :inclusion => {:in => ['client', 'browser']} + validates :client_type, :inclusion => {:in => [TYPE_CLIENT, TYPE_BROWSER]} validate :can_join_music_session, :if => :joining_session? after_save :require_at_least_one_track_when_in_session, :if => :joining_session? after_create :did_create diff --git a/ruby/lib/jam_ruby/models/feed.rb b/ruby/lib/jam_ruby/models/feed.rb index 64f002a1d..a04f9091f 100644 --- a/ruby/lib/jam_ruby/models/feed.rb +++ b/ruby/lib/jam_ruby/models/feed.rb @@ -61,7 +61,7 @@ module JamRuby # handle time range days = TIME_RANGES[time_range] if days > 0 - query = query.where("feeds.created_at > NOW() - '#{days} day'::INTERVAL") + query = query.where("feeds.created_at > NOW() at time zone 'utc' - '#{days} day'::INTERVAL") end # handle type filters diff --git a/ruby/lib/jam_ruby/models/user.rb b/ruby/lib/jam_ruby/models/user.rb index ee4ad0c60..44147da44 100644 --- a/ruby/lib/jam_ruby/models/user.rb +++ b/ruby/lib/jam_ruby/models/user.rb @@ -289,12 +289,12 @@ module JamRuby @mods_json ||= mods ? JSON.parse(mods, symbolize_names: true) : {} end - def heartbeat_interval - mods_json[:heartbeat_interval] + def heartbeat_interval_client + mods_json[:heartbeat_interval_client] end - def connection_expire_time - mods_json[:connection_expire_time] + def connection_expire_time_client + mods_json[:connection_expire_time_client] end def recent_history diff --git a/ruby/lib/jam_ruby/resque/icecast_config_writer.rb b/ruby/lib/jam_ruby/resque/icecast_config_writer.rb index 05ce40a26..c40be7d5a 100644 --- a/ruby/lib/jam_ruby/resque/icecast_config_writer.rb +++ b/ruby/lib/jam_ruby/resque/icecast_config_writer.rb @@ -21,7 +21,7 @@ module JamRuby def self.queue_jobs_needing_retry # if we haven't seen updated_at be tickled in 5 minutes, but config_changed is still set to TRUE, this record has gotten stale - IcecastServer.find_each(:conditions => "config_changed = 1 AND updated_at < (NOW() - interval '#{APP_CONFIG.icecast_max_missing_check} second')", :batch_size => 100) do |server| + IcecastServer.find_each(:conditions => "config_changed = 1 AND updated_at < (NOW() at time zone 'utc' - interval '#{APP_CONFIG.icecast_max_missing_check} second')", :batch_size => 100) do |server| IcecastConfigWriter.enqueue(server.server_id) end end diff --git a/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb b/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb index 4927e6568..14749829f 100644 --- a/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb +++ b/ruby/lib/jam_ruby/resque/scheduled/icecast_source_check.rb @@ -32,7 +32,7 @@ module JamRuby def run # if we haven't seen updated_at be tickled in 5 minutes, but config_changed is still set to TRUE, this record has gotten stale - IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount| + IcecastMount.find_each(lock: true, :conditions => "sourced_needs_changing_at < (NOW() at time zone 'utc' - interval '#{APP_CONFIG.icecast_max_sourced_changed} second')", :batch_size => 100) do |mount| if mount.music_session_id mount.with_lock do handle_notifications(mount) diff --git a/ruby/spec/jam_ruby/connection_manager_spec.rb b/ruby/spec/jam_ruby/connection_manager_spec.rb index 853951416..6ae0fc029 100644 --- a/ruby/spec/jam_ruby/connection_manager_spec.rb +++ b/ruby/spec/jam_ruby/connection_manager_spec.rb @@ -247,15 +247,15 @@ describe ConnectionManager do sleep(1) - num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"]) + num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() at time zone 'utc' - interval '#{1} second') AND aasm_state = 'connected'"]) num.should == 1 # this should change the aasm_state to stale @connman.flag_stale_connections(1) - num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'connected'"]) + num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() at time zone 'utc' - interval '#{1} second') AND aasm_state = 'connected'"]) num.should == 0 - num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() - interval '#{1} second') AND aasm_state = 'stale'"]) + num = JamRuby::Connection.count(:conditions => ["updated_at < (NOW() at time zone 'utc' - interval '#{1} second') AND aasm_state = 'stale'"]) num.should == 1 assert_num_connections(client_id, 1) diff --git a/ruby/spec/jam_ruby/models/user_spec.rb b/ruby/spec/jam_ruby/models/user_spec.rb index 956b7cff1..0e0396fa3 100644 --- a/ruby/spec/jam_ruby/models/user_spec.rb +++ b/ruby/spec/jam_ruby/models/user_spec.rb @@ -464,11 +464,11 @@ describe User do end it "should return connection_expire_time" do - @user.connection_expire_time.should be_nil - @user.mods = {connection_expire_time: 5}.to_json + @user.connection_expire_time_client.should be_nil + @user.mods = {connection_expire_time_client: 5}.to_json @user.save! @user = User.find(@user.id) # necessary because mods_json is cached in the model - @user.connection_expire_time.should == 5 + @user.connection_expire_time_client.should == 5 end end =begin diff --git a/web/app/assets/javascripts/JamServer.js b/web/app/assets/javascripts/JamServer.js index de7fde2e4..077df1392 100644 --- a/web/app/assets/javascripts/JamServer.js +++ b/web/app/assets/javascripts/JamServer.js @@ -14,10 +14,14 @@ context.JK.JamServer = function (app) { + // uniquely identify the websocket connection + var channelId = null; + var clientType = null; + // heartbeat var heartbeatInterval = null; var heartbeatMS = null; - var heartbeatMissedMS = 10000; // if 10 seconds go by and we haven't seen a heartbeat ack, get upset + var connection_expire_time = null; var lastHeartbeatSentTime = null; var lastHeartbeatAckTime = null; var lastHeartbeatFound = false; @@ -55,7 +59,6 @@ server.socketClosedListeners = []; server.connected = false; - var clientType = context.JK.clientType(); function heartbeatStateReset() { lastHeartbeatSentTime = null; @@ -136,8 +139,8 @@ // check if the server is still sending heartbeat acks back down // this logic equates to 'if we have not received a heartbeat within heartbeatMissedMS, then get upset - if (new Date().getTime() - lastHeartbeatAckTime.getTime() > heartbeatMissedMS) { - logger.error("no heartbeat ack received from server after ", heartbeatMissedMS, " seconds . giving up on socket connection"); + if (new Date().getTime() - lastHeartbeatAckTime.getTime() > connection_expire_time) { + logger.error("no heartbeat ack received from server after ", connection_expire_time, " seconds . giving up on socket connection"); lastDisconnectedReason = 'NO_HEARTBEAT_ACK'; context.JK.JamServer.close(true); } @@ -184,11 +187,11 @@ heartbeatMS = payload.heartbeat_interval * 1000; - logger.debug("jamkazam.js.loggedIn(): clientId now " + app.clientId + "; Setting up heartbeat every " + heartbeatMS + " MS"); + connection_expire_time = payload.connection_expire_time * 1000; + logger.debug("jamkazam.js.loggedIn(): clientId=" + app.clientId + ", heartbeat=" + heartbeatMS + "ms, expire_time=" + connection_expire_time); heartbeatInterval = context.setInterval(_heartbeat, heartbeatMS); heartbeatAckCheckInterval = context.setInterval(_heartbeatAckCheck, 1000); lastHeartbeatAckTime = new Date(new Date().getTime() + heartbeatMS); // add a little forgiveness to server for initial heartbeat - connectDeferred.resolve(); app.activeElementEvent('afterConnect', payload); @@ -250,23 +253,26 @@ rest.createDiagnostic({ type: lastDisconnectedReason, - data: {logs: logger.logCache, client_type: clientType, client_id: server.clientID} + data: {logs: logger.logCache, client_type: clientType, client_id: server.clientID, channel_id: channelId} + }) + .always(function() { + if ($currentDisplay.is('.no-websocket-connection')) { + // this path is the 'not in session path'; so there is nothing else to do + $currentDisplay.hide(); + + // TODO: tell certain elements that we've reconnected + } + else { + // this path is the 'in session' path, where we actually reload the page + context.JK.CurrentSessionModel.leaveCurrentSession() + .always(function () { + window.location.reload(); + }); + } + server.reconnecting = false; }); - if ($currentDisplay.is('.no-websocket-connection')) { - // this path is the 'not in session path'; so there is nothing else to do - $currentDisplay.hide(); - // TODO: tell certain elements that we've reconnected - } - else { - // this path is the 'in session' path, where we actually reload the page - context.JK.CurrentSessionModel.leaveCurrentSession() - .always(function () { - window.location.reload(); - }); - } - server.reconnecting = false; } function buildOptions() { @@ -435,9 +441,14 @@ }; server.connect = function () { + if(!clientType) { + clientType = context.JK.clientType(); + } connectDeferred = new $.Deferred(); - logger.log("server.connect"); - var uri = context.JK.websocket_gateway_uri; // Set in index.html.erb. + channelId = context.JK.generateUUID(); // create a new channel ID for every websocket connection + logger.log("connecting websocket, channel_id: " + channelId); + + var uri = context.JK.websocket_gateway_uri + '?channel_id=' + channelId; // Set in index.html.erb. //var uri = context.gon.websocket_gateway_uri; // Leaving here for now, as we're looking for a better solution. server.socket = new context.WebSocket(uri); @@ -506,7 +517,7 @@ // onClose is called if either client or server closes connection server.onClose = function () { - logger.log("Socket to server closed.", arguments); + logger.log("Socket to server closed."); if (connectDeferred.state() === "pending") { connectDeferred.reject(); @@ -611,6 +622,7 @@ } function initialize() { + registerLoginAck(); registerHeartbeatAck(); registerSocketClosed(); diff --git a/web/app/views/clients/index.html.erb b/web/app/views/clients/index.html.erb index bd37726f2..b9fbc1d71 100644 --- a/web/app/views/clients/index.html.erb +++ b/web/app/views/clients/index.html.erb @@ -298,7 +298,6 @@ window.jamClient = interceptedJamClient; - } // Let's get things rolling... diff --git a/web/config/application.rb b/web/config/application.rb index 5175e0e1d..aa2078e65 100644 --- a/web/config/application.rb +++ b/web/config/application.rb @@ -105,11 +105,15 @@ if defined?(Bundler) # Websocket-gateway embedded configs config.websocket_gateway_enable = false if Rails.env=='test' - config.websocket_gateway_connect_time_stale = 2 - config.websocket_gateway_connect_time_expire = 5 + config.websocket_gateway_connect_time_stale_client = 4 + config.websocket_gateway_connect_time_expire_client = 6 + config.websocket_gateway_connect_time_stale_browser = 4 + config.websocket_gateway_connect_time_expire_browser = 6 else - config.websocket_gateway_connect_time_stale = 12 # 12 matches production - config.websocket_gateway_connect_time_expire = 20 # 20 matches production + config.websocket_gateway_connect_time_stale_client = 40 # 40 matches production + config.websocket_gateway_connect_time_expire_client = 60 # 60 matches production + config.websocket_gateway_connect_time_stale_browser = 40 # 40 matches production + config.websocket_gateway_connect_time_expire_browser = 60 # 60 matches production end config.websocket_gateway_internal_debug = false config.websocket_gateway_port = 6767 + ENV['JAM_INSTANCE'].to_i diff --git a/web/config/environments/development.rb b/web/config/environments/development.rb index 017f717a1..797e9aeba 100644 --- a/web/config/environments/development.rb +++ b/web/config/environments/development.rb @@ -68,8 +68,10 @@ SampleApp::Application.configure do # it's nice to have even admin accounts (which all the default ones are) generate GA data for testing config.ga_suppress_admin = false - config.websocket_gateway_connect_time_stale = 12 - config.websocket_gateway_connect_time_expire = 20 + config.websocket_gateway_connect_time_stale_client = 40 # 40 matches production + config.websocket_gateway_connect_time_expire_client = 60 # 60 matches production + config.websocket_gateway_connect_time_stale_browser = 40 # 40 matches production + config.websocket_gateway_connect_time_expire_browser = 60 # 60 matches production config.audiomixer_path = ENV['AUDIOMIXER_PATH'] || audiomixer_workspace_path || "/var/lib/audiomixer/audiomixer/audiomixerapp" diff --git a/web/config/initializers/eventmachine.rb b/web/config/initializers/eventmachine.rb index 64cf993cc..53858b4d9 100644 --- a/web/config/initializers/eventmachine.rb +++ b/web/config/initializers/eventmachine.rb @@ -9,8 +9,10 @@ unless $rails_rake_task JamWebsockets::Server.new.run( :port => APP_CONFIG.websocket_gateway_port, :emwebsocket_debug => APP_CONFIG.websocket_gateway_internal_debug, - :connect_time_stale => APP_CONFIG.websocket_gateway_connect_time_stale, - :connect_time_expire_client => APP_CONFIG.websocket_gateway_connect_time_expire, + :connect_time_stale_client => APP_CONFIG.websocket_gateway_connect_time_stale_client, + :connect_time_expire_client => APP_CONFIG.websocket_gateway_connect_time_expire_client, + :connect_time_stale_browser => APP_CONFIG.websocket_gateway_connect_time_stale_browser, + :connect_time_expire_browser=> APP_CONFIG.websocket_gateway_connect_time_expire_browser, :rabbitmq_host => APP_CONFIG.rabbitmq_host, :rabbitmq_port => APP_CONFIG.rabbitmq_port, :calling_thread => current) diff --git a/web/spec/spec_helper.rb b/web/spec/spec_helper.rb index 598a19c3e..94960bf13 100644 --- a/web/spec/spec_helper.rb +++ b/web/spec/spec_helper.rb @@ -75,8 +75,10 @@ Thread.new do JamWebsockets::Server.new.run( :port => 6769, :emwebsocket_debug => false, - :connect_time_stale => 2, - :connect_time_expire_client => 5, + :connect_time_stale_client => 4, + :connect_time_expire_client => 6, + :connect_time_stale_browser => 4, + :connect_time_expire_browser => 6, :rabbitmq_host => 'localhost', :rabbitmq_port => 5672, :calling_thread => current) diff --git a/web/spec/support/utilities.rb b/web/spec/support/utilities.rb index 3040a535f..deba66215 100644 --- a/web/spec/support/utilities.rb +++ b/web/spec/support/utilities.rb @@ -131,8 +131,8 @@ end def leave_music_session_sleep_delay # add a buffer to ensure WSG has enough time to expire - sleep_dur = (Rails.application.config.websocket_gateway_connect_time_stale + - Rails.application.config.websocket_gateway_connect_time_expire) * 1.4 + sleep_dur = (Rails.application.config.websocket_gateway_connect_time_stale_browser + + Rails.application.config.websocket_gateway_connect_time_expire_browser) * 1.4 sleep sleep_dur end diff --git a/websocket-gateway/config/application.yml b/websocket-gateway/config/application.yml index b25f37cea..1b5e37f1b 100644 --- a/websocket-gateway/config/application.yml +++ b/websocket-gateway/config/application.yml @@ -1,8 +1,8 @@ Defaults: &defaults - connect_time_stale_client: 20 - connect_time_expire_client: 30 + connect_time_stale_client: 40 + connect_time_expire_client: 62 connect_time_stale_browser: 40 - connect_time_expire_browser: 60 + connect_time_expire_browser: 62 development: port: 6767 diff --git a/websocket-gateway/lib/jam_websockets/client_context.rb b/websocket-gateway/lib/jam_websockets/client_context.rb index ecb065118..2c8c027cb 100644 --- a/websocket-gateway/lib/jam_websockets/client_context.rb +++ b/websocket-gateway/lib/jam_websockets/client_context.rb @@ -19,7 +19,7 @@ end def to_json - {user_id: @user.id, client_id: @client.client_id, msg_count: @msg_count, client_type: @client_type}.to_json + {user_id: @user.id, client_id: @client.client_id, msg_count: @msg_count, client_type: @client_type, socket_id: @client.socket_id}.to_json end def hash diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb index 5ccd62ab7..8ddb254c4 100644 --- a/websocket-gateway/lib/jam_websockets/router.rb +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -10,7 +10,7 @@ include Jampb module EventMachine module WebSocket class Connection < EventMachine::Connection - attr_accessor :encode_json, :client_id, :user_id, :context # client_id is uuid we give to each client to track them as we like + attr_accessor :encode_json, :channel_id, :client_id, :user_id, :context # client_id is uuid we give to each client to track them as we like # http://stackoverflow.com/questions/11150147/how-to-check-if-eventmachineconnection-is-open attr_accessor :connected @@ -51,8 +51,10 @@ module JamWebsockets @thread_pool = nil @heartbeat_interval_client = nil @connect_time_expire_client = nil + @connect_time_stale_client = nil @heartbeat_interval_browser= nil @connect_time_expire_browser= nil + @connect_time_stale_browser= nil @ar_base_logger = ::Logging::Repository.instance[ActiveRecord::Base] end @@ -60,8 +62,12 @@ module JamWebsockets @log.info "startup" - @heartbeat_interval_client = connect_time_stale_client / 2 - @connect_time_expire_client = connect_time_expire_client + @heartbeat_interval_client = connect_time_stale_client / 2 + @connect_time_stale_client = connect_time_stale_client + @connect_time_expire_client = connect_time_expire_client + @heartbeat_interval_browser = connect_time_stale_browser / 2 + @connect_time_stale_browser = connect_time_stale_browser + @connect_time_expire_browser = connect_time_expire_browser begin @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port]) @@ -221,8 +227,11 @@ module JamWebsockets client.encode_json = true client.onopen { |handshake| - #binding.pry - @log.debug "client connected #{client}" + # a unique ID for this TCP connection, to aid in debugging + client.channel_id = handshake.query["channel_id"] + + @log.debug "client connected #{client} with channel_id: #{client.channel_id}" + # check for '?pb' or '?pb=true' in url query parameters query_pb = handshake.query["pb"] @@ -447,6 +456,37 @@ module JamWebsockets end end + # returns heartbeat_interval, connection stale time, and connection expire time + def determine_connection_times(user, client_type) + + if client_type == Connection::TYPE_BROWSER + default_heartbeat = @heartbeat_interval_browser + default_stale = @connect_time_stale_browser + default_expire = @connect_time_expire_browser + else + default_heartbeat = @heartbeat_interval_client + default_stale = @connect_time_stale_client + default_expire = @connect_time_expire_client + end + + heartbeat_interval = user.heartbeat_interval_client || default_heartbeat + heartbeat_interval = heartbeat_interval.to_i + heartbeat_interval = default_heartbeat if heartbeat_interval == 0 # protect against bad config + connection_expire_time = user.connection_expire_time_client || default_expire + connection_expire_time = connection_expire_time.to_i + connection_expire_time = default_expire if connection_expire_time == 0 # protect against bad config + connection_stale_time = default_stale # no user override exists for this; not a very meaningful time right now + + if heartbeat_interval >= connection_stale_time + raise SessionError, "misconfiguration! heartbeat_interval (#{heartbeat_interval}) should be less than stale time (#{connection_stale_time})" + end + if connection_stale_time >= connection_expire_time + raise SessionError, "misconfiguration! stale time (#{connection_stale_time}) should be less than expire time (#{connection_expire_time})" + end + + [heartbeat_interval, connection_stale_time, connection_expire_time] + end + def handle_login(login, client) username = login.username if login.value_for_tag(1) password = login.password if login.value_for_tag(2) @@ -460,8 +500,7 @@ module JamWebsockets # you don't have to supply client_id in login--if you don't, we'll generate one if client_id.nil? || client_id.empty? - # give a unique ID to this client. This is used to prevent session messages - # from echoing back to the sender, for instance. + # give a unique ID to this client. client_id = UUIDTools::UUID.random_create.to_s end @@ -471,13 +510,13 @@ module JamWebsockets # this code must happen here, before we go any further, so that there is only one websocket connection per client_id existing_context = @client_lookup[client_id] if existing_context - # in reconnect scenarios, we may have in memory a client still + # in some reconnect scenarios, we may have in memory a websocket client still. Diagnostic.duplicate_client(existing_context.user, existing_context) if existing_context.client.connected cleanup_client(existing_context.client) end connection = JamRuby::Connection.find_by_client_id(client_id) - # if this connection is reused by a different user, then whack the connection + # if this connection is reused by a different user (possible in logout/login scenarios), then whack the connection # because it will recreate a new connection lower down if connection && user && connection.user != user @log.debug("user #{user.email} took client_id #{client_id} from user #{connection.user.email}") @@ -490,6 +529,9 @@ module JamWebsockets remote_ip = extract_ip(client) if user + + heartbeat_interval, connection_stale_time, connection_expire_time = determine_connection_times(user, client_type) + @log.debug "user #{user} logged in with client_id #{client_id}" # check if there's a connection for the client... if it's stale, reconnect it @@ -503,7 +545,7 @@ module JamWebsockets recording_id = nil ConnectionManager.active_record_transaction do |connection_manager| - music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip) + music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id, remote_ip, connection_stale_time, connection_expire_time) if music_session_id.nil? # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before. @@ -540,7 +582,7 @@ module JamWebsockets unless connection # log this connection in the database ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type) do |conn, count| + connection_manager.create_connection(user.id, client.client_id, remote_ip, client_type, connection_stale_time, connection_expire_time) do |conn, count| if count == 1 Notification.send_friend_update(user.id, true, conn) end @@ -548,20 +590,14 @@ module JamWebsockets end end - heartbeat_interval = user.heartbeat_interval_client.to_i || @heartbeat_interval_client - heartbeat_interval = @heartbeat_interval_client if heartbeat_interval == 0 # protect against bad config - connection_expire_time = user.connection_expire_time || @connection_expire_time - connection_expire_time = @connection_expire_time if connection_expire_time == 0 # protect against bad config - - login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token, - @heartbeat_interval_client, + heartbeat_interval, connection.try(:music_session_id), reconnected, user.id, - @connection_expire_time) + connection_expire_time) send_to_client(client, login_ack) end else diff --git a/websocket-gateway/lib/jam_websockets/server.rb b/websocket-gateway/lib/jam_websockets/server.rb index b9a8b9a92..4aafa62c2 100644 --- a/websocket-gateway/lib/jam_websockets/server.rb +++ b/websocket-gateway/lib/jam_websockets/server.rb @@ -33,9 +33,8 @@ module JamWebsockets EventMachine.run do @router.start(connect_time_stale_client, connect_time_expire_client, connect_time_stale_browser, connect_time_expire_browser, host: rabbitmq_host, port: rabbitmq_port) do - expire_time = connect_time_expire_client - start_connection_expiration(expire_time) - start_connection_flagger(connect_time_stale_client) + start_connection_expiration() + start_connection_flagger() start_websocket_listener(host, port, options[:emwebsocket_debug]) calling_thread.wakeup if calling_thread end @@ -60,37 +59,37 @@ module JamWebsockets @log.debug("started websocket") end - def start_connection_expiration(stale_max_time) + def start_connection_expiration() # one cleanup on startup - expire_stale_connections(stale_max_time) + expire_stale_connections() - EventMachine::PeriodicTimer.new(stale_max_time) do - sane_logging { expire_stale_connections(stale_max_time) } + EventMachine::PeriodicTimer.new(5) do + sane_logging { expire_stale_connections() } end end - def expire_stale_connections(stale_max_time) + def expire_stale_connections() clients = [] ConnectionManager.active_record_transaction do |connection_manager| - clients = connection_manager.stale_connection_client_ids(stale_max_time) + clients = connection_manager.stale_connection_client_ids() end @router.cleanup_clients_with_ids(clients) end - def start_connection_flagger(flag_max_time) + def start_connection_flagger() # one cleanup on startup - flag_stale_connections(flag_max_time) + flag_stale_connections() - EventMachine::PeriodicTimer.new(flag_max_time/2) do - sane_logging { flag_stale_connections(flag_max_time) } + EventMachine::PeriodicTimer.new(5) do + sane_logging { flag_stale_connections() } end end - def flag_stale_connections(flag_max_time) + def flag_stale_connections() # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.flag_stale_connections(flag_max_time) + connection_manager.flag_stale_connections() end end