diff --git a/lib/jam_ruby.rb b/lib/jam_ruby.rb index 669f23788..f886f77ad 100644 --- a/lib/jam_ruby.rb +++ b/lib/jam_ruby.rb @@ -5,6 +5,7 @@ require "uuidtools" require "logging" require "jam_ruby/errors/permission_error" require "jam_ruby/errors/state_error" +require "jam_ruby/errors/jam_argument_error" require "jam_ruby/mq_router" require "jam_ruby/connection_manager" require "jam_ruby/version" diff --git a/lib/jam_ruby/connection_manager.rb b/lib/jam_ruby/connection_manager.rb index c0b02fcb6..bb8100c45 100644 --- a/lib/jam_ruby/connection_manager.rb +++ b/lib/jam_ruby/connection_manager.rb @@ -13,12 +13,12 @@ module JamRuby # Or of course we could just port the relevant methods to node-js class ConnectionManager - attr_accessor :mq_router + attr_accessor :mq_router, :pg_conn - def initialize(conn) + def initialize(options={}) @log = Logging.logger[self] @mq_router = MQRouter.new - @pg_conn = conn + @pg_conn = options[:conn] @message_factory = MessageFactory.new unless PG.threadsafe? @@ -44,24 +44,25 @@ module JamRuby end end + def create_connection(user_id, client_id, ip_address) - @pg_conn.transaction do |conn| + conn = @pg_conn - lock_connections(conn) + lock_connections(conn) - conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear + conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear - # we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends - conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result| - count = result.getvalue(0, 0) - if count == "1" - # get all friend user_ids using the same query rails does for @user.friends - friend_update = @message_factory.friend_update(user_id, true) - friend_ids = gather_friends(conn, user_id) - @mq_router.publish_to_friends(friend_ids, friend_update, user_id) - end + # we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends + conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result| + count = result.getvalue(0, 0) + if count == "1" + # get all friend user_ids using the same query rails does for @user.friends + friend_update = @message_factory.friend_update(user_id, true) + friend_ids = gather_friends(conn, user_id) + @mq_router.publish_to_friends(friend_ids, friend_update, user_id) end end + #end end @@ -72,45 +73,45 @@ module JamRuby user_id = nil music_session_id = nil - @pg_conn.transaction do |conn| + conn = @pg_conn - lock_connections(conn) + lock_connections(conn) - previous_music_session_id = check_already_session(conn, client_id) + previous_music_session_id = check_already_session(conn, client_id) - conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result| + conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result| - if result.cmd_tuples == 0 - # the client is already gone from the database... do nothing but log error - @log.warn("unable to delete client #{client_id}") - return - elsif result.cmd_tuples == 1 - user_id = result[0]['user_id'] - music_session_id = result[0]['client_id'] + if result.cmd_tuples == 0 + # the client is already gone from the database... do nothing but log error + @log.warn("unable to delete client #{client_id}") + return + elsif result.cmd_tuples == 1 + user_id = result[0]['user_id'] + music_session_id = result[0]['client_id'] - else - raise Exception, 'uniqueness constraint has been lost on client_id' - end - end - - session_checks(conn, previous_music_session_id, user_id) - - # since we did delete a row, check and see if any more connections for that user exist - # if we are down to zero, send out user gone message - conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result| - count = result.getvalue(0, 0) - if count == "0" - friend_update = @message_factory.friend_update(user_id, false) - friend_ids = gather_friends(conn, user_id) - @mq_router.publish_to_friends(friend_ids, friend_update, user_id) - end - end - - # same for session-if we are down to the last participant, delete the session - unless music_session_id.nil? - conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear + else + raise Exception, 'uniqueness constraint has been lost on client_id' end end + + session_checks(conn, previous_music_session_id, user_id) + + # since we did delete a row, check and see if any more connections for that user exist + # if we are down to zero, send out user gone message + conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result| + count = result.getvalue(0, 0) + if count == "0" + friend_update = @message_factory.friend_update(user_id, false) + friend_ids = gather_friends(conn, user_id) + @mq_router.publish_to_friends(friend_ids, friend_update, user_id) + end + end + + # same for session-if we are down to the last participant, delete the session + unless music_session_id.nil? + conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear + end + # end end def destroy_if_empty(conn, music_session_id) @@ -161,83 +162,97 @@ module JamRuby end def join_music_session(user_id, client_id, music_session_id) - @pg_conn.transaction do |conn| + conn = @pg_conn - lock_connections(conn) + lock_connections(conn) - previous_music_session_id = check_already_session(conn, client_id) + previous_music_session_id = check_already_session(conn, client_id) - begin - conn.exec("UPDATE connections SET music_session_id = $1 WHERE client_id = $2", [music_session_id, client_id]) do |result| - if result.cmd_tuples == 1 - @log.debug "associated music_session with connection for client=#{client_id} and music_session=#{music_session_id}" - - session_checks(conn, previous_music_session_id, user_id) - elsif result.cmd_tuples == 0 - @log.debug "join_music_session no connection found with client_id=#{client_id}" - raise StateError, "no connection found with client=#{client_id}" - else - @log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)" - raise Exception, "locked table changed state" - end - end - rescue PG::Error => pg_error - if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"") - # if there is no music session that exists, we will receive this message - @log.debug "music_session does not exist. music_session=#{music_session_id}" - raise StateError, "music_session does not exist" - else - raise pg_error - end - - end - end - end - - def leave_music_session(user_id, client_id, music_session_id) - @pg_conn.transaction do |conn| - - lock_connections(conn) - - previous_music_session_id = check_already_session(conn, client_id) - - if previous_music_session_id == nil - @log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}" - raise StateError, "not in session" - elsif previous_music_session_id != music_session_id - @log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}" - raise StateError, "in a session different than that specified" - end - - # can throw exception if the session is deleted just before this - conn.exec("UPDATE connections SET music_session_id = NULL WHERE client_id = $1", [client_id]) do |result| + begin + conn.exec("UPDATE connections SET music_session_id = $1 WHERE client_id = $2", [music_session_id, client_id]) do |result| if result.cmd_tuples == 1 - @log.debug("deassociated music_session with connection for client_id #{client_id}") + @log.debug "associated music_session with connection for client=#{client_id} and music_session=#{music_session_id}" session_checks(conn, previous_music_session_id, user_id) elsif result.cmd_tuples == 0 - @log.debug "leave_music_session no connection found with client_id=#{client_id}" + @log.debug "join_music_session no connection found with client_id=#{client_id}" raise StateError, "no connection found with client=#{client_id}" else - @log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)") + @log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)" raise Exception, "locked table changed state" end end + rescue PG::Error => pg_error + if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"") + # if there is no music session that exists, we will receive this message + @log.debug "music_session does not exist. music_session=#{music_session_id}" + raise StateError, "music_session does not exist" + else + raise pg_error + end + end + # end end - def lock_connections(conn) - conn.exec("LOCK connections IN EXCLUSIVE MODE").clear - end + def leave_music_session(user_id, client_id, music_session_id) + conn = @pg_conn - def gather_friends(conn, user_id) - friend_ids = [] - conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results| - friend_results.each do |friend_result| - friend_ids.push(friend_result['friend_id']) + lock_connections(conn) + + previous_music_session_id = check_already_session(conn, client_id) + + if previous_music_session_id == nil + @log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}" + raise StateError, "not in session" + elsif previous_music_session_id != music_session_id + @log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}" + raise StateError, "in a session different than that specified" + end + + # can throw exception if the session is deleted just before this + conn.exec("UPDATE connections SET music_session_id = NULL WHERE client_id = $1", [client_id]) do |result| + if result.cmd_tuples == 1 + @log.debug("deassociated music_session with connection for client_id #{client_id}") + + session_checks(conn, previous_music_session_id, user_id) + elsif result.cmd_tuples == 0 + @log.debug "leave_music_session no connection found with client_id=#{client_id}" + raise StateError, "no connection found with client=#{client_id}" + else + @log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)") + raise Exception, "locked table changed state" end end - return friend_ids end end + + # Creates a connection manager, and associates the connection created by active_record with ourselves + def self.active_record_transaction(&block) + + connection_manager = ConnectionManager.new + ActiveRecord::Base.connection_pool.with_connection do |connection| + # create a transaction, and pass the current connection to ConnectionManager. + # this lets the entire operation work with the same transaction, + # across Rails ActiveRecord and the pg-gem based code in ConnectionManager. + connection_manager.pg_conn = connection.instance_variable_get("@connection") + + block.call(connection_manager) + end + end + + def lock_connections(conn) + conn.exec("LOCK connections IN EXCLUSIVE MODE").clear + end + + def gather_friends(conn, user_id) + friend_ids = [] + conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results| + friend_results.each do |friend_result| + friend_ids.push(friend_result['friend_id']) + end + end + return friend_ids + end +# end end \ No newline at end of file diff --git a/lib/jam_ruby/errors/jam_argument_error.rb b/lib/jam_ruby/errors/jam_argument_error.rb new file mode 100644 index 000000000..9f208363b --- /dev/null +++ b/lib/jam_ruby/errors/jam_argument_error.rb @@ -0,0 +1,7 @@ +module JamRuby + # if a bad argument is supplied. + # Why not use the default one? This allows us to know our API layer threw this, versus us using some core library incorrectly + class JamArgumentError < ArgumentError + + end +end \ No newline at end of file diff --git a/spec/jam_ruby/connection_manager_spec.rb b/spec/jam_ruby/connection_manager_spec.rb index 0cc8faf2c..4cb838957 100644 --- a/spec/jam_ruby/connection_manager_spec.rb +++ b/spec/jam_ruby/connection_manager_spec.rb @@ -3,10 +3,10 @@ require 'spec_helper' # these tests avoid the use of ActiveRecord and FactoryGirl to do blackbox, non test-instrumented tests describe ConnectionManager do - before do + before do @conn = PG::Connection.new(:dbname => SpecDb::TEST_DB_NAME, :user => "postgres", :password => "postgres", :host => "localhost") - @connman = ConnectionManager.new(@conn) + @connman = ConnectionManager.new(:conn => @conn) @message_factory = MessageFactory.new end @@ -42,30 +42,39 @@ describe ConnectionManager do it "can't create bogus user_id" do - expect { @connman.create_connection("aeonuthaoentuh", "client_id", "1.1.1.1") }.to raise_error(PG::Error) + @conn.transaction do + expect { @connman.create_connection("aeonuthaoentuh", "client_id", "1.1.1.1") }.to raise_error(PG::Error) + end + end it "can't create two client_ids of same value" do client_id = "client_id1" user_id = create_user("user1", "user1@jamkazam.com") - @connman.create_connection(user_id, client_id, "1.1.1.1") - expect { @connman.create_connection(user_id, client_id, "1.1.1.1") }.to raise_error(PG::Error) + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + expect { @connman.create_connection(user_id, client_id, "1.1.1.1") }.to raise_error(PG::Error) + end end it "create connection then delete it" do client_id = "client_id2" user_id = create_user("user2", "user2@jamkazam.com") - @connman.create_connection(user_id, client_id, "1.1.1.1") - + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end # make sure the connection is seen @conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result| result.getvalue(0, 0).should == "1" end - @connman.delete_connection(client_id) + @conn.transaction do + @connman.delete_connection(client_id) + end + @conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result| result.getvalue(0, 0).should == "0" end @@ -82,13 +91,16 @@ describe ConnectionManager do friend_update = @message_factory.friend_update(user_id, true) @connman.mq_router.should_receive(:publish_to_friends).with([], friend_update, user_id) - @connman.create_connection(user_id, client_id, "1.1.1.1") - + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end # but a second connection from the same user should cause no such message @connman.should_receive(:publish_to_friends).exactly(0).times - @connman.create_connection(user_id, client_id2, "1.1.1.1") + @conn.transaction do + @connman.create_connection(user_id, client_id2, "1.1.1.1") + end end @@ -102,19 +114,29 @@ describe ConnectionManager do # we should get a message saying that this user is online - @connman.create_connection(user_id, client_id, "1.1.1.1") - @connman.create_connection(user_id, client_id2, "1.1.1.1") + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end + + @conn.transaction do + @connman.create_connection(user_id, client_id2, "1.1.1.1") + end # deleting one of the two connections should cause no messages @connman.should_receive(:publish_to_friends).exactly(0).times - @connman.delete_connection(client_id) + @conn.transaction do + @connman.delete_connection(client_id) + end # but deleting the final connection should cause a left message friend_update = @message_factory.friend_update(user_id, false) @connman.mq_router.should_receive(:publish_to_friends).with([], friend_update, user_id) - @connman.delete_connection(client_id2) + @conn.transaction do + @connman.delete_connection(client_id2) + end + end @@ -173,16 +195,22 @@ describe ConnectionManager do user_id = create_user("user8", "user8@jamkazam.com") - @connman.create_connection(user_id, client_id, "1.1.1.1") + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end - @connman.remove_stale_connections(60) + @conn.transaction do + @connman.remove_stale_connections(60) + end assert_num_connections(client_id, 1) sleep(1) - # this should remove the stale connection - @connman.remove_stale_connections(1) + @conn.transaction do + # this should remove the stale connection + @connman.remove_stale_connections(1) + end assert_num_connections(client_id, 0) end @@ -193,8 +221,10 @@ describe ConnectionManager do user_id = create_user("user9", "user9@jamkazam.com") music_session_id = create_music_session(user_id) - @connman.create_connection(user_id, client_id, "1.1.1.1") - @connman.join_music_session(user_id, client_id, music_session_id) + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + @connman.join_music_session(user_id, client_id, music_session_id) + end assert_session_exists(music_session_id, true) @@ -202,8 +232,9 @@ describe ConnectionManager do result.getvalue(0, 0).should == music_session_id end - @connman.delete_connection(client_id) - + @conn.transaction do + @connman.delete_connection(client_id) + end assert_num_connections(client_id, 0) assert_session_exists(music_session_id, false) @@ -215,7 +246,10 @@ describe ConnectionManager do user_id = create_user("user10", "user10@jamkazam.com") music_session_id = create_music_session(user_id) - expect { @connman.join_music_session(user_id, client_id, music_session_id)}.to raise_error(JamRuby::StateError) + @conn.transaction do + expect { @connman.join_music_session(user_id, client_id, music_session_id) }.to raise_error(JamRuby::StateError) + end + end it "join_music_session fails if no music_session" do @@ -223,9 +257,14 @@ describe ConnectionManager do client_id = "client_id11" user_id = create_user("user11", "user11@jamkazam.com") - @connman.create_connection(user_id, client_id, "1.1.1.1") + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end + + @conn.transaction do + expect { @connman.join_music_session(user_id, client_id, "some_bogus_music_session_id") }.to raise_error(JamRuby::StateError) + end - expect { @connman.join_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError) end it "leave_music_session fails if no music_session" do @@ -233,9 +272,13 @@ describe ConnectionManager do client_id = "client_id12" user_id = create_user("user12", "user12@jamkazam.com") - @connman.create_connection(user_id, client_id, "1.1.1.1") + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end - expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError) + @conn.transaction do + expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id") }.to raise_error(JamRuby::StateError) + end end it "leave_music_session fails if in different music_session" do @@ -244,10 +287,17 @@ describe ConnectionManager do user_id = create_user("user13", "user13@jamkazam.com") music_session_id = create_music_session(user_id) - @connman.create_connection(user_id, client_id, "1.1.1.1") - @connman.join_music_session(user_id, client_id, music_session_id) + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end - expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError) + @conn.transaction do + @connman.join_music_session(user_id, client_id, music_session_id) + end + + @conn.transaction do + expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id") }.to raise_error(JamRuby::StateError) + end end it "leave_music_session works" do @@ -256,8 +306,14 @@ describe ConnectionManager do user_id = create_user("user14", "user14@jamkazam.com") music_session_id = create_music_session(user_id) - @connman.create_connection(user_id, client_id, "1.1.1.1") - @connman.join_music_session(user_id, client_id, music_session_id) + + @conn.transaction do + @connman.create_connection(user_id, client_id, "1.1.1.1") + end + + @conn.transaction do + @connman.join_music_session(user_id, client_id, music_session_id) + end assert_session_exists(music_session_id, true) @@ -265,7 +321,9 @@ describe ConnectionManager do result.getvalue(0, 0).should == music_session_id end - @connman.leave_music_session(user_id, client_id, music_session_id) + @conn.transaction do + @connman.leave_music_session(user_id, client_id, music_session_id) + end @conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result| result.getvalue(0, 0).should == nil @@ -273,7 +331,9 @@ describe ConnectionManager do assert_session_exists(music_session_id, false) - @connman.delete_connection(client_id) + @conn.transaction do + @connman.delete_connection(client_id) + end assert_num_connections(client_id, 0)