Merge branch 'master' of bitbucket.org:jamkazam/jam-ruby
This commit is contained in:
commit
c55c63bb7a
|
|
@ -11,6 +11,10 @@ module JamRuby
|
|||
# This may make sense in the short term if we are still managing connections in the database, but
|
||||
# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods).
|
||||
# Or of course we could just port the relevant methods to node-js
|
||||
#
|
||||
# Also we don't send notifications from ConnectionManager;
|
||||
# we just return enough data so that a caller can make the determination if it needs to
|
||||
|
||||
class ConnectionManager < BaseManager
|
||||
|
||||
def initialize(options={})
|
||||
|
|
@ -33,32 +37,64 @@ module JamRuby
|
|||
return friend_ids
|
||||
end
|
||||
|
||||
def reconnect(conn)
|
||||
sql =<<SQL
|
||||
UPDATE connections SET (aasm_state,updated_at) = ('#{Connection::CONNECT_STATE.to_s}', NOW())
|
||||
# reclaim the existing connection,
|
||||
def reconnect(conn, reconnect_music_session_id)
|
||||
music_session_id = nil
|
||||
reconnected = false
|
||||
|
||||
# we will reconnect the same music_session that the connection was previously in,
|
||||
# if it matches the same value currently in the database for music_session_id
|
||||
music_session_id_expression = 'NULL'
|
||||
unless reconnect_music_session_id.nil?
|
||||
music_session_id_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN music_session_id ELSE NULL END)"
|
||||
end
|
||||
|
||||
|
||||
sql =<<SQL
|
||||
UPDATE connections SET (aasm_state, updated_at, music_session_id) = ('#{Connection::CONNECT_STATE.to_s}', NOW(), #{music_session_id_expression})
|
||||
WHERE
|
||||
client_id = '#{conn.client_id}'
|
||||
RETURNING music_session_id
|
||||
SQL
|
||||
# @log.info("*** reconnect: client_id = #{conn.client_id}")
|
||||
self.pg_conn.exec(sql)
|
||||
self.pg_conn.exec(sql) do |result|
|
||||
if result.cmd_tuples == 1
|
||||
music_session_id = result[0]['music_session_id']
|
||||
end
|
||||
end
|
||||
|
||||
# we tell the client they reconnected if they specified a reconnect music_session_id, and if that is now the
|
||||
# current value in the database
|
||||
reconnected = true if !reconnect_music_session_id.nil? && reconnect_music_session_id == music_session_id
|
||||
|
||||
return music_session_id, reconnected
|
||||
end
|
||||
|
||||
# returns the music_session_id, if any, associated with the client
|
||||
def flag_connection_stale_with_client_id(client_id)
|
||||
music_session_id = nil
|
||||
sql =<<SQL
|
||||
UPDATE connections SET aasm_state = '#{Connection::STALE_STATE.to_s}'
|
||||
WHERE
|
||||
client_id = '#{client_id}' AND
|
||||
aasm_state = '#{Connection::CONNECT_STATE.to_s}'
|
||||
RETURNING music_session_id
|
||||
SQL
|
||||
# @log.info("*** flag_connection_stale_with_client_id: client_id = #{client_id}; sql = #{sql}")
|
||||
self.pg_conn.exec(sql)
|
||||
self.pg_conn.exec(sql) do |result|
|
||||
|
||||
# if we did update a client to stale, retriee music_session_id
|
||||
if result.cmd_tuples == 1
|
||||
music_session_id = result[0]['music_session_id']
|
||||
end
|
||||
end
|
||||
|
||||
music_session_id
|
||||
end
|
||||
|
||||
# flag connections as stale
|
||||
def flag_stale_connections(max_seconds)
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
|
||||
sql =<<SQL
|
||||
SELECT count(user_id) FROM connections
|
||||
WHERE
|
||||
|
|
@ -93,13 +129,21 @@ SQL
|
|||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
sql =<<SQL
|
||||
SELECT client_id FROM connections
|
||||
SELECT client_id, music_session_id, user_id FROM connections
|
||||
WHERE
|
||||
updated_at < (NOW() - interval '#{max_seconds} second') AND
|
||||
aasm_state = '#{Connection::STALE_STATE.to_s}'
|
||||
SQL
|
||||
conn.exec(sql) do |result|
|
||||
result.each { |row| client_ids << row['client_id'] }
|
||||
result.each { |row|
|
||||
client_id = row['client_id']
|
||||
music_session_id = row['music_session_id']
|
||||
user_id = row['user_id']
|
||||
|
||||
client_ids << client_id
|
||||
|
||||
}
|
||||
|
||||
# @log.debug("*** stale_connection_client_ids: client_ids = #{client_ids.inspect}")
|
||||
end
|
||||
end
|
||||
|
|
@ -107,7 +151,12 @@ SQL
|
|||
end
|
||||
|
||||
|
||||
def create_connection(user_id, client_id, ip_address)
|
||||
# returns the number of connections that this user currently has across all clients
|
||||
# this number is used by notification logic elsewhere to know
|
||||
# 'oh the user joined for the 1st time, so send a friend update', or
|
||||
# 'don't bother because the user has connected somewhere else already'
|
||||
def create_connection(user_id, client_id, ip_address, &blk)
|
||||
count = 0
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
conn = connection_manager.pg_conn
|
||||
|
||||
|
|
@ -118,25 +167,26 @@ SQL
|
|||
|
||||
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
||||
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
||||
count = result.getvalue(0, 0)
|
||||
if count == "1"
|
||||
# send notification
|
||||
Notification.send_friend_update(user_id, true, conn)
|
||||
end
|
||||
count = result.getvalue(0, 0) .to_i
|
||||
blk.call(conn, count) unless blk.nil?
|
||||
end
|
||||
|
||||
return Connection.find_by_client_id!(client_id)
|
||||
return count
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
|
||||
# this code is responsible for all cleanup logic associated with a connection going away
|
||||
def delete_connection(client_id)
|
||||
# returns how many connections are left for this user; this data is used by callers to know whether
|
||||
# to tell friends if the user is offline (count==0) or not (count > 0)
|
||||
# If a blk is passed in, on success, count is also passed back an the db connection, allowing for
|
||||
# notifications to go out within the table log. music_session_id is also passed, if the music_session still exists
|
||||
# and this connection was in a session
|
||||
def delete_connection(client_id, &blk)
|
||||
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
|
||||
conn = connection_manager.pg_conn
|
||||
|
||||
count = 0
|
||||
user_id = nil
|
||||
music_session_id = nil
|
||||
|
||||
|
|
@ -152,7 +202,7 @@ SQL
|
|||
return
|
||||
elsif result.cmd_tuples == 1
|
||||
user_id = result[0]['user_id']
|
||||
music_session_id = result[0]['client_id']
|
||||
music_session_id = result[0]['music_session_id']
|
||||
|
||||
else
|
||||
raise Exception, 'uniqueness constraint has been lost on client_id'
|
||||
|
|
@ -164,17 +214,19 @@ SQL
|
|||
# since we did delete a row, check and see if any more connections for that user exist
|
||||
# if we are down to zero, send out user gone message
|
||||
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
|
||||
count = result.getvalue(0, 0)
|
||||
if count == "0"
|
||||
# send notification
|
||||
Notification.send_friend_update(user_id, false, conn)
|
||||
end
|
||||
count = result.getvalue(0, 0).to_i
|
||||
end
|
||||
|
||||
# same for session-if we are down to the last participant, delete the session
|
||||
unless music_session_id.nil?
|
||||
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
|
||||
result = conn.exec("DELETE FROM music_sessions WHERE id = $1 AND 0 = (select count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id])
|
||||
if result.cmd_tuples == 1
|
||||
music_session_id = nil
|
||||
end
|
||||
end
|
||||
|
||||
blk.call(conn, count, music_session_id) unless blk.nil?
|
||||
return count
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -224,7 +276,9 @@ SQL
|
|||
end
|
||||
end
|
||||
|
||||
def join_music_session(user, client_id, music_session, as_musician, tracks)
|
||||
# if a blk is passed in, upon success, it will be called and you can issue notifications
|
||||
# within the connection table lock
|
||||
def join_music_session(user, client_id, music_session, as_musician, tracks, &blk)
|
||||
connection = nil
|
||||
user_id = user.id
|
||||
music_session_id = music_session.id
|
||||
|
|
@ -242,10 +296,7 @@ SQL
|
|||
if connection.errors.any?
|
||||
raise ActiveRecord::Rollback
|
||||
else
|
||||
if as_musician && music_session.musician_access
|
||||
Notification.send_musician_session_join(music_session, connection, user)
|
||||
Notification.send_friend_session_join(db_conn, connection, user)
|
||||
end
|
||||
blk.call(db_conn, connection) unless blk.nil?
|
||||
MusicSessionUserHistory.save(music_session_id, user_id, client_id)
|
||||
end
|
||||
end
|
||||
|
|
@ -253,57 +304,9 @@ SQL
|
|||
return connection
|
||||
end
|
||||
|
||||
def join_music_session_old(user_id, client_id, music_session_id, as_musician)
|
||||
conn = @pg_conn
|
||||
|
||||
lock_connections(conn)
|
||||
|
||||
previous_music_session_id = check_already_session(conn, client_id)
|
||||
|
||||
user = User.find(user_id)
|
||||
|
||||
if as_musician != true && as_musician != false # checks that a boolean was passed in
|
||||
raise JamArgumentError, "as_musician incorrectly specified"
|
||||
end
|
||||
|
||||
# determine if the user can join; if not, throw a PermissionError
|
||||
music_session = MusicSession.find(music_session_id)
|
||||
|
||||
|
||||
unless music_session.can_join?(user, as_musician)
|
||||
@log.debug "user can not join a session user_id=#{user_id} and client_id=#{client_id}"
|
||||
raise PermissionError, "unable to join the specified session"
|
||||
end
|
||||
|
||||
begin
|
||||
# we include user_id in the query as an act of security, so that a user can't access someone else's client connection
|
||||
conn.exec("UPDATE connections SET music_session_id = $1, as_musician = $2 WHERE client_id = $3 and user_id = $4", [music_session_id, as_musician, client_id, user_id]) do |result|
|
||||
if result.cmd_tuples == 1
|
||||
@log.debug "associated music_session with connection for client=#{client_id}, music_session=#{music_session_id}, and user=#{user_id}"
|
||||
|
||||
session_checks(conn, previous_music_session_id, user_id)
|
||||
elsif result.cmd_tuples == 0
|
||||
@log.debug "join_music_session no connection found with client_id=#{client_id} and user_id=#{user_id}"
|
||||
raise ActiveRecord::RecordNotFound
|
||||
else
|
||||
@log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)"
|
||||
raise Exception, "locked table changed state"
|
||||
end
|
||||
end
|
||||
rescue PG::Error => pg_error
|
||||
if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"")
|
||||
# if there is no music session that exists, we will receive this message
|
||||
@log.debug "music_session does not exist. music_session=#{music_session_id}"
|
||||
raise StateError, "music_session does not exist"
|
||||
else
|
||||
raise pg_error
|
||||
end
|
||||
|
||||
end
|
||||
# end
|
||||
end
|
||||
|
||||
def leave_music_session(user, connection, music_session)
|
||||
# if a blk is passed in, upon success, it will be called and you can issue notifications
|
||||
# within the connection table lock
|
||||
def leave_music_session(user, connection, music_session, &blk)
|
||||
ConnectionManager.active_record_transaction do |connection_manager|
|
||||
|
||||
conn = connection_manager.pg_conn
|
||||
|
|
@ -331,7 +334,7 @@ SQL
|
|||
|
||||
JamRuby::MusicSessionUserHistory.removed_music_session(user_id, music_session_id)
|
||||
session_checks(conn, previous_music_session_id, user_id)
|
||||
Notification.send_musician_session_depart(music_session, connection, user)
|
||||
blk.call() unless blk.nil?
|
||||
|
||||
elsif result.cmd_tuples == 0
|
||||
@log.debug "leave_music_session no connection found with client_id=#{client_id}"
|
||||
|
|
|
|||
|
|
@ -35,8 +35,8 @@
|
|||
end
|
||||
|
||||
# create a login ack (login was successful)
|
||||
def login_ack(public_ip, client_id, token, heartbeat_interval, music_session_id)
|
||||
login_ack = Jampb::LoginAck.new(:public_ip => public_ip, :client_id => client_id, :token => token, :heartbeat_interval => heartbeat_interval, :music_session_id => music_session_id)
|
||||
def login_ack(public_ip, client_id, token, heartbeat_interval, music_session_id, reconnected)
|
||||
login_ack = Jampb::LoginAck.new(:public_ip => public_ip, :client_id => client_id, :token => token, :heartbeat_interval => heartbeat_interval, :music_session_id => music_session_id, :reconnected => reconnected)
|
||||
return Jampb::ClientMessage.new(:type => ClientMessage::Type::LOGIN_ACK, :route_to => CLIENT_TARGET, :login_ack => login_ack)
|
||||
end
|
||||
|
||||
|
|
@ -112,6 +112,19 @@
|
|||
return Jampb::ClientMessage.new(:type => ClientMessage::Type::MUSICIAN_SESSION_DEPART, :route_to => CLIENT_TARGET, :musician_session_depart => left)
|
||||
end
|
||||
|
||||
# create a musician fresh session message
|
||||
def musician_session_fresh(session_id, user_id, username, photo_url)
|
||||
fresh = Jampb::MusicianSessionFresh.new(:session_id => session_id, :user_id => user_id, :username => username, :photo_url => photo_url)
|
||||
return Jampb::ClientMessage.new(:type => ClientMessage::Type::MUSICIAN_SESSION_FRESH, :route_to => CLIENT_TARGET, :musician_session_fresh => fresh)
|
||||
end
|
||||
|
||||
# create a musician stale session message
|
||||
def musician_session_stale(session_id, user_id, username, photo_url)
|
||||
stale = Jampb::MusicianSessionStale.new(:session_id => session_id, :user_id => user_id, :username => username, :photo_url => photo_url)
|
||||
return Jampb::ClientMessage.new(:type => ClientMessage::Type::MUSICIAN_SESSION_STALE, :route_to => CLIENT_TARGET, :musician_session_stale => stale)
|
||||
end
|
||||
|
||||
|
||||
# create a user-joined session message
|
||||
def join_request(session_id, join_request_id, username, text)
|
||||
join_request = Jampb::JoinRequest.new(:join_request_id => join_request_id, :username => username, :text => text)
|
||||
|
|
|
|||
|
|
@ -223,13 +223,31 @@ module JamRuby
|
|||
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => connection.client_id})
|
||||
end
|
||||
|
||||
def send_musician_session_depart(music_session, connection, user)
|
||||
def send_musician_session_depart(music_session, client_id, user)
|
||||
|
||||
# (1) create notification
|
||||
msg = @@message_factory.musician_session_depart(music_session.id, user.id, user.name, user.photo_url)
|
||||
|
||||
# (2) send notification
|
||||
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => connection.client_id})
|
||||
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => client_id})
|
||||
end
|
||||
|
||||
def send_musician_session_fresh(music_session, client_id, user)
|
||||
|
||||
# (1) create notification
|
||||
msg = @@message_factory.musician_session_fresh(music_session.id, user.id, user.name, user.photo_url)
|
||||
|
||||
# (2) send notification
|
||||
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => client_id})
|
||||
end
|
||||
|
||||
def send_musician_session_stale(music_session, client_id, user)
|
||||
|
||||
# (1) create notification
|
||||
msg = @@message_factory.musician_session_stale(music_session.id, user.id, user.name, user.photo_url)
|
||||
|
||||
# (2) send notification
|
||||
@@mq_router.server_publish_to_session(music_session, msg, sender = {:client_id => client_id})
|
||||
end
|
||||
|
||||
def send_friend_session_join(db_conn, connection, user)
|
||||
|
|
|
|||
|
|
@ -62,8 +62,8 @@ describe ConnectionManager do
|
|||
|
||||
client_id = "client_id2"
|
||||
user_id = create_user("test", "user2", "user2@jamkazam.com")
|
||||
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
|
||||
count = @connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
count.should == 1
|
||||
# make sure the connection is seen
|
||||
|
||||
@conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result|
|
||||
|
|
@ -73,7 +73,8 @@ describe ConnectionManager do
|
|||
cc = Connection.find_by_client_id!(client_id)
|
||||
cc.connected?.should be_true
|
||||
|
||||
@connman.delete_connection(client_id)
|
||||
count = @connman.delete_connection(client_id)
|
||||
count.should == 0
|
||||
|
||||
@conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result|
|
||||
result.getvalue(0, 0).should == "0"
|
||||
|
|
@ -377,9 +378,9 @@ describe ConnectionManager do
|
|||
user = User.find(user_id)
|
||||
dummy_music_session = MusicSession.new
|
||||
|
||||
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
@connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
|
||||
expect { @connman.leave_music_session(user, connection, dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
end
|
||||
|
||||
it "leave_music_session fails if in different music_session" do
|
||||
|
|
@ -393,9 +394,9 @@ describe ConnectionManager do
|
|||
|
||||
dummy_music_session = MusicSession.new
|
||||
|
||||
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
@connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
@connman.join_music_session(user, client_id, music_session, true, TRACKS)
|
||||
expect { @connman.leave_music_session(user, connection, dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
expect { @connman.leave_music_session(user, Connection.find_by_client_id(client_id), dummy_music_session) }.to raise_error(JamRuby::StateError)
|
||||
end
|
||||
|
||||
it "leave_music_session works" do
|
||||
|
|
@ -407,7 +408,7 @@ describe ConnectionManager do
|
|||
user = User.find(user_id)
|
||||
music_session = MusicSession.find(music_session_id)
|
||||
|
||||
connection = @connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
@connman.create_connection(user_id, client_id, "1.1.1.1")
|
||||
@connman.join_music_session(user, client_id, music_session, true, TRACKS)
|
||||
|
||||
assert_session_exists(music_session_id, true)
|
||||
|
|
@ -416,7 +417,7 @@ describe ConnectionManager do
|
|||
result.getvalue(0, 0).should == music_session_id
|
||||
end
|
||||
|
||||
@connman.leave_music_session(user, connection, music_session)
|
||||
@connman.leave_music_session(user, Connection.find_by_client_id(client_id), music_session)
|
||||
|
||||
@conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
|
||||
result.getvalue(0, 0).should == nil
|
||||
|
|
|
|||
Loading…
Reference in New Issue