259 lines
11 KiB
Ruby
259 lines
11 KiB
Ruby
module JamRuby
|
|
# All writes should occur through the ConnectionManager
|
|
# Reads can occur freely elsewhere, though
|
|
# Because connections are tied to the websocket-connection and we bookkeep them in the database purely
|
|
# for 'SQL convenience', this is a obvious place we can go away from a database
|
|
# as an optimization if we find it's too much db traffic created'
|
|
# At a minimum, though, we could make connections an UNLOGGED table because if the database crashes,
|
|
# all clients should reconnect and restablish their connection anyway
|
|
#
|
|
# All methods in here could also be refactored as stored procedures, if we stick with a database.
|
|
# This may make sense in the short term if we are still managing connections in the database, but
|
|
# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods).
|
|
# Or of course we could just port the relevant methods to node-js
|
|
class ConnectionManager
|
|
|
|
attr_accessor :mq_router, :pg_conn
|
|
|
|
def initialize(options={})
|
|
@log = Logging.logger[self]
|
|
@mq_router = MQRouter.new
|
|
@pg_conn = options[:conn]
|
|
@message_factory = MessageFactory.new
|
|
|
|
unless PG.threadsafe?
|
|
raise Exception, "a non-threadsafe build of libpq is present."
|
|
end
|
|
end
|
|
|
|
def update_staleness()
|
|
#TODO
|
|
end
|
|
|
|
# remove stale connections
|
|
def remove_stale_connections(max_seconds)
|
|
stale_clients = []
|
|
@pg_conn.exec("SELECT client_id FROM connections WHERE updated_at < (NOW() - interval '#{max_seconds} second')") do |result|
|
|
result.each do |row|
|
|
stale_clients.push(row['client_id'])
|
|
end
|
|
end
|
|
|
|
@log.debug("deleting #{stale_clients.length} stale connections")
|
|
|
|
stale_clients.each do |client_id|
|
|
delete_connection(client_id)
|
|
end
|
|
end
|
|
|
|
|
|
def create_connection(user_id, client_id, ip_address)
|
|
conn = @pg_conn
|
|
|
|
lock_connections(conn)
|
|
|
|
conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear
|
|
|
|
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
|
|
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
|
|
count = result.getvalue(0, 0)
|
|
if count == "1"
|
|
# get all friend user_ids using the same query rails does for @user.friends
|
|
friend_update = @message_factory.friend_update(user_id, true)
|
|
friend_ids = gather_friends(conn, user_id)
|
|
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
|
|
# this code is responsible for all cleanup logic associated with a connection going away
|
|
def delete_connection(client_id)
|
|
|
|
user_id = nil
|
|
music_session_id = nil
|
|
|
|
conn = @pg_conn
|
|
|
|
lock_connections(conn)
|
|
|
|
previous_music_session_id = check_already_session(conn, client_id)
|
|
|
|
conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
|
|
|
|
if result.cmd_tuples == 0
|
|
# the client is already gone from the database... do nothing but log error
|
|
@log.warn("unable to delete client #{client_id}")
|
|
return
|
|
elsif result.cmd_tuples == 1
|
|
user_id = result[0]['user_id']
|
|
music_session_id = result[0]['client_id']
|
|
|
|
else
|
|
raise Exception, 'uniqueness constraint has been lost on client_id'
|
|
end
|
|
end
|
|
|
|
session_checks(conn, previous_music_session_id, user_id)
|
|
|
|
# since we did delete a row, check and see if any more connections for that user exist
|
|
# if we are down to zero, send out user gone message
|
|
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
|
|
count = result.getvalue(0, 0)
|
|
if count == "0"
|
|
friend_update = @message_factory.friend_update(user_id, false)
|
|
friend_ids = gather_friends(conn, user_id)
|
|
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
|
|
end
|
|
end
|
|
|
|
# same for session-if we are down to the last participant, delete the session
|
|
unless music_session_id.nil?
|
|
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
|
|
end
|
|
end
|
|
|
|
def destroy_if_empty(conn, music_session_id)
|
|
|
|
num_participants = nil
|
|
conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1", [music_session_id]) do |result|
|
|
num_participants = result.getvalue(0, 0).to_i
|
|
end
|
|
|
|
if num_participants == 0
|
|
# delete the music_session
|
|
conn.exec("DELETE from music_sessions WHERE id = $1", [music_session_id]) do |result|
|
|
if result.cmd_tuples == 0
|
|
# no music session deleted. do nothing
|
|
elsif result.cmd_tuples == 1
|
|
# music session deleted!
|
|
@log.debug("deleted music session #{music_session_id}")
|
|
else
|
|
@log.error("music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}")
|
|
raise Exception, "music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}"
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
def check_already_session(conn, client_id)
|
|
conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
|
|
if result.num_tuples == 1
|
|
previous_music_session_id = result.getvalue(0, 0)
|
|
return previous_music_session_id
|
|
elsif result.num_tuples == 0
|
|
# there is no connection found matching this criteria; we are done.
|
|
@log.debug("when checking for existing session, no connection found with client=#{client_id}")
|
|
return nil
|
|
else
|
|
@log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}")
|
|
raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}"
|
|
end
|
|
end
|
|
end
|
|
|
|
def session_checks(conn, previous_music_session_id, user_id)
|
|
unless previous_music_session_id.nil?
|
|
# TODO: send notification to friends that this user left this session?
|
|
@log.debug("user #{user_id} left music_session #{previous_music_session_id}")
|
|
destroy_if_empty(conn, previous_music_session_id)
|
|
end
|
|
end
|
|
|
|
def join_music_session(user_id, client_id, music_session_id)
|
|
conn = @pg_conn
|
|
|
|
lock_connections(conn)
|
|
|
|
previous_music_session_id = check_already_session(conn, client_id)
|
|
|
|
begin
|
|
conn.exec("UPDATE connections SET music_session_id = $1 WHERE client_id = $2", [music_session_id, client_id]) do |result|
|
|
if result.cmd_tuples == 1
|
|
@log.debug "associated music_session with connection for client=#{client_id} and music_session=#{music_session_id}"
|
|
|
|
session_checks(conn, previous_music_session_id, user_id)
|
|
elsif result.cmd_tuples == 0
|
|
@log.debug "join_music_session no connection found with client_id=#{client_id}"
|
|
raise StateError, "no connection found with client=#{client_id}"
|
|
else
|
|
@log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)"
|
|
raise Exception, "locked table changed state"
|
|
end
|
|
end
|
|
rescue PG::Error => pg_error
|
|
if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"")
|
|
# if there is no music session that exists, we will receive this message
|
|
@log.debug "music_session does not exist. music_session=#{music_session_id}"
|
|
raise StateError, "music_session does not exist"
|
|
else
|
|
raise pg_error
|
|
end
|
|
|
|
end
|
|
# end
|
|
end
|
|
|
|
def leave_music_session(user_id, client_id, music_session_id)
|
|
conn = @pg_conn
|
|
|
|
lock_connections(conn)
|
|
|
|
previous_music_session_id = check_already_session(conn, client_id)
|
|
|
|
if previous_music_session_id == nil
|
|
@log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
|
|
raise StateError, "not in session"
|
|
elsif previous_music_session_id != music_session_id
|
|
@log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
|
|
raise StateError, "in a session different than that specified"
|
|
end
|
|
|
|
# can throw exception if the session is deleted just before this
|
|
conn.exec("UPDATE connections SET music_session_id = NULL WHERE client_id = $1", [client_id]) do |result|
|
|
if result.cmd_tuples == 1
|
|
@log.debug("deassociated music_session with connection for client_id #{client_id}")
|
|
|
|
session_checks(conn, previous_music_session_id, user_id)
|
|
elsif result.cmd_tuples == 0
|
|
@log.debug "leave_music_session no connection found with client_id=#{client_id}"
|
|
raise StateError, "no connection found with client=#{client_id}"
|
|
else
|
|
@log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)")
|
|
raise Exception, "locked table changed state"
|
|
end
|
|
end
|
|
end
|
|
|
|
def lock_connections(conn)
|
|
conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
|
|
end
|
|
|
|
def gather_friends(conn, user_id)
|
|
friend_ids = []
|
|
conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
|
|
friend_results.each do |friend_result|
|
|
friend_ids.push(friend_result['friend_id'])
|
|
end
|
|
end
|
|
return friend_ids
|
|
end
|
|
|
|
# Creates a connection manager, and associates the connection created by active_record with ourselves
|
|
def self.active_record_transaction(&block)
|
|
|
|
connection_manager = ConnectionManager.new
|
|
ActiveRecord::Base.connection_pool.with_connection do |connection|
|
|
# create a transaction, and pass the current connection to ConnectionManager.
|
|
# this lets the entire operation work with the same transaction,
|
|
# across Rails ActiveRecord and the pg-gem based code in ConnectionManager.
|
|
connection_manager.pg_conn = connection.instance_variable_get("@connection")
|
|
|
|
connection.transaction do
|
|
block.call(connection_manager)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end |