# jam-cloud/ruby/lib/jam_ruby/connection_manager.rb
#
# 562 lines
# 24 KiB
# Ruby

module JamRuby
# All writes to the connections table should go through the ConnectionManager.
# Reads can occur freely elsewhere, though.
# Because connections are tied to the websocket connection and we bookkeep them in the database purely
# for 'SQL convenience', this is an obvious place where we could move away from the database
# as an optimization if we find it creates too much db traffic.
# At a minimum, though, we could make connections an UNLOGGED table, because if the database crashes,
# all clients should reconnect and re-establish their connections anyway.
#
# All methods in here could also be refactored as stored procedures, if we stick with a database.
# This may make sense in the short term if we are still managing connections in the database but
# move the websocket gateway to node-js (because the websocket gateway needs to call some of these methods).
# Or, of course, we could just port the relevant methods to node-js.
#
# Also, we don't send notifications from ConnectionManager;
# we just return enough data so that a caller can determine whether it needs to.
class ConnectionManager < BaseManager
# Builds the manager: hands +options+ to the BaseManager constructor and
# sets up a logger keyed on this instance.
def initialize(options = {})
  super(options)
  @log = Logging.logger[self]
end
# Placeholder; no staleness bookkeeping is implemented here yet.
def update_staleness
  # TODO
end
##### TODO: refactored to notification.rb but left here for backwards compatibility w/ connection_manager_spec.rb
# Returns the friend_id values of every friendship row owned by +user_id+
# that is reciprocated (the friend also has a friendship row pointing back
# at +user_id+).
#
# connection - object responding to exec(sql, params) and yielding the result rows
# user_id    - id bound to $1 in the query
def gather_friends(connection, user_id)
  mutual_friends_sql = "SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)"
  ids = []
  connection.exec(mutual_friends_sql, [user_id]) do |rows|
    rows.each { |row| ids << row['friend_id'] }
  end
  ids
end
# Stands in for the ActiveRecord destroy callbacks of a music session:
# looks the ActiveMusicSession up by id and, when one is found, invokes
# its before_destroy hook. Does nothing when no record is found.
def before_destroy_music_session(music_session_id)
  session = ActiveMusicSession.find_by_id(music_session_id)
  return unless session
  session.before_destroy
end
# Reclaims the existing connection row for a client that has reconnected.
#
# conn                       - connection record being reclaimed; its ip_address and client_id are read
# channel_id                 - new channel_id for the row
# reconnect_music_session_id - session the client wants to resume, or nil
# ip_address                 - when non-nil and different from conn.ip_address, it is saved on the record
# connection_stale_time      - stored in stale_time
# connection_expire_time     - stored in expire_time
# udp_reachable              - true/false, or nil for "unknown" (the existing column value is kept)
# gateway                    - stored in the gateway column
#
# Returns [music_session_id, reconnected]. music_session_id is the value the
# UPDATE returned when exactly one row was updated (nil otherwise);
# reconnected is true only when a reconnect_music_session_id was given and it
# equals that returned value.
#
# All caller-supplied values are bound as query parameters instead of being
# interpolated into the SQL text.
def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable, gateway)
  music_session_id = nil

  if ip_address && !ip_address.eql?(conn.ip_address)
    # The ISP / geo-IP lookup that used to populate addr and locidispid is
    # disabled; only the raw address is recorded.
    conn.ip_address = ip_address
    conn.save!(validate: false)
  end

  # The row keeps its session only when it is still in the session the client
  # asked for. Comparing against a NULL parameter never matches, so a nil
  # reconnect_music_session_id clears music_session_id and joined_session_at.
  # COALESCE keeps the stored udp_reachable when the caller passes nil.
  sql = <<-SQL
    UPDATE connections
    SET channel_id = $1,
        aasm_state = $2,
        updated_at = NOW(),
        music_session_id = (CASE WHEN music_session_id = $3 THEN music_session_id ELSE NULL END),
        joined_session_at = (CASE WHEN music_session_id = $3 THEN NOW() ELSE NULL END),
        stale_time = $4,
        expire_time = $5,
        udp_reachable = COALESCE($6::boolean, udp_reachable),
        gateway = $7,
        is_network_testing = FALSE,
        metronome_open = FALSE
    WHERE client_id = $8
    RETURNING music_session_id
  SQL

  # channel_id / gateway go through to_s so a nil is still stored as an empty
  # string, as it was when these values were interpolated.
  params = [
    channel_id.to_s,
    Connection::CONNECT_STATE.to_s,
    reconnect_music_session_id,
    connection_stale_time,
    connection_expire_time,
    udp_reachable,
    gateway.to_s,
    conn.client_id
  ]

  self.pg_conn.exec(sql, params) do |result|
    music_session_id = result[0]['music_session_id'] if result.cmd_tuples == 1
  end

  # we tell the client they reconnected if they specified a reconnect music_session_id,
  # and if that is now the current value in the database
  reconnected = !reconnect_music_session_id.nil? && reconnect_music_session_id == music_session_id
  return music_session_id, reconnected
