diff --git a/config/application.yml b/config/application.yml index f0576f674..038b59167 100644 --- a/config/application.yml +++ b/config/application.yml @@ -1,6 +1,6 @@ Defaults: &defaults connect_time_stale: 30 - connect_time_expire: 180 + connect_time_expire: 60 development: port: 6767 diff --git a/lib/jam_websockets/client_context.rb b/lib/jam_websockets/client_context.rb index 316057c56..c3f302778 100644 --- a/lib/jam_websockets/client_context.rb +++ b/lib/jam_websockets/client_context.rb @@ -15,5 +15,14 @@ return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]" end + def hash + @client.hash + end + + def ==(o) + o.class == self.class && o.client == @client + end + alias_method :eql?, :== + end end diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index f602f577d..61da6549d 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -60,8 +60,8 @@ module JamWebsockets @log.info "started" end - def add_client(client_id, client) - @client_lookup[client_id] = client + def add_client(client_id, client_context) + @client_lookup[client_id] = client_context end def remove_client(client_id, client) @@ -69,13 +69,13 @@ module JamWebsockets if deleted.nil? @log.warn "unable to delete #{client_id} from client_lookup" - elsif deleted != client + elsif deleted.client != client # put it back--this is only possible if add_client hit the 'old connection' path # so in other words if this happens: # add_client(1, clientX) # add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time # remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash - @client_lookup[client_id] = client + @client_lookup[client_id] = deleted @log.debug "putting back client into @client_lookup for #{client_id} #{client.inspect}" else @log.debug "cleaned up @client_lookup for #{client_id}" @@ -86,11 +86,11 @@ module JamWebsockets def add_user(context) user_contexts = @user_context_lookup[context.user.id] if user_contexts.nil? - user_contexts = Set.new + user_contexts = Hash.new @user_context_lookup[context.user.id] = user_contexts end - user_contexts.add(context) + user_contexts[context.client] = context end def remove_user(client_context) @@ -100,7 +100,7 @@ module JamWebsockets @log.warn "user can not be removed #{client_context}" else # delete the context from set of user contexts - user_contexts.delete(client_context) + user_contexts.delete(client_context.client) # if last user context, delete entire set (memory leak concern) if user_contexts.length == 0 @@ -138,7 +138,7 @@ module JamWebsockets msg = Jampb::ClientMessage.parse(msg) - contexts.each do |context| + contexts.each do |client_id, context| EM.schedule do @log.debug "sending user message to #{context}" send_to_client(context.client, msg) @@ -171,7 +171,8 @@ module JamWebsockets routing_key = headers.routing_key client_id = routing_key["client.".length..-1] @semaphore.synchronize do - client = @client_lookup[client_id] + client_context = @client_lookup[client_id] + client = client_context.client msg = Jampb::ClientMessage.parse(msg) @@ -319,7 +320,11 @@ module JamWebsockets def stale_client(client) if cid = client.client_id ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.flag_connection_stale_with_client_id(cid) + music_session_id = connection_manager.flag_connection_stale_with_client_id(cid) + # update the session members, letting them know this client went stale + context = @client_lookup[client.client_id] + music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil? + Notification.send_musician_session_stale(music_session, client.client_id, context.user) unless music_session.nil? end end end @@ -327,19 +332,8 @@ module JamWebsockets def cleanup_clients_with_ids(client_ids) # @log.debug("*** cleanup_clients_with_ids: client_ids = #{client_ids.inspect}") client_ids.each do |cid| - if 0 < (ws_clients = @clients.keys).length - ws_clients.each do |client| - if cid == client.client_id - self.cleanup_client(client) - break - else - # @log.debug("*** cleanup_clients: deleting connection = #{cid}") - ConnectionManager.active_record_transaction { |mgr| mgr.delete_connection(cid) } - end - end - else - ConnectionManager.active_record_transaction { |mgr| mgr.delete_connection(cid) } - end + client_context = @client_lookup[cid] + self.cleanup_client(client_context.client) unless client_context.nil? end end @@ -350,7 +344,7 @@ module JamWebsockets pending = @pending_clients.delete?(client) if !pending.nil? - @log.debug "cleaning up pending client #{client}" + @log.debug "cleaning up not-logged-in client #{client}" else @log.debug "cleanup up logged-in client #{client}" @@ -360,11 +354,14 @@ module JamWebsockets context = @clients.delete(client) if !context.nil? - # remove this connection from the database if !context.user.nil? && !context.client.nil? - ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.delete_connection(client.client_id) + ConnectionManager.active_record_transaction do |mgr| + mgr.delete_connection(client.client_id) { |conn, count, music_session_id| + Notification.send_friend_update(context.user.id, false, conn) if count == 0 + music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil? + Notification.send_musician_session_depart(music_session, client.client_id, context.user) unless music_session.nil? + } end end @@ -437,9 +434,11 @@ module JamWebsockets password = login.password if login.value_for_tag(2) token = login.token if login.value_for_tag(3) client_id = login.client_id if login.value_for_tag(4) + reconnect_music_session_id = login.client_id if login.value_for_tag(5) @log.info("*** handle_login: token=#{token}; client_id=#{client_id}") connection = nil + reconnected = false # you don't have to supply client_id in login--if you don't, we'll generate one if client_id.nil? || client_id.empty? @@ -451,8 +450,21 @@ module JamWebsockets if connection = JamRuby::Connection.find_by_client_id(client_id) # FIXME: I think connection table needs to updated within connection_manager # otherwise this would be 1 line of code (connection.connect!) + + music_session_upon_reentry = connection.music_session + ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.reconnect(connection) + music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id) + context = @client_lookup[client_id] + if music_session_id.nil? + # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before. + # if so, then we need to tell the others in the session that this user is now departed + Notification.send_musician_session_depart(music_session_upon_reentry, client.client_id, context.user) unless context.nil? || music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed? + else + music_session = MusicSession.find_by_id(music_session_id) + Notification.send_musician_session_fresh(music_session, client.client_id, context.user) unless context.nil? + end + end if connection.stale? end # if there's a client_id but no connection object, create new client_id @@ -467,15 +479,8 @@ module JamWebsockets @log.debug "user #{user} logged in" # respond with LOGIN_ACK to let client know it was successful - #binding.pry - remote_ip = extract_ip(client) - login_ack = @message_factory.login_ack(remote_ip, - client_id, - user.remember_token, - @heartbeat_interval, - connection.try(:music_session_id)) - send_to_client(client, login_ack) + @semaphore.synchronize do # remove from pending_queue @@ -485,14 +490,25 @@ module JamWebsockets context = ClientContext.new(user, client) @clients[client] = context add_user(context) - add_client(client_id, client) # TODO + add_client(client_id, context) - unless connection + unless connection # log this connection in the database ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.create_connection(user.id, client.client_id, extract_ip(client)) + connection_manager.create_connection(user.id, client.client_id, remote_ip) do |conn, count| + if count == 1 + Notification.send_friend_update(user.id, true, conn) + end + end end end + login_ack = @message_factory.login_ack(remote_ip, + client_id, + user.remember_token, + @heartbeat_interval, + connection.try(:music_session_id), + reconnected) + send_to_client(client, login_ack) end else raise SessionError, 'invalid login' @@ -528,12 +544,14 @@ module JamWebsockets @log.warn "*** WARNING: unable to find connection due to heartbeat from client: #{context}; calling cleanup_client" cleanup_client(client) else + connection.touch + ConnectionManager.active_record_transaction do |connection_manager| - connection_manager.reconnect(connection) + connection_manager.reconnect(connection, connection.music_session.id) end if connection.stale? end - # send errors to clients in response to heartbeats if + # send errors to clients in response to heartbeats if rabbitmq is down if !@amqp_connection_manager.connected? error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down") context.sent_bad_state_previously = true diff --git a/lib/jam_websockets/server.rb b/lib/jam_websockets/server.rb index 7607175c2..795e8455d 100644 --- a/lib/jam_websockets/server.rb +++ b/lib/jam_websockets/server.rb @@ -23,7 +23,8 @@ module JamWebsockets @router.start(connect_time_stale) do # take stale off the expire limit because the call to stale will # touch the updated_at column, adding an extra stale limit to the expire time limit - expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire + # expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire + expire_time = connect_time_expire start_connection_expiration(expire_time) start_connection_flagger(connect_time_stale) @@ -69,7 +70,7 @@ module JamWebsockets # one cleanup on startup flag_stale_connections(flag_max_time) - EventMachine::PeriodicTimer.new(flag_max_time) do + EventMachine::PeriodicTimer.new(flag_max_time/2) do flag_stale_connections(flag_max_time) end end diff --git a/spec/jam_websockets/client_context_spec.rb b/spec/jam_websockets/client_context_spec.rb new file mode 100644 index 000000000..522e77b10 --- /dev/null +++ b/spec/jam_websockets/client_context_spec.rb @@ -0,0 +1,14 @@ +require 'spec_helper' + +describe ClientContext do + + let(:context) {ClientContext.new({}, "client1")} + + describe 'hashing' do + it "hash correctly" do + set = Set.new + set.add?(context).should eql(set) + set.add?(context).should be_nil + end + end +end diff --git a/spec/jam_websockets/router_spec.rb b/spec/jam_websockets/router_spec.rb index 070fbbeaf..61193a26f 100644 --- a/spec/jam_websockets/router_spec.rb +++ b/spec/jam_websockets/router_spec.rb @@ -42,7 +42,7 @@ def login(router, user, password, client_id) message_factory = MessageFactory.new client = LoginClient.new - login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil) + login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil, false) router.should_receive(:send_to_client) do |*args| args.count.should == 2