diff --git a/Gemfile b/Gemfile index 449911a01..4ae6d0390 100644 --- a/Gemfile +++ b/Gemfile @@ -26,5 +26,5 @@ group :test do gem 'guard', '>= 0.10.0' gem 'guard-rspec', '>= 0.7.3' gem 'pg_migrate','0.1.5' #:path => "#{workspace}/pg_migrate_ruby" - gem 'amqp-spec' + gem 'evented-spec' end diff --git a/bin/websocket_gateway b/bin/websocket_gateway index 480ca4f1a..0b3c71ec7 100755 --- a/bin/websocket_gateway +++ b/bin/websocket_gateway @@ -30,4 +30,4 @@ end Logging.logger.root.appenders = Logging.appenders.stdout ActiveRecord::Base.establish_connection(db_config) -Server.new.run :port => config["port"], :emwebsocket_debug => config["emwebsocket_debug"] +Server.new.run :port => config["port"], :emwebsocket_debug => config["emwebsocket_debug"], :max_stale_connection_time => config["max_stale_connection_time"] diff --git a/config/application.yml b/config/application.yml index 34bf0c45d..9d31c4ccc 100644 --- a/config/application.yml +++ b/config/application.yml @@ -2,11 +2,14 @@ development: port: 6767 verbose: true emwebsocket_debug: false + max_stale_connection_time: 30 test: port: 6769 verbose: true + max_stale_connection_time: 30 production: port: 80 verbose: false + max_stale_connection_time: 30 \ No newline at end of file diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index 5b1a6b708..03c394aea 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -36,12 +36,16 @@ module JamWebsockets @user_topic = nil @client_topic = nil @thread_pool = nil + @heartbeat_interval = nil + end - def start(options={:host => "localhost", :port => 5672}) + def start(max_stale_connection_time, options={:host => "localhost", :port => 5672}) @log.info "startup" + @heartbeat_interval = max_stale_connection_time / 2 + begin @connection = AMQP.connect(:host => options[:host], :port => options[:port]) @channel = AMQP::Channel.new(@connection) @@ -167,23 +171,23 @@ module JamWebsockets # subscribe for any p2p messages to a client @client_topic.subscribe(:ack => false) do |headers, msg| begin - routing_key = headers.envelope.routing_key + routing_key = headers.routing_key client_id = routing_key["client.".length..-1] @semaphore.synchronize do client = @client_lookup[client_id] msg = Jampb::ClientMessage.parse(msg) - @log.debug "p2p message received from #{msg.from} to client #{client_id}" + @log.debug "client-directed message received from #{msg.from} to client #{client_id}" unless client.nil? EM.schedule do - @log.debug "sending p2p message to #{client_id}" + @log.debug "sending client-directed down websocket to #{client_id}" send_to_client(client, msg) end else - @log.debug "p2p message unroutable to disconnected client #{client_id}" + @log.debug "client-directed message unroutable to disconnected client #{client_id}" end end rescue => e @@ -338,11 +342,11 @@ module JamWebsockets # remove this connection from the database if !context.user.nil? && !context.client.nil? - JamRuby::Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'" + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.delete_connection(client.client_id) + end end - send_friend_update(context.user, false, context.client) - remove_user(context) else @log.debug "skipping duplicate cleanup attempt of logged-in client" @@ -430,8 +434,9 @@ module JamWebsockets # respond with LOGIN_ACK to let client know it was successful #binding.pry - remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername) - login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token) + + remote_ip = extract_ip(client) + login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token, @heartbeat_interval) send_to_client(client, login_ack) @semaphore.synchronize do @@ -445,11 +450,8 @@ module JamWebsockets add_client(client_id, client) # TODO # log this connection in the database - connection = JamRuby::Connection.new(:user => user, :client_id => client.client_id) - @log.debug "Created connection => #{connection.user}, #{connection.client_id}" - - if connection.save - send_friend_update(user, true, context.client) + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.create_connection(user.id, client.client_id, extract_ip(client)) end end else @@ -457,6 +459,7 @@ module JamWebsockets end end + # TODO: deprecated; jam_ruby has routine inspired by this def send_friend_update(user, online, client) @log.debug "sending friend update for user #{user} online = #{online}" @@ -614,5 +617,9 @@ module JamWebsockets end end end + + def extract_ip(client) + return Socket.unpack_sockaddr_in(client.get_peername)[1] + end end end diff --git a/lib/jam_websockets/server.rb b/lib/jam_websockets/server.rb index d3b37c8be..d4eca90f8 100644 --- a/lib/jam_websockets/server.rb +++ b/lib/jam_websockets/server.rb @@ -14,11 +14,12 @@ module JamWebsockets host = "0.0.0.0" port = options[:port] + max_stale_connection_time = options[:max_stale_connection_time] - @log.info "starting server #{host}:#{port}" + @log.info "starting server #{host}:#{port} with staleness_time=#{max_stale_connection_time}" EventMachine.run do - @router.start + @router.start(max_stale_connection_time) # if you don't do this, the app won't exit unless you kill -9 at_exit do @@ -26,10 +27,33 @@ module JamWebsockets @router.cleanup end - EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:emwebsocket_debug]) do |ws| - @log.info "new client #{ws}" - @router.new_client(ws) - end + start_connection_cleaner(max_stale_connection_time) + + start_websocket_listener(host, port, options[:emwebsocket_debug]) + end + end + + def start_websocket_listener(listen_ip, port, emwebsocket_debug) + EventMachine::WebSocket.start(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws| + @log.info "new client #{ws}" + @router.new_client(ws) + end + end + + + def start_connection_cleaner(stale_max_time) + # one cleanup on startup + cleanup_stale_connections(stale_max_time) + + EventMachine::PeriodicTimer.new(15) do + cleanup_stale_connections(stale_max_time) + end + + end + + def cleanup_stale_connections(stale_max_time) + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.remove_stale_connections(stale_max_time) end end end diff --git a/spec/jam_websockets/router_spec.rb b/spec/jam_websockets/router_spec.rb index 84940dd0d..d7f80e176 100644 --- a/spec/jam_websockets/router_spec.rb +++ b/spec/jam_websockets/router_spec.rb @@ -42,13 +42,15 @@ def login(router, user, password, client_id) message_factory = MessageFactory.new client = LoginClient.new - login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token) + login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15) router.should_receive(:send_to_client).with(client, login_ack) + router.should_receive(:extract_ip).at_least(:once).with(client).and_return("127.0.0.1") client.should_receive(:onclose) client.should_receive(:onerror) client.should_receive(:request).and_return({ "query" => { "pb" => "true" } }) - client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00") + + #client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00") @router.new_client(client) client.onopenblock.call @@ -75,31 +77,31 @@ end describe Router do - include AMQP::Spec + include EventedSpec::EMSpec message_factory = MessageFactory.new - amqp_before do + em_before do @router = Router.new() - @router.start() + @router.start(30) end subject { @router } - amqp_after do + em_after do @router.stop end - describe "servicability" do + describe "serviceability" do it "should start and stop", :mq => true do - em do + #em do done - end + #end end it "should register for client events", :mq => true do - em do + #em do client = double("client") client.should_receive(:onopen) client.should_receive(:onclose) @@ -109,13 +111,13 @@ describe Router do @router.new_client(client) done - end + #end end end describe "topic routing helpers" do it "create and delete user lookup set" do - em do + #em do user = double(User) user.should_receive(:id).any_number_of_times.and_return("1") client = double("client") @@ -131,14 +133,14 @@ describe Router do @router.user_context_lookup.length.should == 0 done - end + #end end end describe "login" do it "should not allow login of bogus user", :mq => true do - em do + #em do TestClient = Class.new do attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id @@ -181,42 +183,42 @@ describe Router do client.onmsgblock.call login.to_s done - end + #end end it "should allow login of valid user", :mq => true do - em do + #em do @user = User.new(:name => "Example User", :email => "user@example.com", :password => "foobar", :password_confirmation => "foobar") @user.save client1 = login(@router, @user, "foobar", "1") done - end + #end end it "should allow music_session_join of valid user", :mq => true do - em do + #em do user1 = FactoryGirl.create(:user) # in the music session user2 = FactoryGirl.create(:user) # in the music session user3 = FactoryGirl.create(:user) # not in the music session music_session = FactoryGirl.create(:music_session, :creator => user1) - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1") - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2") + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "4") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "5") # make a music_session and define two members # create client 1, log him in, and log him in to music session client1 = login(@router, user1, "foobar", "1") - login_music_session(@router, client1, music_session) \ + login_music_session(@router, client1, music_session) done - end + #end end it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do - em do + #em do user1 = FactoryGirl.create(:user) # in the music session user2 = FactoryGirl.create(:user) # in the music session @@ -232,15 +234,15 @@ describe Router do # make a music_session and define two members - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1") - music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2") + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "6") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "7") done - end + #end end it "should allow two valid subscribers to communicate with p2p messages", :mq => true do - em do + #em do user1 = FactoryGirl.create(:user) # in the music session user2 = FactoryGirl.create(:user) # in the music session @@ -254,7 +256,7 @@ describe Router do #login_music_session(@router, client2, music_session) # by creating - music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1") + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "8") # now attempt to message p2p! @@ -268,10 +270,10 @@ describe Router do ## send ping to client 2 #client2.onmsgblock.call ping.to_s - #music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2") + #music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "2") done - end + #end end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 0e2fb42f1..2706e081b 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,7 +3,7 @@ require 'jam_db' require 'spec_db' require 'jam_websockets' require 'timeout' -require 'amqp-spec/rspec' +require 'evented-spec' jamenv = ENV['JAMENV'] jamenv ||= 'development'