end
# Moves the connection for +client_id+ from the CONNECT state to the STALE
# state.
#
# Returns the music_session_id of the row when exactly one row was updated,
# otherwise nil.
#
# client_id is bound as a query parameter rather than interpolated into the
# SQL text.
def flag_connection_stale_with_client_id(client_id)
  music_session_id = nil
  sql = "UPDATE connections SET aasm_state = $1 WHERE client_id = $2 AND aasm_state = $3 RETURNING music_session_id"
  params = [Connection::STALE_STATE.to_s, client_id.to_s, Connection::CONNECT_STATE.to_s]
  self.pg_conn.exec(sql, params) do |result|
    # if we did update a client to stale, retrieve music_session_id
    music_session_id = result[0]['music_session_id'] if result.cmd_tuples == 1
  end
  music_session_id
end
# Flags as STALE every CONNECT-state connection on +gateway_name+ whose
# updated_at is older than its own stale_time (in seconds).
#
# gateway_name is bound as a query parameter, matching the other
# parameterized queries in this class.
# Returns whatever ConnectionManager.active_record_transaction returns.
def flag_stale_connections(gateway_name)
  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    sql = "UPDATE connections SET aasm_state = $1 WHERE updated_at < (NOW() - (interval '1 second' * stale_time)) AND aasm_state = $2 AND gateway = $3"
    conn.exec(sql, [Connection::STALE_STATE.to_s, Connection::CONNECT_STATE.to_s, gateway_name.to_s])
  end
end
# NOTE this is only used for testing purposes;
# actual deletes will be processed in the websocket context which cleans up dependencies
#
# Deletes, one by one, every connection reported by
# stale_connection_client_ids for +gateway_name+.
def expire_stale_connections(gateway_name)
  expired = stale_connection_client_ids(gateway_name)
  expired.each do |client|
    delete_connection(client[:client_id])
  end
end
# Returns the client_id of every connection row whose gateway column equals
# +gateway_name+ (an empty array when there are none).
#
# gateway_name is bound as a query parameter, matching the other
# parameterized queries in this class.
def connection_client_ids_for_gateway(gateway_name)
  clients = []
  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    conn.exec("SELECT client_id FROM connections WHERE gateway = $1", [gateway_name.to_s]) do |result|
      result.each { |row| clients << row['client_id'] }
    end
  end
  clients
end
# Lists the connections on +gateway_name+ that are due to be expired: rows
# whose updated_at is older than their own expire_time (in seconds).
# This method only selects; it does not delete anything, and the query does
# not filter on aasm_state.
#
# Returns an array of hashes with the keys :client_id, :music_session_id,
# :client_type and :user_id.
#
# gateway_name is bound as a query parameter, matching the other
# parameterized queries in this class.
def stale_connection_client_ids(gateway_name)
  clients = []
  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    sql = "SELECT client_id, music_session_id, user_id, client_type FROM connections WHERE updated_at < (NOW() - (interval '1 second' * expire_time)) AND gateway = $1"
    conn.exec(sql, [gateway_name.to_s]) do |result|
      result.each do |row|
        clients << {
          client_id: row['client_id'],
          music_session_id: row['music_session_id'],
          client_type: row['client_type'],
          user_id: row['user_id']
        }
      end
    end
  end
  clients
end
# Detaches connections from dangling sessions: every connection whose
# music_session_id points at an active_music_sessions row that has not been
# updated within the last 12 hours gets its music_session_id set to NULL.
# No connection rows are deleted.
def cleanup_dangling
  detach_sql = "update connections set music_session_id = null where id in (select id from connections where music_session_id in (select id from active_music_sessions where updated_at < (NOW() - '12 hours'::interval)))"
  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    # the result itself is not inspected; the (empty) block form is kept as-is
    conn.exec(detach_sql) { |_result| }
  end
