* VRFS-484; more changes needed but this helps a lot of common leave/join scenarios

This commit is contained in:
Seth Call 2013-08-07 10:35:27 -05:00
parent ab46fffa9b
commit 9424ffa6aa
4 changed files with 133 additions and 97 deletions

View File

@ -11,6 +11,10 @@ module JamRuby
# This may make sense in the short term if we are still managing connections in the database, but
# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods).
# Or of course we could just port the relevant methods to node-js
#
# Also we don't send notifications from ConnectionManager;
# we just return enough data so that a caller can make the determination if it needs to
class ConnectionManager < BaseManager
def initialize(options={})
@ -33,32 +37,64 @@ module JamRuby
return friend_ids
end
def reconnect(conn)
sql =<<SQL
UPDATE connections SET (aasm_state,updated_at) = ('#{Connection::CONNECT_STATE.to_s}', NOW())
# reclaim the existing connection,
def reconnect(conn, reconnect_music_session_id)
music_session_id = nil
reconnected = false
# we will reconnect the same music_session that the connection was previously in,
# if it matches the same value currently in the database for music_session_id
music_session_id_expression = 'NULL'
unless reconnect_music_session_id.nil?
music_session_id_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN music_session_id ELSE NULL END)"
end
sql =<<SQL
UPDATE connections SET (aasm_state, updated_at, music_session_id) = ('#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression})
WHERE
client_id = '#{conn.client_id}'
RETURNING music_session_id
SQL
# @log.info("*** reconnect: client_id = #{conn.client_id}")
self.pg_conn.exec(sql)
self.pg_conn.exec(sql) do |result|
if result.cmd_tuples == 1
music_session_id = result[0]['music_session_id']
end
end
# we tell the client they reconnected if they specified a reconnect music_session_id, and if that is now the
# current value in the database
reconnected = true if !reconnect_music_session_id.nil? && reconnect_music_session_id == music_session_id
return music_session_id, reconnected
end
# returns the music_session_id, if any, associated with the client
def flag_connection_stale_with_client_id(client_id)
music_session_id = nil
sql =<<SQL
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
WHERE
client_id = '#{client_id}' AND
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
RETURNING music_session_id
SQL
# @log.info("*** flag_connection_stale_with_client_id: client_id = #{client_id}; sql = #{sql}")
self.pg_conn.exec(sql)
self.pg_conn.exec(sql) do |result|
# if we did update a client to stale, retriee music_session_id
if result.cmd_tuples == 1
music_session_id = result[0]['music_session_id']
end
end
music_session_id
end
# flag connections as stale
def flag_stale_connections(max_seconds)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
sql =<<SQL
SELECT count(user_id) FROM connections
WHERE
@ -93,13 +129,21 @@ SQL
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
sql =<<SQL
SELECT client_id FROM connections
SELECT client_id, music_session_id, user_id FROM connections
WHERE
updated_at < (NOW() - interval '#{max_seconds} second') AND
aasm_state = '#{Connection::STALE_STATE.to_s}'
SQL
conn.exec(sql) do |result|
result.each { |row| client_ids << row['client_id'] }
result.each { |row|
client_id = row['client_id']
music_session_id = row['music_session_id']
user_id = row['user_id']
client_ids << client_id
}
# @log.debug("*** stale_connection_client_ids: client_ids = #{client_ids.inspect}")
end
end
@ -107,7 +151,12 @@ SQL
end
def create_connection(user_id, client_id, ip_address)
# returns the number of connections that this user currently has across all clients
# this number is used by notification logic elsewhere to know
# 'oh the user joined for the 1st time, so send a friend update', or
# 'don't bother because the user has connected somewhere else already'
def create_connection(user_id, client_id, ip_address, &blk)
count = 0
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
@ -118,25 +167,26 @@ SQL
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "1"
# send notification
Notification.send_friend_update(user_id, true, conn)
end
count = result.getvalue(0, 0) .to_i
blk.call(conn, count) unless blk.nil?
end
return Connection.find_by_client_id!(client_id)
return count
end
end
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
# this code is responsible for all cleanup logic associated with a connection going away
def delete_connection(client_id)
# returns how many connections are left for this user; this data is used by callers to know whether
# to tell friends if the user is offline (count==0) or not (count > 0)
# If a blk is passed in, on success, count is also passed back an the db connection, allowing for
# notifications to go out within the table log. music_session_id is also passed, if the music_session still exists
# and this connection was in a session
def delete_connection(client_id, &blk)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
count = 0
user_id = nil
music_session_id = nil
@ -152,7 +202,7 @@ SQL
return
elsif result.cmd_tuples == 1
user_id = result[0]['user_id']
music_session_id = result[0]['client_id']
music_session_id = result[0]['music_session_id']
else
raise Exception, 'uniqueness constraint has been lost on client_id'
@ -164,17 +214,19 @@ SQL
# since we did delete a row, check and see if any more connections for that user exist
# if we are down to zero, send out user gone message
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "0"
# send notification
Notification.send_friend_update(user_id, false, conn)
end
count = result.getvalue(0, 0).to_i
end
# same for session-if we are down to the last participant, delete the session
unless music_session_id.nil?
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
result = conn.exec("DELETE FROM music_sessions WHERE id = $1 AND 0 = (select count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id])
if result.cmd_tuples == 1
music_session_id = nil
end
end
blk.call(conn, count, music_session_id) unless blk.nil?
return count
end
end
@ -224,7 +276,9 @@ SQL
end
end
def join_music_session(user, client_id, music_session, as_musician, tracks)
# if a blk is passed in, upon success, it will be called and you can issue notifications
# within the connection table lock
def join_music_session(user, client_id, music_session, as_musician, tracks, &blk)
connection = nil
user_id = user.id
music_session_id = music_session.id
@ -242,10 +296,7 @@ SQL
if connection.errors.any?
raise ActiveRecord::Rollback
else
if as_musician && music_session.musician_access
Notification.send_musician_session_join(music_session, connection, user)
Notification.send_friend_session_join(db_conn, connection, user)
end
blk.call(db_conn, connection) unless blk.nil?
MusicSessionUserHistory.save(music_session_id, user_id, client_id)
end
end
@ -253,57 +304,9 @@ SQL
return connection
end
def join_music_session_old(user_id, client_id, music_session_id, as_musician)
conn = @pg_conn
lock_connections(conn)
previous_music_session_id = check_already_session(conn, client_id)
user = User.find(user_id)
if as_musician != true && as_musician != false # checks that a boolean was passed in
raise JamArgumentError, "as_musician incorrectly specified"
end
# determine if the user can join; if not, throw a PermissionError
music_session = MusicSession.find(music_session_id)
unless music_session.can_join?(user, as_musician)
@log.debug "user can not join a session user_id=#{user_id} and client_id=#{client_id}"
raise PermissionError, "unable to join the specified session"
end
begin
# we include user_id in the query as an act of security, so that a user can't access someone else's client connection
conn.exec("UPDATE connections SET music_session_id = $1, as_musician = $2 WHERE client_id = $3 and user_id = $4", [music_session_id, as_musician, client_id, user_id]) do |result|
if result.cmd_tuples == 1
@log.debug "associated music_session with connection for client=#{client_id}, music_session=#{music_session_id}, and user=#{user_id}"
session_checks(conn, previous_music_session_id, user_id)
elsif result.cmd_tuples == 0
@log.debug "join_music_session no connection found with client_id=#{client_id} and user_id=#{user_id}"
raise ActiveRecord::RecordNotFound
else
@log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)"
raise Exception, "locked table changed state"
end
end
rescue PG::Error => pg_error
if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"")
# if there is no music session that exists, we will receive this message
@log.debug "music_session does not exist. music_session=#{music_session_id}"
raise StateError, "music_session does not exist"
else
raise pg_error
end
end
# end
end
def leave_music_session(user, connection, music_session)
# if a blk is passed in, upon success, it will be called and you can issue notifications
# within the connection table lock
def leave_music_session(user, connection, music_session, &blk)
ConnectionManager.active_record_transaction do |connection_manager|
conn = connection_manager.pg_conn
@ -331,7 +334,8 @@ SQL
session_checks(conn, previous_music_session_id, user_id)
Notification.send_musician_session_depart(music_session, connection, user)
blk.call() unless blk.nil?
elsif result.cmd_tuples == 0
@log.debug "leave_music_session no connection found with client_id=#{client_id}"
raise ActiveRecord::RecordNotFound

