module JamRuby
  # All writes to the connections table should occur through the ConnectionManager.
  # Reads can occur freely elsewhere, though.
  #
  # Because connections are tied to the websocket connection and we bookkeep them in the
  # database purely for 'SQL convenience', this is an obvious place where we could move away
  # from a database as an optimization if we find it creates too much db traffic.
  # At a minimum, though, we could make connections an UNLOGGED table, because if the database
  # crashes, all clients should reconnect and re-establish their connection anyway.
  #
  # All methods in here could also be refactored as stored procedures, if we stick with a database.
  # This may make sense in the short term if we are still managing connections in the database, but
  # we move to node-js in the websocket gateway (because the websocket gateway needs to call some
  # of these methods). Or of course we could just port the relevant methods to node-js.
  #
  # Also, we don't send notifications from ConnectionManager;
  # we just return enough data so that a caller can make the determination if it needs to.
  class ConnectionManager < BaseManager

    # @param options [Hash] passed straight through to BaseManager#initialize
    def initialize(options={})
      super(options)
      @log = Logging.logger[self]
    end

    # Placeholder; intentionally does nothing yet.
    def update_staleness()
      #TODO
    end

    ##### TODO: refactored to notification.rb but left here for backwards compatibility w/ connection_manager_spec.rb
    #
    # Collects the ids of mutual friends of +user_id+: rows in friendships where the
    # user lists the friend AND the friend lists the user.
    #
    # @param connection an object responding to exec(sql, params) { |result| ... } (a pg connection)
    # @param user_id the user whose friends are gathered
    # @return [Array] the 'friend_id' column of every matching row
    def gather_friends(connection, user_id)
      friend_ids = []
      connection.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
        friend_results.each do |friend_result|
          friend_ids.push(friend_result['friend_id'])
        end
      end
      friend_ids
    end

    # This simulates music_session destroy callbacks with ActiveRecord: the rows are
    # deleted with raw SQL elsewhere in this class, so the callback is invoked by hand.
    # Does nothing when no ActiveMusicSession with that id exists.
    def before_destroy_music_session(music_session_id)
      music_session = ActiveMusicSession.find_by_id(music_session_id)
      music_session.before_destroy if music_session
    end

    # Reclaim the existing connection; if ip_address is not nil then perhaps a new address as well.
    #
    # FIXME: the source text of this method is truncated (see the FIXME at the bottom of the
    # body). Only the visible prefix is reproduced here; the parameters connection_stale_time,
    # connection_expire_time and gateway are not used by the visible part.
    #
    # @param conn the Connection record being reclaimed (ip_address is read and written on it)
    # @param channel_id not used by the visible part of the method
    # @param reconnect_music_session_id the session the client believes it is in, or nil
    # @param ip_address the client's current address, or nil if unknown
    # @param udp_reachable true/false, or nil when unknown
    def reconnect(conn, channel_id, reconnect_music_session_id, ip_address, connection_stale_time, connection_expire_time, udp_reachable, gateway)
      music_session_id = nil
      reconnected = false

      # we will reconnect the same music_session that the connection was previously in,
      # if it matches the same value currently in the database for music_session_id
      music_session_id_expression = 'NULL'
      joined_session_at_expression = 'NULL'
      unless reconnect_music_session_id.nil?
        # NOTE(review): reconnect_music_session_id is interpolated directly into SQL text.
        # If it can carry client-supplied data it should be bound as a parameter; that cannot
        # be done here because the statement that consumes these expressions is missing.
        music_session_id_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN music_session_id ELSE NULL END)"
        joined_session_at_expression = "(CASE WHEN music_session_id='#{reconnect_music_session_id}' THEN NOW() ELSE NULL END)"
      end

      if ip_address && !ip_address.eql?(conn.ip_address)
        # turn ip_address string into a number, then fetch the isp and block records and update location info

        #addr = JamIsp.ip_to_num(ip_address)
        #puts("============= JamIsp.ip_to_num returns #{addr} for #{ip_address} =============")

        #isp = JamIsp.lookup(addr)
        #puts("============= JamIsp.lookup returns #{isp.inspect} for #{addr} =============")
        #if isp.nil? then
        #  ispid = 0
        #else
        #  ispid = isp.coid
        #end

        #block = GeoIpBlocks.lookup(addr)
        #puts("============= GeoIpBlocks.lookup returns #{block.inspect} for #{addr} =============")
        #if block.nil? then
        #  locid = 0
        #else
        #  locid = block.locid
        #end

        #location = GeoIpLocations.find_by_locid(locid)
        #if location.nil? || isp.nil? || block.nil?
        #  locidispid = nil
        #else
        #  locidispid = locid*1000000+ispid
        #end

        conn.ip_address = ip_address
        #conn.addr = addr
        #conn.locidispid = locidispid
        conn.save!(validate: false)
      end

      # if udp_reachable is nil, it means it's unknown. Since this is a reconnect, we'll preserve the existing value in this case
      # (the literal column name 'udp_reachable' is substituted); otherwise, pass in the value of the boolean udp_reachable var
      udp_reachable_value = udp_reachable.nil? ? 'udp_reachable' : udp_reachable

      # FIXME: SOURCE IS TRUNCATED HERE. The original text at this point reads, verbatim:
      #
      #     sql =< 0)
      #
      # Everything between `sql =<` and ` 0)` is missing: the SQL heredoc, the remainder of
      # this method, and whatever definitions followed it up to the comment that precedes
      # delete_connection. Restore the missing text from version control; it has deliberately
      # NOT been reconstructed by guesswork. Until then, fail loudly instead of silently
      # skipping the connection update.
      raise NotImplementedError, 'ConnectionManager#reconnect: remainder of method is missing from source; restore from version control'
    end

    # Deletes the connection row for +client_id+ and performs the session cleanup that
    # follows from it.
    #
    # If a blk is passed in, on success it is called with (conn, count, music_session_id, user_id),
    # allowing notifications to go out within the same transaction. music_session_id is the session
    # the deleted connection was in, or nil if it was in none or that session row was deleted here.
    #
    # @param client_id the client whose connection row is removed
    # @return [Integer, nil] the number of connections the user still has, or nil when no row
    #   matched client_id (the early `return` below)
    # @raise [Exception] if more than one row was deleted for client_id
    def delete_connection(client_id, &blk)
      ConnectionManager.active_record_transaction do |connection_manager|
        conn = connection_manager.pg_conn
        count = 0
        user_id = nil
        music_session_id = nil

        lock_connections(conn)

        # must be read before the DELETE, which removes the row that holds it
        previous_music_session_id = check_already_session(conn, client_id)

        conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
          if result.cmd_tuples == 0
            # the client is already gone from the database... do nothing but log error
            @log.warn("unable to delete client #{client_id}")
            return
          elsif result.cmd_tuples == 1
            user_id = result[0]['user_id']
            music_session_id = result[0]['music_session_id']
          else
            raise Exception, 'uniqueness constraint has been lost on client_id'
          end
        end

        session_checks(conn, previous_music_session_id, user_id)

        # since we did delete a row, check and see if any more connections for that user exist
        # if we are down to zero, send out user gone message
        conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
          count = result.getvalue(0, 0).to_i
        end

        # same for session-if we are down to the last participant, delete the session
        unless music_session_id.nil?
          # NOTE(review): before_destroy runs here even when the DELETE below matches nothing
          # because other participants remain - confirm that is intended.
          before_destroy_music_session(music_session_id)
          result = conn.exec("DELETE FROM active_music_sessions WHERE id = $1 AND 0 = (select count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id])
          if result.cmd_tuples == 1
            # the session no longer exists, so don't report it to the caller
            music_session_id = nil
          end
        end

        blk.call(conn, count, music_session_id, user_id) unless blk.nil?
        return count
      end
    end

    # Looks up which music session, if any, the connection for +client_id+ is currently in.
    #
    # @return the music_session_id column of the single matching row (nil when the column is
    #   NULL), or nil when no connection row exists
    # @raise [Exception] if more than one row matches client_id
    def check_already_session(conn, client_id)
      conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
        if result.num_tuples == 1
          previous_music_session_id = result.getvalue(0, 0)
          return previous_music_session_id
        elsif result.num_tuples == 0
          # there is no connection found matching this criteria; we are done.
          @log.debug("when checking for existing session, no connection found with client=#{client_id}")
          return nil
        else
          @log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}")
          raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}"
        end
      end
    end

    # Cleanup after +user_id+ has left +previous_music_session_id+. No-op when the id is nil.
    #
    # When no connections remain in the session, the active_music_sessions row is deleted.
    # Otherwise any backing track, metronome, claimed recording or jam track that the
    # departing user initiated is cleared, and the session controller is re-evaluated.
    #
    # @raise [Exception] if the DELETE removes more than one active_music_sessions row
    def session_checks(conn, previous_music_session_id, user_id)
      unless previous_music_session_id.nil?
        # TODO: send notification to friends that this user left this session?
        @log.debug("user #{user_id} left music_session #{previous_music_session_id}")

        # destroy the music_session if it's empty
        num_participants = nil
        conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1", [previous_music_session_id]) do |result|
          num_participants = result.getvalue(0, 0).to_i
        end

        if num_participants == 0
          # delete the music_session
          before_destroy_music_session(previous_music_session_id)

          conn.exec("DELETE from active_music_sessions WHERE id = $1", [previous_music_session_id]) do |result|
            if result.cmd_tuples == 1
              # music session deleted!
              @log.debug("deleted music session #{previous_music_session_id}")
              JamRuby::MusicSession.removed_music_session(previous_music_session_id)
            elsif 1 < result.cmd_tuples
              msg = "music_sessions table data integrity violation; multiple rows found with music_session_id=#{previous_music_session_id}"
              @log.error(msg)
              raise Exception, msg
            end
          end
        else
          conn.exec("UPDATE active_music_sessions set backing_track_initiator_id = NULL, backing_track_path = NULL where backing_track_initiator_id = $1 and id = $2", [user_id, previous_music_session_id])

          conn.exec("UPDATE active_music_sessions set metronome_initiator_id = NULL, metronome_active = FALSE where metronome_initiator_id = $1 and id = $2", [user_id, previous_music_session_id])

          #ensure that there is no active claimed recording if the owner of that recording left the session
          conn.exec("UPDATE active_music_sessions set claimed_recording_id = NULL, claimed_recording_initiator_id = NULL where claimed_recording_initiator_id = $1 and id = $2", [user_id, previous_music_session_id])

          conn.exec("UPDATE active_music_sessions set jam_track_id = NULL, jam_track_initiator_id = NULL where jam_track_initiator_id = $1 and id = $2", [user_id, previous_music_session_id])

          update_session_controller(previous_music_session_id)
        end
      end
    end

    # Re-assigns the session controller when the current one is no longer among the
    # session's users.
    #
    # @return the result of next_in_line when a re-assignment was attempted, false otherwise
    def update_session_controller(music_session_id)
      tracks_changed = false
      # NOTE(review): ActiveRecord's `find` raises when the id is missing rather than returning
      # nil, so the guard below presumably never sees nil - confirm whether find_by_id was meant.
      active_music_session = ActiveMusicSession.find(music_session_id)

      if active_music_session
        music_session = active_music_session.music_session
        if music_session.session_controller_id && !active_music_session.users.exists?(music_session.session_controller.id)
          # find next in line, because the current 'session controller' is not part of the session
          tracks_changed = next_in_line(music_session, active_music_session)
        end
      end
      tracks_changed
    end

    # Determine who should be session controller after someone leaves.
    #
    # Candidates are tried in priority order: friends of the session creator who are in the
    # session, then invited musicians who are in the session, then whoever joined earliest.
    # The first candidate tier whose assignment saves successfully wins.
    #
    # @return [Boolean] true when a new controller was saved (and tick_track_changes was
    #   called on the active session), false otherwise
    def next_in_line(music_session, active_music_session)
      session_users = active_music_session.users.to_a

      # check friends 1st
      # (set intersection; a boolean `&&` here would just yield session_users unfiltered)
      session_friends = music_session.creator.friends.to_a & session_users
      unless session_friends.empty?
        music_session.session_controller = session_friends.first
        if music_session.save
          active_music_session.tick_track_changes
          # must return true, not a bare `return` (nil): callers use this to decide
          # whether to send the tracks-changed notification
          return true
        end
      end

      # check invited 2nd
      invited = music_session.invited_musicians.to_a & session_users
      unless invited.empty?
        music_session.session_controller = invited.first
        if music_session.save
          active_music_session.tick_track_changes
          return true
        end
      end

      # go by who joined earliest
      earliest = active_music_session.connections.order(:joined_session_at).first
      if earliest
        # NOTE(review): `earliest` comes from `connections`, while the other two tiers assign an
        # element of `users` - confirm whether this should be the connection's user instead.
        music_session.session_controller = earliest
        if music_session.save
          active_music_session.tick_track_changes
          return true
        end
      end

      false
    end

    # Puts the connection identified by +client_id+ into +music_session+ on behalf of +user+.
    #
    # The transaction is rolled back (ActiveRecord::Rollback) when the connection ends up with
    # validation errors; the connection is still returned so the caller can inspect them.
    # Sends a tracks-changed notification when the session controller was re-assigned.
    #
    # @return the Connection record
    # @raise [JamRecordNotFound] when no connection exists for client_id
    # @raise [JamPermissionError] when the connection has no user or belongs to another user
    def join_music_session(user, client_id, music_session, as_musician, tracks, audio_latency, client_role = nil, parent_client_id = nil, video_sources=nil)
      connection = nil
      tracks_changed = false

      ConnectionManager.active_record_transaction do |connection_manager|
        db_conn = connection_manager.pg_conn

        connection = Connection.find_by_client_id(client_id)

        if connection.nil?
          raise JamRecordNotFound.new("Unable to find connection by client_id #{client_id}", 'Connection')
        elsif connection.user_id.nil?
          raise JamPermissionError, "no user_id associated with connection #{client_id}"
        elsif connection.user_id != user.id
          raise JamPermissionError, "wrong user_id associated with connection #{client_id}"
        end

        connection.join_the_session(music_session, as_musician, tracks, user, audio_latency, client_role, parent_client_id, video_sources)
        JamRuby::MusicSessionUserHistory.join_music_session(user.id, music_session.id, client_id)
        # connection.music_session_id = music_session.id
        # connection.as_musician = as_musician
        # connection.joining_session = true
        # connection.joined_session_at = Time.now
        # associate_tracks(connection, tracks)
        # connection.save

        if connection.errors.any?
          raise ActiveRecord::Rollback
        else
          tracks_changed = update_session_controller(music_session.id)
        end
      end

      if tracks_changed
        Notification.send_tracks_changed(music_session.active_music_session)
      end

      connection
    end

    # Removes +connection+ from +music_session+.
    #
    # If a blk is passed in, upon success it will be called (with no arguments) and you can
    # issue notifications within the connection table lock.
    #
    # @raise [StateError] when the client is in no session, or in a different session
    # @raise [ActiveRecord::RecordNotFound] when the UPDATE matches no row
    # @raise [Exception] when the UPDATE matches more than one row
    def leave_music_session(user, connection, music_session, &blk)
      send_tracks_changed = false

      ConnectionManager.active_record_transaction do |connection_manager|
        conn = connection_manager.pg_conn

        lock_connections(conn)

        music_session_id = music_session.id
        user_id = user.id
        client_id = connection.client_id

        previous_music_session_id = check_already_session(conn, client_id)

        if previous_music_session_id.nil?
          @log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
          raise StateError, "not in session"
        elsif previous_music_session_id != music_session_id
          @log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
          raise StateError, "in a session different than that specified"
        end

        # can throw exception if the session is deleted just before this
        conn.exec("UPDATE connections SET music_session_id = NULL, joined_session_at = NULL, as_musician = NULL WHERE client_id = $1 AND user_id = $2 AND music_session_id = $3", [client_id, user_id, music_session_id]) do |result|
          if result.cmd_tuples == 1
            @log.debug("disassociated music_session with connection for client_id=#{client_id}, user_id=#{user_id}")

            send_tracks_changed = update_session_controller(music_session.id)

            JamRuby::MusicSessionUserHistory.removed_music_session(user_id, music_session_id)
            session_checks(conn, previous_music_session_id, user_id)
            blk.call() unless blk.nil?

          elsif result.cmd_tuples == 0
            @log.debug "leave_music_session no connection found with client_id=#{client_id}"
            raise ActiveRecord::RecordNotFound
          else
            @log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)")
            raise Exception, "locked table changed state"
          end
        end
      end

      if send_tracks_changed
        Notification.send_tracks_changed(music_session.active_music_session)
      end
    end

    # Currently a no-op: the table lock below is commented out.
    def lock_connections(conn)
      #if APP_CONFIG.lock_connections
      #  conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
      #end
    end

    # def associate_tracks(connection, tracks)
    #   @log.debug "Tracks:"
    #   @log.debug tracks
    #   connection.tracks.clear()
    #
    #   unless tracks.nil?
    #     tracks.each do |track|
    #       instrument = Instrument.find(track["instrument_id"])
    #       t = Track.new
    #       t.instrument = instrument
    #       t.connection = connection
    #       t.sound = track["sound"]
    #       t.client_track_id = track["client_track_id"]
    #       t.save
    #       connection.tracks << t
    #     end
    #   end
    # end
  end
end