end
# Inserts a new row into the connections table.
#
# Returns the number of connections that this user currently has across all
# clients (0 when user_id is nil). This number is used by notification logic
# elsewhere to know 'oh the user joined for the 1st time, so send a friend
# update', or 'don't bother because the user has connected somewhere else
# already'.
#
# client_type must be 'client' or 'browser'; anything else raises a
# RuntimeError before the database is touched.
# ip_address is converted with IPAddr, which raises for an unparsable address.
# If a block is given and user_id is set, it is called inside the
# transaction with (conn, count).
#
# The count is returned after the transaction block has finished rather than
# from inside it: a non-local `return` out of an ActiveRecord transaction
# block is version-dependent behaviour (it may commit or roll back).
def create_connection(user_id, client_id, channel_id, ip_address, client_type, connection_stale_time, connection_expire_time, udp_reachable, gateway, is_jamblaster, &blk)
  # validate client_type
  raise "invalid client_type: #{client_type}" unless %w(client browser).include?(client_type)

  count = 0
  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    # The ISP / geo-IP lookup that used to derive locidispid is disabled;
    # only the numeric form of the address is stored and locidispid is NULL.
    addr = IPAddr.new(ip_address).to_i
    locidispid = nil

    lock_connections(conn)

    conn.exec("INSERT INTO connections (user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, aasm_state, stale_time, expire_time, udp_reachable, gateway, is_jamblaster) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
              [user_id, client_id, channel_id, ip_address, client_type, addr, locidispid, Connection::CONNECT_STATE.to_s, connection_stale_time, connection_expire_time, udp_reachable, gateway, is_jamblaster]).clear

    if user_id
      # we just created a new connection; if this is the first time the user has shown up,
      # the caller needs to send out a message to his friends
      conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
        count = result.getvalue(0, 0).to_i
        # we're passing all this stuff so that the user record might be updated as well...
        blk.call(conn, count) unless blk.nil?
      end
    end
  end
  count
end
# Once a connection is known gone (whether timeout or because a TCP connection is observed lost),
# this code is responsible for all cleanup logic associated with a connection going away.
#
# Returns how many connections are left for this user; callers use this to
# decide whether to tell friends the user is offline (count == 0) or not
# (count > 0). Returns nil, after logging a warning, when no row exists for
# +client_id+.
#
# If a block is passed, on success it is called inside the transaction with
# (conn, count, music_session_id, user_id). music_session_id is nil when the
# connection was not in a session or when the final DELETE below removed the
# session.
#
# Raises Exception when more than one row is deleted for +client_id+.
#
# The method never exits the transaction block with a non-local `return`
# (version-dependent behaviour in ActiveRecord transactions); the not-found
# case leaves the block with `next` and the count is returned afterwards.
#
# NOTE(review): session_checks already deletes the session when it is empty,
# and the block further down repeats before_destroy_music_session plus a
# conditional DELETE; that second pass looks redundant. Left as-is, confirm
# before changing.
def delete_connection(client_id, &blk)
  count = nil
  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    user_id = nil
    music_session_id = nil
    deleted = false

    lock_connections(conn)
    previous_music_session_id = check_already_session(conn, client_id)

    conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
      if result.cmd_tuples == 0
        # the client is already gone from the database... do nothing but log error
        @log.warn("unable to delete client #{client_id}")
      elsif result.cmd_tuples == 1
        user_id = result[0]['user_id']
        music_session_id = result[0]['music_session_id']
        deleted = true
      else
        raise Exception, 'uniqueness constraint has been lost on client_id'
      end
    end
    next unless deleted

    session_checks(conn, previous_music_session_id, user_id)

    # since we did delete a row, check and see if any more connections for that user exist
    # if we are down to zero, send out user gone message
    conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
      count = result.getvalue(0, 0).to_i
    end

    # same for session: if we are down to the last participant, delete the session
    unless music_session_id.nil?
      before_destroy_music_session(music_session_id)
      result = conn.exec("DELETE FROM active_music_sessions WHERE id = $1 AND 0 = (select count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id])
      music_session_id = nil if result.cmd_tuples == 1
    end

    blk.call(conn, count, music_session_id, user_id) unless blk.nil?
  end
  count
end
# Looks up the music_session_id currently stored on the connection row for
# +client_id+.
#
# Returns the stored value when exactly one row matches, or nil (after a
# debug log) when no row matches.
# Raises Exception (after an error log) when more than one row matches.
def check_already_session(conn, client_id)
  conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
    case result.num_tuples
    when 1
      return result.getvalue(0, 0)
    when 0
      # there is no connection found matching this criteria; we are done.
      @log.debug("when checking for existing session, no connection found with client=#{client_id}")
      return nil
    else
      @log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}")
      raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}"
    end
  end