View File

@ -35,8 +35,8 @@
end
# create a login ack (login was successful)
def login_ack(public_ip, client_id, token, heartbeat_interval, music_session_id)
login_ack = Jampb::LoginAck.new(:public_ip => public_ip, :client_id => client_id, :token => token, :heartbeat_interval => heartbeat_interval, :music_session_id => music_session_id)
def login_ack(public_ip, client_id, token, heartbeat_interval, music_session_id, reconnected)
login_ack = Jampb::LoginAck.new(:public_ip => public_ip, :client_id => client_id, :token => token, :heartbeat_interval => heartbeat_interval, :music_session_id => music_session_id, :reconnected => reconnected)
return Jampb::ClientMessage.new(:type => ClientMessage::Type::LOGIN_ACK, :route_to => CLIENT_TARGET, :login_ack => login_ack)
end
@ -112,6 +112,19 @@
return Jampb::ClientMessage.new(:type => ClientMessage::Type::MUSICIAN_SESSION_DEPART, :route_to => CLIENT_TARGET, :musician_session_depart => left)
end
# create a musician fresh session message
def musician_session_fresh(session_id, user_id, username, photo_url)
fresh = Jampb::MusicianSessionFresh.new(:session_id => session_id, :user_id => user_id, :username => username, :photo_url => photo_url)
return Jampb::ClientMessage.new(:type => ClientMessage::Type::MUSICIAN_SESSION_FRESH, :route_to => CLIENT_TARGET, :musician_session_fresh => fresh)
end
# create a musician stale session message
def musician_session_stale(session_id, user_id, username, photo_url)
stale = Jampb::MusicianSessionStale.new(:session_id => session_id, :user_id => user_id, :username => username, :photo_url => photo_url)
return Jampb::ClientMessage.new(:type => ClientMessage::Type::MUSICIAN_SESSION_STALE, :route_to => CLIENT_TARGET, :musician_session_stale => stale)
end
# create a user-joined session message
def join_request(session_id, join_request_id, username, text)
join_request = Jampb::JoinRequest.new(:join_request_id => join_request_id, :username => username, :text => text)

