* increased messaging for leave/join situations and reconnect support for VRFS-484

This commit is contained in:
Seth Call 2013-08-07 10:38:35 -05:00
parent 52287d054f
commit 97a56fd8e0
6 changed files with 87 additions and 45 deletions

View File

@ -1,6 +1,6 @@
Defaults: &defaults
connect_time_stale: 30
connect_time_expire: 180
connect_time_expire: 60
development:
port: 6767

View File

@ -15,5 +15,14 @@
return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]"
end
def hash
@client.hash
end
def ==(o)
o.class == self.class && o.client == @client
end
alias_method :eql?, :==
end
end

View File

@ -60,8 +60,8 @@ module JamWebsockets
@log.info "started"
end
def add_client(client_id, client)
@client_lookup[client_id] = client
def add_client(client_id, client_context)
@client_lookup[client_id] = client_context
end
def remove_client(client_id, client)
@ -69,13 +69,13 @@ module JamWebsockets
if deleted.nil?
@log.warn "unable to delete #{client_id} from client_lookup"
elsif deleted != client
elsif deleted.client != client
# put it back--this is only possible if add_client hit the 'old connection' path
# so in other words if this happens:
# add_client(1, clientX)
# add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time
# remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash
@client_lookup[client_id] = client
@client_lookup[client_id] = deleted
@log.debug "putting back client into @client_lookup for #{client_id} #{client.inspect}"
else
@log.debug "cleaned up @client_lookup for #{client_id}"
@ -86,11 +86,11 @@ module JamWebsockets
def add_user(context)
user_contexts = @user_context_lookup[context.user.id]
if user_contexts.nil?
user_contexts = Set.new
user_contexts = Hash.new
@user_context_lookup[context.user.id] = user_contexts
end
user_contexts.add(context)
user_contexts[context.client] = context
end
def remove_user(client_context)
@ -100,7 +100,7 @@ module JamWebsockets
@log.warn "user can not be removed #{client_context}"
else
# delete the context from set of user contexts
user_contexts.delete(client_context)
user_contexts.delete(client_context.client)
# if last user context, delete entire set (memory leak concern)
if user_contexts.length == 0
@ -138,7 +138,7 @@ module JamWebsockets
msg = Jampb::ClientMessage.parse(msg)
contexts.each do |context|
contexts.each do |client_id, context|
EM.schedule do
@log.debug "sending user message to #{context}"
send_to_client(context.client, msg)
@ -171,7 +171,8 @@ module JamWebsockets
routing_key = headers.routing_key
client_id = routing_key["client.".length..-1]
@semaphore.synchronize do
client = @client_lookup[client_id]
client_context = @client_lookup[client_id]
client = client_context.client
msg = Jampb::ClientMessage.parse(msg)
@ -319,7 +320,11 @@ module JamWebsockets
def stale_client(client)
if cid = client.client_id
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.flag_connection_stale_with_client_id(cid)
music_session_id = connection_manager.flag_connection_stale_with_client_id(cid)
# update the session members, letting them know this client went stale
context = @client_lookup[client.client_id]
music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
Notification.send_musician_session_stale(music_session, client.client_id, context.user) unless music_session.nil?
end
end
end
@ -327,19 +332,8 @@ module JamWebsockets
def cleanup_clients_with_ids(client_ids)
# @log.debug("*** cleanup_clients_with_ids: client_ids = #{client_ids.inspect}")
client_ids.each do |cid|
if 0 < (ws_clients = @clients.keys).length
ws_clients.each do |client|
if cid == client.client_id
self.cleanup_client(client)
break
else
# @log.debug("*** cleanup_clients: deleting connection = #{cid}")
ConnectionManager.active_record_transaction { |mgr| mgr.delete_connection(cid) }
end
end
else
ConnectionManager.active_record_transaction { |mgr| mgr.delete_connection(cid) }
end
client_context = @client_lookup[cid]
self.cleanup_client(client_context.client) unless client_context.nil?
end
end
@ -350,7 +344,7 @@ module JamWebsockets
pending = @pending_clients.delete?(client)
if !pending.nil?
@log.debug "cleaning up pending client #{client}"
@log.debug "cleaning up not-logged-in client #{client}"
else
@log.debug "cleanup up logged-in client #{client}"
@ -360,11 +354,14 @@ module JamWebsockets
context = @clients.delete(client)
if !context.nil?
# remove this connection from the database
if !context.user.nil? && !context.client.nil?
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.delete_connection(client.client_id)
ConnectionManager.active_record_transaction do |mgr|
mgr.delete_connection(client.client_id) { |conn, count, music_session_id|
Notification.send_friend_update(context.user.id, false, conn) if count == 0
music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil?
Notification.send_musician_session_depart(music_session, client.client_id, context.user) unless music_session.nil?
}
end
end
@ -437,9 +434,11 @@ module JamWebsockets
password = login.password if login.value_for_tag(2)
token = login.token if login.value_for_tag(3)
client_id = login.client_id if login.value_for_tag(4)
reconnect_music_session_id = login.client_id if login.value_for_tag(5)
@log.info("*** handle_login: token=#{token}; client_id=#{client_id}")
connection = nil
reconnected = false
# you don't have to supply client_id in login--if you don't, we'll generate one
if client_id.nil? || client_id.empty?
@ -451,8 +450,21 @@ module JamWebsockets
if connection = JamRuby::Connection.find_by_client_id(client_id)
# FIXME: I think connection table needs to updated within connection_manager
# otherwise this would be 1 line of code (connection.connect!)
music_session_upon_reentry = connection.music_session
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.reconnect(connection)
music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id)
context = @client_lookup[client_id]
if music_session_id.nil?
# if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before.
# if so, then we need to tell the others in the session that this user is now departed
Notification.send_musician_session_depart(music_session_upon_reentry, client.client_id, context.user) unless context.nil? || music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed?
else
music_session = MusicSession.find_by_id(music_session_id)
Notification.send_musician_session_fresh(music_session, client.client_id, context.user) unless context.nil?
end
end if connection.stale?
end
# if there's a client_id but no connection object, create new client_id
@ -467,15 +479,8 @@ module JamWebsockets
@log.debug "user #{user} logged in"
# respond with LOGIN_ACK to let client know it was successful
#binding.pry
remote_ip = extract_ip(client)
login_ack = @message_factory.login_ack(remote_ip,
client_id,
user.remember_token,
@heartbeat_interval,
connection.try(:music_session_id))
send_to_client(client, login_ack)
@semaphore.synchronize do
# remove from pending_queue
@ -485,14 +490,25 @@ module JamWebsockets
context = ClientContext.new(user, client)
@clients[client] = context
add_user(context)
add_client(client_id, client) # TODO
add_client(client_id, context)
unless connection
unless connection
# log this connection in the database
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.create_connection(user.id, client.client_id, extract_ip(client))
connection_manager.create_connection(user.id, client.client_id, remote_ip) do |conn, count|
if count == 1
Notification.send_friend_update(user.id, true, conn)
end
end
end
end
login_ack = @message_factory.login_ack(remote_ip,
client_id,
user.remember_token,
@heartbeat_interval,
connection.try(:music_session_id),
reconnected)
send_to_client(client, login_ack)
end
else
raise SessionError, 'invalid login'
@ -528,12 +544,14 @@ module JamWebsockets
@log.warn "*** WARNING: unable to find connection due to heartbeat from client: #{context}; calling cleanup_client"
cleanup_client(client)
else
connection.touch
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.reconnect(connection)
connection_manager.reconnect(connection, connection.music_session.id)
end if connection.stale?
end
# send errors to clients in response to heartbeats if
# send errors to clients in response to heartbeats if rabbitmq is down
if !@amqp_connection_manager.connected?
error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down")
context.sent_bad_state_previously = true

View File

@ -23,7 +23,8 @@ module JamWebsockets
@router.start(connect_time_stale) do
# take stale off the expire limit because the call to stale will
# touch the updated_at column, adding an extra stale limit to the expire time limit
expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire
# expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire
expire_time = connect_time_expire
start_connection_expiration(expire_time)
start_connection_flagger(connect_time_stale)
@ -69,7 +70,7 @@ module JamWebsockets
# one cleanup on startup
flag_stale_connections(flag_max_time)
EventMachine::PeriodicTimer.new(flag_max_time) do
EventMachine::PeriodicTimer.new(flag_max_time/2) do
flag_stale_connections(flag_max_time)
end
end

View File

@ -0,0 +1,14 @@
require 'spec_helper'
describe ClientContext do
let(:context) {ClientContext.new({}, "client1")}
describe 'hashing' do
it "hash correctly" do
set = Set.new
set.add?(context).should eql(set)
set.add?(context).should be_nil
end
end
end

View File

@ -42,7 +42,7 @@ def login(router, user, password, client_id)
message_factory = MessageFactory.new
client = LoginClient.new
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil)
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil, false)
router.should_receive(:send_to_client) do |*args|
args.count.should == 2