end
# Session housekeeping after +user_id+ has left +previous_music_session_id+.
# Does nothing when previous_music_session_id is nil.
#
# When no connection references the session any more, the session's
# before_destroy hook is run and its active_music_sessions row is deleted
# (raising Exception if more than one row was deleted). Otherwise every
# per-session resource this user initiated (backing track, metronome, claimed
# recording, jam track) is cleared and the session controller is re-evaluated.
def session_checks(conn, previous_music_session_id, user_id)
  return if previous_music_session_id.nil?

  # TODO: send notification to friends that this user left this session?
  @log.debug("user #{user_id} left music_session #{previous_music_session_id}")

  remaining = nil
  conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1", [previous_music_session_id]) do |result|
    remaining = result.getvalue(0, 0).to_i
  end

  if remaining == 0
    # nobody is left: destroy the music_session
    before_destroy_music_session(previous_music_session_id)
    conn.exec("DELETE from active_music_sessions WHERE id = $1", [previous_music_session_id]) do |result|
      removed_rows = result.cmd_tuples
      if removed_rows == 1
        # music session deleted!
        @log.debug("deleted music session #{previous_music_session_id}")
        JamRuby::MusicSession.removed_music_session(previous_music_session_id)
      elsif removed_rows > 1
        msg = "music_sessions table data integrity violation; multiple rows found with music_session_id=#{previous_music_session_id}"
        @log.error(msg)
        raise Exception, msg
      end
    end
  else
    # others remain: release whatever this user had started in the session
    # (the third statement ensures there is no active claimed recording if
    # the owner of that recording left the session)
    [
      "UPDATE active_music_sessions set backing_track_initiator_id = NULL, backing_track_path = NULL where backing_track_initiator_id = $1 and id = $2",
      "UPDATE active_music_sessions set metronome_initiator_id = NULL, metronome_active = FALSE where metronome_initiator_id = $1 and id = $2",
      "UPDATE active_music_sessions set claimed_recording_id = NULL, claimed_recording_initiator_id = NULL where claimed_recording_initiator_id = $1 and id = $2",
      "UPDATE active_music_sessions set jam_track_id = NULL, jam_track_initiator_id = NULL where jam_track_initiator_id = $1 and id = $2"
    ].each do |release_sql|
      conn.exec(release_sql, [user_id, previous_music_session_id])
    end
    update_session_controller(previous_music_session_id)
  end
end
# Re-evaluates who controls the session +music_session_id+.
#
# When the session has a session_controller_id but that controller is not
# among the active session's users, next_in_line is asked to pick a
# replacement and its result is returned; otherwise false is returned.
#
# kick_extras - when true, each user's subscription rules are read and
#               compared with the participant count, but no action is taken
#               yet (see the TODO in the loop).
def update_session_controller(music_session_id, kick_extras = false)
  active_session = ActiveMusicSession.find(music_session_id)
  return false unless active_session

  session_record = active_session.music_session
  controller_gone = session_record.session_controller_id &&
                    !active_session.users.exists?(session_record.session_controller.id)
  # find next in line, because the current 'session controller' is not part of the session
  changed = controller_gone ? next_in_line(session_record, active_session) : false

  if kick_extras
    head_count = active_session.users.count
    active_session.users.each do |member|
      rules = member.subscription_rules(false)
      if rules[:max_players] && rules[:max_players] < head_count
        # XXX TODO? Should we do this?
      end
    end
  end

  changed
end
# Determines who should be session controller after someone leaves, and
# stores the choice on +music_session+.
#
# Candidates are tried in this order; the first one that saves wins:
#   1. a session user who is a friend of the session creator
#   2. a session user who is among the invited musicians
#   3. the user of the connection that joined the session earliest
#
# Returns true when a new controller was saved (tick_track_changes is then
# called on +active_music_session+), false otherwise.
#
# Fixes relative to the previous version:
#   * candidates are the array intersection (&) of session users with
#     friends / invitees; boolean && ignored the friend / invite lists
#   * a successful pick returns true (a bare `return` yielded nil)
#   * the fallback stores the earliest connection's user_id; it used to
#     assign the connection object itself as the controller
def next_in_line(music_session, active_music_session)
  session_users = active_music_session.users.to_a

  # check friends 1st (candidates keep the order of session_users)
  session_friends = session_users & music_session.creator.friends.to_a
  unless session_friends.empty?
    music_session.session_controller = session_friends[0]
    if music_session.save
      active_music_session.tick_track_changes
      return true
    end
  end

  # check invited 2nd
  invited = session_users & music_session.invited_musicians.to_a
  unless invited.empty?
    music_session.session_controller = invited[0]
    if music_session.save
      active_music_session.tick_track_changes
      return true
    end
  end

  # go by who joined earliest
  earliest = active_music_session.connections.order(:joined_session_at).first
  if earliest
    music_session.session_controller_id = earliest.user_id
    if music_session.save
      active_music_session.tick_track_changes
      return true
    end
  end

  false