View File

@ -223,13 +223,31 @@ module JamRuby
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => connection.client_id})
end
def send_musician_session_depart(music_session, connection, user)
def send_musician_session_depart(music_session, client_id, user)
# (1) create notification
msg = @@message_factory.musician_session_depart(music_session.id, user.id, user.name, user.photo_url)
# (2) send notification
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => connection.client_id})
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => client_id})
end
def send_musician_session_fresh(music_session, client_id, user)
# (1) create notification
msg = @@message_factory.musician_session_fresh(music_session.id, user.id, user.name, user.photo_url)
# (2) send notification
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => client_id})
end
def send_musician_session_stale(music_session, client_id, user)
# (1) create notification
msg = @@message_factory.musician_session_stale(music_session.id, user.id, user.name, user.photo_url)
# (2) send notification
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => client_id})
end
def send_friend_session_join(db_conn, connection, user)

View File

@ -62,8 +62,8 @@ describe ConnectionManager do
client_id = "client_id2"
user_id = create_user("test", "user2", "user2@jamkazam.com")
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
count = @connman.create_connection(user_id, client_id, "1.1.1.1")
count.should == 1
# make sure the connection is seen
@conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result|
@ -73,7 +73,8 @@ describe ConnectionManager do
cc = Connection.find_by_client_id!(client_id)
cc.connected?.should be_true
@connman.delete_connection(client_id)
count = @connman.delete_connection(client_id)
count.should == 0
@conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result|
result.getvalue(0, 0).should == "0"
@ -377,9 +378,9 @@ describe ConnectionManager do
user = User.find(user_id)
dummy_music_session = MusicSession.new
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.create_connection(user_id, client_id, "1.1.1.1")
expect { @connman.leave_music_session(user, connection, dummy_music_session) }.to raise_error(JamRuby::StateError)
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
end
it "leave_music_session fails if in different music_session" do
@ -393,9 +394,9 @@ describe ConnectionManager do
dummy_music_session = MusicSession.new
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.join_music_session(user, client_id, music_session, true, TRACKS)
expect { @connman.leave_music_session(user, connection, dummy_music_session) }.to raise_error(JamRuby::StateError)
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
end
it "leave_music_session works" do
@ -407,7 +408,7 @@ describe ConnectionManager do
user = User.find(user_id)
music_session = MusicSession.find(music_session_id)
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.join_music_session(user, client_id, music_session, true, TRACKS)
assert_session_exists(music_session_id, true)
@ -416,7 +417,7 @@ describe ConnectionManager do
result.getvalue(0, 0).should == music_session_id
end
@connman.leave_music_session(user, connection, music_session)
@connman.leave_music_session(user, Connection.find_by_client_id(client_id), music_session)
@conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
result.getvalue(0, 0).should == nil