From 3e269f2551d4d7aed965a1c07d0d4d8af7f05f0a Mon Sep 17 00:00:00 2001 From: Seth Call Date: Sun, 21 Oct 2012 10:05:06 -0500 Subject: [PATCH] * connection manager complete. now will integrate into jam-web and websocket-gateway --- Gemfile | 2 +- lib/jam_ruby.rb | 2 + lib/jam_ruby/connection_manager.rb | 276 +++++++++++++++++------ lib/jam_ruby/errors/permission_error.rb | 4 +- spec/jam_ruby/connection_manager_spec.rb | 138 +++++++++++- 5 files changed, 350 insertions(+), 72 deletions(-) diff --git a/Gemfile b/Gemfile index 3c442ecfe..e69e2ea75 100644 --- a/Gemfile +++ b/Gemfile @@ -13,7 +13,6 @@ gem 'bcrypt-ruby', '3.0.1' gem 'ruby-protocol-buffers', '1.2.2' gem 'eventmachine' gem 'amqp' -gem 'pry' group :test do gem 'jam_db', :path=> "#{workspace}/jam-db/target/ruby_package" @@ -21,6 +20,7 @@ group :test do gem "rspec", "2.10.0" gem 'spork', '0.9.0' gem 'database_cleaner', '0.7.0' + gem 'pry' end # Specify your gem's dependencies in jam_ruby.gemspec diff --git a/lib/jam_ruby.rb b/lib/jam_ruby.rb index bd1056063..669f23788 100644 --- a/lib/jam_ruby.rb +++ b/lib/jam_ruby.rb @@ -3,6 +3,8 @@ require "active_record" require "jampb" require "uuidtools" require "logging" +require "jam_ruby/errors/permission_error" +require "jam_ruby/errors/state_error" require "jam_ruby/mq_router" require "jam_ruby/connection_manager" require "jam_ruby/version" diff --git a/lib/jam_ruby/connection_manager.rb b/lib/jam_ruby/connection_manager.rb index a9c47f8de..c0b02fcb6 100644 --- a/lib/jam_ruby/connection_manager.rb +++ b/lib/jam_ruby/connection_manager.rb @@ -1,103 +1,243 @@ -class ConnectionManager +module JamRuby +# All writes should occur through the ConnectionManager +# Reads can occur freely elsewhere, though +# Because connections are tied to the websocket-connection and we bookkeep them in the database purely +# for 'SQL convenience', this is a obvious place we can go away from a database +# as an optimization if we find it's too much db traffic created' +# At a minimum, though, we could make connections an UNLOGGED table because if the database crashes, +# all clients should reconnect and restablish their connection anyway +# +# All methods in here could also be refactored as stored procedures, if we stick with a database. +# This may make sense in the short term if we are still managing connections in the database, but +# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods). +# Or of course we could just port the relevant methods to node-js + class ConnectionManager - attr_accessor :mq_router + attr_accessor :mq_router - def initialize(conn) - @log = Logging.logger[self] - @mq_router = MQRouter.new - @pg_conn = conn - @message_factory = MessageFactory.new + def initialize(conn) + @log = Logging.logger[self] + @mq_router = MQRouter.new + @pg_conn = conn + @message_factory = MessageFactory.new - - unless PG.threadsafe? - raise Exception "a non-threadsafe build of libpq is present." + unless PG.threadsafe? + raise Exception, "a non-threadsafe build of libpq is present." + end end - end - - def remove_stale_connections() - @pg_conn.transaction do |conn| + def update_staleness() + #TODO end - end - def create_connection(user_id, client_id, ip_address) - @pg_conn.transaction do |conn| + # remove stale connections + def remove_stale_connections(max_seconds) + stale_clients = [] + @pg_conn.exec("SELECT client_id FROM connections WHERE updated_at < (NOW() - interval '#{max_seconds} second')") do |result| + result.each do |row| + stale_clients.push(row['client_id']) + end + end - lock_connections(conn) + stale_clients.each do |client_id| + delete_connection(client_id) + end + end - conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear + def create_connection(user_id, client_id, ip_address) + @pg_conn.transaction do |conn| - # we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends - conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result| - count = result.getvalue(0, 0) - if count == "1" - # get all friend user_ids using the same query rails does for @user.friends - friend_update = @message_factory.friend_update(user_id, true) - friend_ids = gather_friends(conn, user_id) - @mq_router.publish_to_friends(friend_ids, friend_update, user_id) + lock_connections(conn) + + conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear + + # we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends + conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result| + count = result.getvalue(0, 0) + if count == "1" + # get all friend user_ids using the same query rails does for @user.friends + friend_update = @message_factory.friend_update(user_id, true) + friend_ids = gather_friends(conn, user_id) + @mq_router.publish_to_friends(friend_ids, friend_update, user_id) + end end end end - end - # once a connection is known gone (whether timeout or because a TCP connection is observed lost) - # this code is responsible for all cleanup logic associated with a connection going away - def delete_connection(client_id) + # once a connection is known gone (whether timeout or because a TCP connection is observed lost) + # this code is responsible for all cleanup logic associated with a connection going away + def delete_connection(client_id) - user_id = nil - music_session_id = nil + user_id = nil + music_session_id = nil - @pg_conn.transaction do |conn| + @pg_conn.transaction do |conn| - lock_connections(conn) + lock_connections(conn) - conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result| + previous_music_session_id = check_already_session(conn, client_id) - if result.cmd_tuples == 0 - # the client is already gone from the database... do nothing but log error - @log.error("unable to delete client #{client_id}") - return - elsif result.cmd_tuples == 1 - user_id = result[0]['user_id'] - music_session_id = result[0]['client_id'] + conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result| + if result.cmd_tuples == 0 + # the client is already gone from the database... do nothing but log error + @log.warn("unable to delete client #{client_id}") + return + elsif result.cmd_tuples == 1 + user_id = result[0]['user_id'] + music_session_id = result[0]['client_id'] + + else + raise Exception, 'uniqueness constraint has been lost on client_id' + end + end + + session_checks(conn, previous_music_session_id, user_id) + + # since we did delete a row, check and see if any more connections for that user exist + # if we are down to zero, send out user gone message + conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result| + count = result.getvalue(0, 0) + if count == "0" + friend_update = @message_factory.friend_update(user_id, false) + friend_ids = gather_friends(conn, user_id) + @mq_router.publish_to_friends(friend_ids, friend_update, user_id) + end + end + + # same for session-if we are down to the last participant, delete the session + unless music_session_id.nil? + conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear + end + end + end + + def destroy_if_empty(conn, music_session_id) + + num_participants = nil + conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1", [music_session_id]) do |result| + num_participants = result.getvalue(0, 0).to_i + end + + if num_participants == 0 + # delete the music_session + conn.exec("DELETE from music_sessions WHERE id = $1", [music_session_id]) do |result| + if result.cmd_tuples == 0 + # no music session deleted. do nothing + elsif result.cmd_tuples == 1 + # music session deleted! + @log.debug("deleted music session #{music_session_id}") + else + @log.error("music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}") + raise Exception, "music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}" + end + end + end + end + + def check_already_session(conn, client_id) + conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result| + if result.num_tuples == 1 + previous_music_session_id = result.getvalue(0, 0) + return previous_music_session_id + elsif result.num_tuples == 0 + # there is no connection found matching this criteria; we are done. + @log.debug("when checking for existing session, no connection found with client=#{client_id}") + return nil else - raise Exception 'uniqueness constraint has been lost on client_id' + @log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}") + raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}" end end + end + def session_checks(conn, previous_music_session_id, user_id) + unless previous_music_session_id.nil? + # TODO: send notification to friends that this user left this session? + @log.debug("user #{user_id} left music_session #{previous_music_session_id}") + destroy_if_empty(conn, previous_music_session_id) + end + end + + def join_music_session(user_id, client_id, music_session_id) + @pg_conn.transaction do |conn| + + lock_connections(conn) + + previous_music_session_id = check_already_session(conn, client_id) + + begin + conn.exec("UPDATE connections SET music_session_id = $1 WHERE client_id = $2", [music_session_id, client_id]) do |result| + if result.cmd_tuples == 1 + @log.debug "associated music_session with connection for client=#{client_id} and music_session=#{music_session_id}" + + session_checks(conn, previous_music_session_id, user_id) + elsif result.cmd_tuples == 0 + @log.debug "join_music_session no connection found with client_id=#{client_id}" + raise StateError, "no connection found with client=#{client_id}" + else + @log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)" + raise Exception, "locked table changed state" + end + end + rescue PG::Error => pg_error + if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"") + # if there is no music session that exists, we will receive this message + @log.debug "music_session does not exist. music_session=#{music_session_id}" + raise StateError, "music_session does not exist" + else + raise pg_error + end - # since we did delete a row, check and see if any more connections for that user exist - # if we are down to zero, send out user gone message - conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result| - count = result.getvalue(0, 0) - if count == "0" - friend_update = @message_factory.friend_update(user_id, false) - friend_ids = gather_friends(conn, user_id) - @mq_router.publish_to_friends(friend_ids, friend_update, user_id) end end + end - # same for session-if we are down to the last participant, delete the session - unless music_session_id.nil? - conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear + def leave_music_session(user_id, client_id, music_session_id) + @pg_conn.transaction do |conn| + + lock_connections(conn) + + previous_music_session_id = check_already_session(conn, client_id) + + if previous_music_session_id == nil + @log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}" + raise StateError, "not in session" + elsif previous_music_session_id != music_session_id + @log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}" + raise StateError, "in a session different than that specified" + end + + # can throw exception if the session is deleted just before this + conn.exec("UPDATE connections SET music_session_id = NULL WHERE client_id = $1", [client_id]) do |result| + if result.cmd_tuples == 1 + @log.debug("deassociated music_session with connection for client_id #{client_id}") + + session_checks(conn, previous_music_session_id, user_id) + elsif result.cmd_tuples == 0 + @log.debug "leave_music_session no connection found with client_id=#{client_id}" + raise StateError, "no connection found with client=#{client_id}" + else + @log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)") + raise Exception, "locked table changed state" + end + end end end - end - def lock_connections(conn) - conn.exec("LOCK connections IN ACCESS EXCLUSIVE MODE").clear - end - - - def gather_friends(conn, user_id) - friend_ids = [] - conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results| - friend_results.each do |friend_result| - friend_ids.push(friend_result['friend_id']) - end + def lock_connections(conn) + conn.exec("LOCK connections IN EXCLUSIVE MODE").clear + end + + def gather_friends(conn, user_id) + friend_ids = [] + conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results| + friend_results.each do |friend_result| + friend_ids.push(friend_result['friend_id']) + end + end + return friend_ids end - return friend_ids end end \ No newline at end of file diff --git a/lib/jam_ruby/errors/permission_error.rb b/lib/jam_ruby/errors/permission_error.rb index 80c4b1ff6..f0b4e3a2f 100644 --- a/lib/jam_ruby/errors/permission_error.rb +++ b/lib/jam_ruby/errors/permission_error.rb @@ -1,3 +1,5 @@ -class PermissionError < Exception +module JamRuby + class PermissionError < Exception + end end \ No newline at end of file diff --git a/spec/jam_ruby/connection_manager_spec.rb b/spec/jam_ruby/connection_manager_spec.rb index 876987b8e..0cc8faf2c 100644 --- a/spec/jam_ruby/connection_manager_spec.rb +++ b/spec/jam_ruby/connection_manager_spec.rb @@ -12,7 +12,31 @@ describe ConnectionManager do def create_user(name, email) @conn.exec("INSERT INTO users (name, email, password_digest) VALUES ($1, $2, $3) RETURNING id", [name, email, '1']) do |result| - return result.getvalue(0,0) + return result.getvalue(0, 0) + end + end + + def create_music_session(user_id) + description = "some session" + @conn.exec("INSERT INTO music_sessions (user_id, description) VALUES ($1, $2) RETURNING id", [user_id, description]) do |result| + return result.getvalue(0, 0) + end + end + + def assert_num_connections(client_id, expected_num_connections) + # make sure the connection is still there + @conn.exec("SELECT count(*) FROM connections where client_id = $1", [client_id]) do |result| + result.getvalue(0, 0).to_i.should == expected_num_connections + end + end + + def assert_session_exists(music_session_id, exists) + @conn.exec("SELECT count(*) FROM music_sessions where id = $1", [music_session_id]) do |result| + if exists + result.getvalue(0, 0).should == "1" + else + result.getvalue(0, 0).should == "0" + end end end @@ -43,7 +67,7 @@ describe ConnectionManager do @connman.delete_connection(client_id) @conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result| - result.getvalue(0,0).should == "0" + result.getvalue(0, 0).should == "0" end end @@ -141,7 +165,117 @@ describe ConnectionManager do @connman.gather_friends(@conn, user_id1).should =~ [user_id2, user_id3] @connman.gather_friends(@conn, user_id2).should == [user_id1] @connman.gather_friends(@conn, user_id3).should == [user_id1] + end + it "remove stale connection" do + + client_id = "client_id8" + + user_id = create_user("user8", "user8@jamkazam.com") + + @connman.create_connection(user_id, client_id, "1.1.1.1") + + @connman.remove_stale_connections(60) + + assert_num_connections(client_id, 1) + + sleep(1) + + # this should remove the stale connection + @connman.remove_stale_connections(1) + + assert_num_connections(client_id, 0) + end + + it "connections with music_sessions associated" do + + client_id = "client_id9" + user_id = create_user("user9", "user9@jamkazam.com") + music_session_id = create_music_session(user_id) + + @connman.create_connection(user_id, client_id, "1.1.1.1") + @connman.join_music_session(user_id, client_id, music_session_id) + + assert_session_exists(music_session_id, true) + + @conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result| + result.getvalue(0, 0).should == music_session_id + end + + @connman.delete_connection(client_id) + + assert_num_connections(client_id, 0) + + assert_session_exists(music_session_id, false) + end + + it "join_music_session fails if no connection" do + + client_id = "client_id10" + user_id = create_user("user10", "user10@jamkazam.com") + music_session_id = create_music_session(user_id) + + expect { @connman.join_music_session(user_id, client_id, music_session_id)}.to raise_error(JamRuby::StateError) + end + + it "join_music_session fails if no music_session" do + + client_id = "client_id11" + user_id = create_user("user11", "user11@jamkazam.com") + + @connman.create_connection(user_id, client_id, "1.1.1.1") + + expect { @connman.join_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError) + end + + it "leave_music_session fails if no music_session" do + + client_id = "client_id12" + user_id = create_user("user12", "user12@jamkazam.com") + + @connman.create_connection(user_id, client_id, "1.1.1.1") + + expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError) + end + + it "leave_music_session fails if in different music_session" do + + client_id = "client_id13" + user_id = create_user("user13", "user13@jamkazam.com") + music_session_id = create_music_session(user_id) + + @connman.create_connection(user_id, client_id, "1.1.1.1") + @connman.join_music_session(user_id, client_id, music_session_id) + + expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError) + end + + it "leave_music_session works" do + + client_id = "client_id14" + user_id = create_user("user14", "user14@jamkazam.com") + music_session_id = create_music_session(user_id) + + @connman.create_connection(user_id, client_id, "1.1.1.1") + @connman.join_music_session(user_id, client_id, music_session_id) + + assert_session_exists(music_session_id, true) + + @conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result| + result.getvalue(0, 0).should == music_session_id + end + + @connman.leave_music_session(user_id, client_id, music_session_id) + + @conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result| + result.getvalue(0, 0).should == nil + end + + assert_session_exists(music_session_id, false) + + @connman.delete_connection(client_id) + + assert_num_connections(client_id, 0) end end