end
# Puts the connection identified by +client_id+ into +music_session+ on
# behalf of +user+.
#
# Raises JamRecordNotFound when no connection exists for client_id, and
# JamPermissionError when the connection has no user or belongs to a
# different user. When the connection ends up with validation errors the
# transaction is rolled back via ActiveRecord::Rollback.
#
# Sends a tracks-changed notification after the transaction when
# update_session_controller reported a change.
# Returns the connection that was looked up.
def join_music_session(user, client_id, music_session, as_musician, tracks, audio_latency, client_role = nil, parent_client_id = nil, video_sources = nil)
  joined = nil
  notify = false

  ConnectionManager.active_record_transaction do |_connection_manager, _db_conn|
    joined = Connection.find_by_client_id(client_id)

    raise JamRecordNotFound.new("Unable to find connection by client_id #{client_id}", 'Connection') if joined.nil?
    raise JamPermissionError, "no user_id associated with connection #{client_id}" if joined.user_id.nil?
    raise JamPermissionError, "wrong user_id associated with connection #{client_id}" if joined.user_id != user.id

    joined.join_the_session(music_session, as_musician, tracks, user, audio_latency, client_role, parent_client_id, video_sources)
    JamRuby::MusicSessionUserHistory.join_music_session(user.id, music_session.id, client_id)

    raise ActiveRecord::Rollback if joined.errors.any?

    # second argument is kick_extras
    notify = update_session_controller(music_session.id, true)
  end

  Notification.send_tracks_changed(music_session.active_music_session) if notify
  joined
end
# Removes +connection+ from +music_session+ on behalf of +user+.
#
# if a blk is passed in, upon success, it will be called and you can issue notifications
# within the connection table lock
#
# Raises StateError when the client is in no session or in a different
# session than +music_session+, ActiveRecord::RecordNotFound when the UPDATE
# matches no row, and Exception when it matches more than one.
# After the transaction, a tracks-changed notification is sent when
# update_session_controller reported a change.
def leave_music_session(user, connection, music_session, &blk)
  notify = false

  ConnectionManager.active_record_transaction do |_connection_manager, conn|
    lock_connections(conn)

    music_session_id = music_session.id
    user_id = user.id
    client_id = connection.client_id

    current_session_id = check_already_session(conn, client_id)
    if current_session_id.nil?
      @log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
      raise StateError, "not in session"
    end
    if current_session_id != music_session_id
      @log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
      raise StateError, "in a session different than that specified"
    end

    # can throw exception if the session is deleted just before this
    conn.exec("UPDATE connections SET music_session_id = NULL, joined_session_at = NULL, as_musician = NULL WHERE client_id = $1 AND user_id = $2 AND music_session_id = $3", [client_id, user_id, music_session_id]) do |result|
      case result.cmd_tuples
      when 1
        @log.debug("disassociated music_session with connection for client_id=#{client_id}, user_id=#{user_id}")
        notify = update_session_controller(music_session.id)
        JamRuby::MusicSessionUserHistory.removed_music_session(user_id, music_session_id)
        session_checks(conn, current_session_id, user_id)
        blk.call() unless blk.nil?
      when 0
        @log.debug "leave_music_session no connection found with client_id=#{client_id}"
        raise ActiveRecord::RecordNotFound
      else
        @log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)")
        raise Exception, "locked table changed state"
      end
    end
  end

  Notification.send_tracks_changed(music_session.active_music_session) if notify
end
# Hook called before writes to the connections table. Currently a no-op:
# the table-level lock below is commented out.
def lock_connections(conn)
  # if APP_CONFIG.lock_connections
  #   conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
  # end
  nil
end
# def associate_tracks(connection, tracks)
# @log.debug "Tracks:"
# @log.debug tracks
# connection.tracks.clear()
#
# unless tracks.nil?
# tracks.each do |track|
# instrument = Instrument.find(track["instrument_id"])
# t = Track.new
# t.instrument = instrument
# t.connection = connection
# t.sound = track["sound"]
# t.client_track_id = track["client_track_id"]
# t.save
# connection.tracks << t
# end
# end
# end
end
end