* connection manager complete. now will integrate into jam-web and websocket-gateway

This commit is contained in:
Seth Call 2012-10-21 10:05:06 -05:00
parent d8dc08bef6
commit 3e269f2551
5 changed files with 350 additions and 72 deletions

View File

@ -13,7 +13,6 @@ gem 'bcrypt-ruby', '3.0.1'
gem 'ruby-protocol-buffers', '1.2.2'
gem 'eventmachine'
gem 'amqp'
gem 'pry'
group :test do
gem 'jam_db', :path=> "#{workspace}/jam-db/target/ruby_package"
@ -21,6 +20,7 @@ group :test do
gem "rspec", "2.10.0"
gem 'spork', '0.9.0'
gem 'database_cleaner', '0.7.0'
gem 'pry'
end
# Specify your gem's dependencies in jam_ruby.gemspec

View File

@ -3,6 +3,8 @@ require "active_record"
require "jampb"
require "uuidtools"
require "logging"
require "jam_ruby/errors/permission_error"
require "jam_ruby/errors/state_error"
require "jam_ruby/mq_router"
require "jam_ruby/connection_manager"
require "jam_ruby/version"

View File

@ -1,103 +1,243 @@
class ConnectionManager
module JamRuby
# All writes should occur through the ConnectionManager
# Reads can occur freely elsewhere, though
# Because connections are tied to the websocket-connection and we bookkeep them in the database purely
# for 'SQL convenience', this is a obvious place we can go away from a database
# as an optimization if we find it's too much db traffic created'
# At a minimum, though, we could make connections an UNLOGGED table because if the database crashes,
# all clients should reconnect and restablish their connection anyway
#
# All methods in here could also be refactored as stored procedures, if we stick with a database.
# This may make sense in the short term if we are still managing connections in the database, but
# we move to the node-js in the websocket gateway (because the websocket gateway needs to call some of these methods).
# Or of course we could just port the relevant methods to node-js
class ConnectionManager
attr_accessor :mq_router
attr_accessor :mq_router
def initialize(conn)
@log = Logging.logger[self]
@mq_router = MQRouter.new
@pg_conn = conn
@message_factory = MessageFactory.new
def initialize(conn)
@log = Logging.logger[self]
@mq_router = MQRouter.new
@pg_conn = conn
@message_factory = MessageFactory.new
unless PG.threadsafe?
raise Exception "a non-threadsafe build of libpq is present."
unless PG.threadsafe?
raise Exception, "a non-threadsafe build of libpq is present."
end
end
end
def remove_stale_connections()
@pg_conn.transaction do |conn|
def update_staleness()
#TODO
end
end
def create_connection(user_id, client_id, ip_address)
@pg_conn.transaction do |conn|
# remove stale connections
def remove_stale_connections(max_seconds)
stale_clients = []
@pg_conn.exec("SELECT client_id FROM connections WHERE updated_at < (NOW() - interval '#{max_seconds} second')") do |result|
result.each do |row|
stale_clients.push(row['client_id'])
end
end
lock_connections(conn)
stale_clients.each do |client_id|
delete_connection(client_id)
end
end
conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear
def create_connection(user_id, client_id, ip_address)
@pg_conn.transaction do |conn|
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "1"
# get all friend user_ids using the same query rails does for @user.friends
friend_update = @message_factory.friend_update(user_id, true)
friend_ids = gather_friends(conn, user_id)
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
lock_connections(conn)
conn.exec("INSERT INTO connections (user_id, client_id, ip_address) VALUES ($1, $2, $3)", [user_id, client_id, ip_address]).clear
# we just created a new connection-if this is the first time the user has shown up, we need to send out a message to his friends
conn.exec("SELECT count(user_id) FROM connections WHERE user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "1"
# get all friend user_ids using the same query rails does for @user.friends
friend_update = @message_factory.friend_update(user_id, true)
friend_ids = gather_friends(conn, user_id)
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
end
end
end
end
end
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
# this code is responsible for all cleanup logic associated with a connection going away
def delete_connection(client_id)
# once a connection is known gone (whether timeout or because a TCP connection is observed lost)
# this code is responsible for all cleanup logic associated with a connection going away
def delete_connection(client_id)
user_id = nil
music_session_id = nil
user_id = nil
music_session_id = nil
@pg_conn.transaction do |conn|
@pg_conn.transaction do |conn|
lock_connections(conn)
lock_connections(conn)
conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
previous_music_session_id = check_already_session(conn, client_id)
if result.cmd_tuples == 0
# the client is already gone from the database... do nothing but log error
@log.error("unable to delete client #{client_id}")
return
elsif result.cmd_tuples == 1
user_id = result[0]['user_id']
music_session_id = result[0]['client_id']
conn.exec("DELETE FROM connections WHERE client_id = $1 RETURNING user_id, music_session_id", [client_id]) do |result|
if result.cmd_tuples == 0
# the client is already gone from the database... do nothing but log error
@log.warn("unable to delete client #{client_id}")
return
elsif result.cmd_tuples == 1
user_id = result[0]['user_id']
music_session_id = result[0]['client_id']
else
raise Exception, 'uniqueness constraint has been lost on client_id'
end
end
session_checks(conn, previous_music_session_id, user_id)
# since we did delete a row, check and see if any more connections for that user exist
# if we are down to zero, send out user gone message
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "0"
friend_update = @message_factory.friend_update(user_id, false)
friend_ids = gather_friends(conn, user_id)
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
end
end
# same for session-if we are down to the last participant, delete the session
unless music_session_id.nil?
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
end
end
end
def destroy_if_empty(conn, music_session_id)
num_participants = nil
conn.exec("SELECT count(*) FROM connections WHERE music_session_id = $1", [music_session_id]) do |result|
num_participants = result.getvalue(0, 0).to_i
end
if num_participants == 0
# delete the music_session
conn.exec("DELETE from music_sessions WHERE id = $1", [music_session_id]) do |result|
if result.cmd_tuples == 0
# no music session deleted. do nothing
elsif result.cmd_tuples == 1
# music session deleted!
@log.debug("deleted music session #{music_session_id}")
else
@log.error("music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}")
raise Exception, "music_sessions table data integrity violation; multiple rows found with music_session_id=#{music_session_id}"
end
end
end
end
def check_already_session(conn, client_id)
conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
if result.num_tuples == 1
previous_music_session_id = result.getvalue(0, 0)
return previous_music_session_id
elsif result.num_tuples == 0
# there is no connection found matching this criteria; we are done.
@log.debug("when checking for existing session, no connection found with client=#{client_id}")
return nil
else
raise Exception 'uniqueness constraint has been lost on client_id'
@log.error("connection table data integrity violation; multiple rows found. client_id=#{client_id}")
raise Exception, "connection table data integrity violation; multiple rows found. client_id=#{client_id}"
end
end
end
def session_checks(conn, previous_music_session_id, user_id)
unless previous_music_session_id.nil?
# TODO: send notification to friends that this user left this session?
@log.debug("user #{user_id} left music_session #{previous_music_session_id}")
destroy_if_empty(conn, previous_music_session_id)
end
end
def join_music_session(user_id, client_id, music_session_id)
@pg_conn.transaction do |conn|
lock_connections(conn)
previous_music_session_id = check_already_session(conn, client_id)
begin
conn.exec("UPDATE connections SET music_session_id = $1 WHERE client_id = $2", [music_session_id, client_id]) do |result|
if result.cmd_tuples == 1
@log.debug "associated music_session with connection for client=#{client_id} and music_session=#{music_session_id}"
session_checks(conn, previous_music_session_id, user_id)
elsif result.cmd_tuples == 0
@log.debug "join_music_session no connection found with client_id=#{client_id}"
raise StateError, "no connection found with client=#{client_id}"
else
@log.error "database failure or logic error; this path should be impossible if the table is locked (join_music_session)"
raise Exception, "locked table changed state"
end
end
rescue PG::Error => pg_error
if pg_error.to_s.include?("insert or update on table \"connections\" violates foreign key constraint \"connections_music_session_id_fkey\"")
# if there is no music session that exists, we will receive this message
@log.debug "music_session does not exist. music_session=#{music_session_id}"
raise StateError, "music_session does not exist"
else
raise pg_error
end
# since we did delete a row, check and see if any more connections for that user exist
# if we are down to zero, send out user gone message
conn.exec("SELECT count(user_id) FROM connections where user_id = $1", [user_id]) do |result|
count = result.getvalue(0, 0)
if count == "0"
friend_update = @message_factory.friend_update(user_id, false)
friend_ids = gather_friends(conn, user_id)
@mq_router.publish_to_friends(friend_ids, friend_update, user_id)
end
end
end
# same for session-if we are down to the last participant, delete the session
unless music_session_id.nil?
conn.exec("DELETE FROM music_sessions id = $1 AND 0 = (SELECT count(music_session_id) FROM connections where music_session_id = $1)", [music_session_id]).clear
def leave_music_session(user_id, client_id, music_session_id)
@pg_conn.transaction do |conn|
lock_connections(conn)
previous_music_session_id = check_already_session(conn, client_id)
if previous_music_session_id == nil
@log.debug "the client is not in a session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
raise StateError, "not in session"
elsif previous_music_session_id != music_session_id
@log.debug "the client is in a different session. user=#{user_id}, client=#{client_id}, music_session=#{music_session_id}"
raise StateError, "in a session different than that specified"
end
# can throw exception if the session is deleted just before this
conn.exec("UPDATE connections SET music_session_id = NULL WHERE client_id = $1", [client_id]) do |result|
if result.cmd_tuples == 1
@log.debug("deassociated music_session with connection for client_id #{client_id}")
session_checks(conn, previous_music_session_id, user_id)
elsif result.cmd_tuples == 0
@log.debug "leave_music_session no connection found with client_id=#{client_id}"
raise StateError, "no connection found with client=#{client_id}"
else
@log.error("database failure or logic error; this path should be impossible if the table is locked (leave_music_session)")
raise Exception, "locked table changed state"
end
end
end
end
end
def lock_connections(conn)
conn.exec("LOCK connections IN ACCESS EXCLUSIVE MODE").clear
end
def gather_friends(conn, user_id)
friend_ids = []
conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
friend_results.each do |friend_result|
friend_ids.push(friend_result['friend_id'])
end
def lock_connections(conn)
conn.exec("LOCK connections IN EXCLUSIVE MODE").clear
end
def gather_friends(conn, user_id)
friend_ids = []
conn.exec("SELECT f1.friend_id as friend_id FROM friendships f1 WHERE f1.user_id = $1 AND f1.friend_id IN (SELECT f2.user_id FROM friendships f2 WHERE f2.friend_id = $1)", [user_id]) do |friend_results|
friend_results.each do |friend_result|
friend_ids.push(friend_result['friend_id'])
end
end
return friend_ids
end
return friend_ids
end
end

View File

@ -1,3 +1,5 @@
class PermissionError < Exception
module JamRuby
class PermissionError < Exception
end
end

View File

@ -12,7 +12,31 @@ describe ConnectionManager do
def create_user(name, email)
@conn.exec("INSERT INTO users (name, email, password_digest) VALUES ($1, $2, $3) RETURNING id", [name, email, '1']) do |result|
return result.getvalue(0,0)
return result.getvalue(0, 0)
end
end
def create_music_session(user_id)
description = "some session"
@conn.exec("INSERT INTO music_sessions (user_id, description) VALUES ($1, $2) RETURNING id", [user_id, description]) do |result|
return result.getvalue(0, 0)
end
end
def assert_num_connections(client_id, expected_num_connections)
# make sure the connection is still there
@conn.exec("SELECT count(*) FROM connections where client_id = $1", [client_id]) do |result|
result.getvalue(0, 0).to_i.should == expected_num_connections
end
end
def assert_session_exists(music_session_id, exists)
@conn.exec("SELECT count(*) FROM music_sessions where id = $1", [music_session_id]) do |result|
if exists
result.getvalue(0, 0).should == "1"
else
result.getvalue(0, 0).should == "0"
end
end
end
@ -43,7 +67,7 @@ describe ConnectionManager do
@connman.delete_connection(client_id)
@conn.exec("SELECT count(*) FROM connections where user_id = $1", [user_id]) do |result|
result.getvalue(0,0).should == "0"
result.getvalue(0, 0).should == "0"
end
end
@ -141,7 +165,117 @@ describe ConnectionManager do
@connman.gather_friends(@conn, user_id1).should =~ [user_id2, user_id3]
@connman.gather_friends(@conn, user_id2).should == [user_id1]
@connman.gather_friends(@conn, user_id3).should == [user_id1]
end
it "remove stale connection" do
client_id = "client_id8"
user_id = create_user("user8", "user8@jamkazam.com")
@connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.remove_stale_connections(60)
assert_num_connections(client_id, 1)
sleep(1)
# this should remove the stale connection
@connman.remove_stale_connections(1)
assert_num_connections(client_id, 0)
end
it "connections with music_sessions associated" do
client_id = "client_id9"
user_id = create_user("user9", "user9@jamkazam.com")
music_session_id = create_music_session(user_id)
@connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.join_music_session(user_id, client_id, music_session_id)
assert_session_exists(music_session_id, true)
@conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
result.getvalue(0, 0).should == music_session_id
end
@connman.delete_connection(client_id)
assert_num_connections(client_id, 0)
assert_session_exists(music_session_id, false)
end
it "join_music_session fails if no connection" do
client_id = "client_id10"
user_id = create_user("user10", "user10@jamkazam.com")
music_session_id = create_music_session(user_id)
expect { @connman.join_music_session(user_id, client_id, music_session_id)}.to raise_error(JamRuby::StateError)
end
it "join_music_session fails if no music_session" do
client_id = "client_id11"
user_id = create_user("user11", "user11@jamkazam.com")
@connman.create_connection(user_id, client_id, "1.1.1.1")
expect { @connman.join_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError)
end
it "leave_music_session fails if no music_session" do
client_id = "client_id12"
user_id = create_user("user12", "user12@jamkazam.com")
@connman.create_connection(user_id, client_id, "1.1.1.1")
expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError)
end
it "leave_music_session fails if in different music_session" do
client_id = "client_id13"
user_id = create_user("user13", "user13@jamkazam.com")
music_session_id = create_music_session(user_id)
@connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.join_music_session(user_id, client_id, music_session_id)
expect { @connman.leave_music_session(user_id, client_id, "some_bogus_music_session_id")}.to raise_error(JamRuby::StateError)
end
it "leave_music_session works" do
client_id = "client_id14"
user_id = create_user("user14", "user14@jamkazam.com")
music_session_id = create_music_session(user_id)
@connman.create_connection(user_id, client_id, "1.1.1.1")
@connman.join_music_session(user_id, client_id, music_session_id)
assert_session_exists(music_session_id, true)
@conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
result.getvalue(0, 0).should == music_session_id
end
@connman.leave_music_session(user_id, client_id, music_session_id)
@conn.exec("SELECT music_session_id FROM connections WHERE client_id = $1", [client_id]) do |result|
result.getvalue(0, 0).should == nil
end
assert_session_exists(music_session_id, false)
@connman.delete_connection(client_id)
assert_num_connections(client_id, 0)
end
end