* ConnectionManager integrated

* ruby-1.9.3 instead of JRuby
* Heartbeat Interval added to LoginAck
This commit is contained in:
Seth Call 2012-10-23 06:42:28 -05:00
parent 1660b2b01a
commit 7a3b20814a
7 changed files with 90 additions and 54 deletions

View File

@ -26,5 +26,5 @@ group :test do
gem 'guard', '>= 0.10.0'
gem 'guard-rspec', '>= 0.7.3'
gem 'pg_migrate','0.1.5' #:path => "#{workspace}/pg_migrate_ruby"
gem 'amqp-spec'
gem 'evented-spec'
end

View File

@ -30,4 +30,4 @@ end
Logging.logger.root.appenders = Logging.appenders.stdout
ActiveRecord::Base.establish_connection(db_config)
Server.new.run :port => config["port"], :emwebsocket_debug => config["emwebsocket_debug"]
Server.new.run :port => config["port"], :emwebsocket_debug => config["emwebsocket_debug"], :max_stale_connection_time => config["max_stale_connection_time"]

View File

@ -2,11 +2,14 @@ development:
port: 6767
verbose: true
emwebsocket_debug: false
max_stale_connection_time: 30
test:
port: 6769
verbose: true
max_stale_connection_time: 30
production:
port: 80
verbose: false
max_stale_connection_time: 30

View File

@ -36,12 +36,16 @@ module JamWebsockets
@user_topic = nil
@client_topic = nil
@thread_pool = nil
@heartbeat_interval = nil
end
def start(options={:host => "localhost", :port => 5672})
def start(max_stale_connection_time, options={:host => "localhost", :port => 5672})
@log.info "startup"
@heartbeat_interval = max_stale_connection_time / 2
begin
@connection = AMQP.connect(:host => options[:host], :port => options[:port])
@channel = AMQP::Channel.new(@connection)
@ -167,23 +171,23 @@ module JamWebsockets
# subscribe for any p2p messages to a client
@client_topic.subscribe(:ack => false) do |headers, msg|
begin
routing_key = headers.envelope.routing_key
routing_key = headers.routing_key
client_id = routing_key["client.".length..-1]
@semaphore.synchronize do
client = @client_lookup[client_id]
msg = Jampb::ClientMessage.parse(msg)
@log.debug "p2p message received from #{msg.from} to client #{client_id}"
@log.debug "client-directed message received from #{msg.from} to client #{client_id}"
unless client.nil?
EM.schedule do
@log.debug "sending p2p message to #{client_id}"
@log.debug "sending client-directed down websocket to #{client_id}"
send_to_client(client, msg)
end
else
@log.debug "p2p message unroutable to disconnected client #{client_id}"
@log.debug "client-directed message unroutable to disconnected client #{client_id}"
end
end
rescue => e
@ -338,11 +342,11 @@ module JamWebsockets
# remove this connection from the database
if !context.user.nil? && !context.client.nil?
JamRuby::Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'"
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.delete_connection(client.client_id)
end
end
send_friend_update(context.user, false, context.client)
remove_user(context)
else
@log.debug "skipping duplicate cleanup attempt of logged-in client"
@ -430,8 +434,9 @@ module JamWebsockets
# respond with LOGIN_ACK to let client know it was successful
#binding.pry
remote_port, remote_ip = Socket.unpack_sockaddr_in(client.get_peername)
login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token)
remote_ip = extract_ip(client)
login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token, @heartbeat_interval)
send_to_client(client, login_ack)
@semaphore.synchronize do
@ -445,11 +450,8 @@ module JamWebsockets
add_client(client_id, client) # TODO
# log this connection in the database
connection = JamRuby::Connection.new(:user => user, :client_id => client.client_id)
@log.debug "Created connection => #{connection.user}, #{connection.client_id}"
if connection.save
send_friend_update(user, true, context.client)
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.create_connection(user.id, client.client_id, extract_ip(client))
end
end
else
@ -457,6 +459,7 @@ module JamWebsockets
end
end
# TODO: deprecated; jam_ruby has routine inspired by this
def send_friend_update(user, online, client)
@log.debug "sending friend update for user #{user} online = #{online}"
@ -614,5 +617,9 @@ module JamWebsockets
end
end
end
def extract_ip(client)
return Socket.unpack_sockaddr_in(client.get_peername)[1]
end
end
end

View File

@ -14,11 +14,12 @@ module JamWebsockets
host = "0.0.0.0"
port = options[:port]
max_stale_connection_time = options[:max_stale_connection_time]
@log.info "starting server #{host}:#{port}"
@log.info "starting server #{host}:#{port} with staleness_time=#{max_stale_connection_time}"
EventMachine.run do
@router.start
@router.start(max_stale_connection_time)
# if you don't do this, the app won't exit unless you kill -9
at_exit do
@ -26,10 +27,33 @@ module JamWebsockets
@router.cleanup
end
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:emwebsocket_debug]) do |ws|
@log.info "new client #{ws}"
@router.new_client(ws)
end
start_connection_cleaner(max_stale_connection_time)
start_websocket_listener(host, port, options[:emwebsocket_debug])
end
end
def start_websocket_listener(listen_ip, port, emwebsocket_debug)
EventMachine::WebSocket.start(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws|
@log.info "new client #{ws}"
@router.new_client(ws)
end
end
def start_connection_cleaner(stale_max_time)
# one cleanup on startup
cleanup_stale_connections(stale_max_time)
EventMachine::PeriodicTimer.new(15) do
cleanup_stale_connections(stale_max_time)
end
end
def cleanup_stale_connections(stale_max_time)
ConnectionManager.active_record_transaction do |connection_manager|
connection_manager.remove_stale_connections(stale_max_time)
end
end
end

View File

@ -42,13 +42,15 @@ def login(router, user, password, client_id)
message_factory = MessageFactory.new
client = LoginClient.new
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token)
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15)
router.should_receive(:send_to_client).with(client, login_ack)
router.should_receive(:extract_ip).at_least(:once).with(client).and_return("127.0.0.1")
client.should_receive(:onclose)
client.should_receive(:onerror)
client.should_receive(:request).and_return({ "query" => { "pb" => "true" } })
client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")
#client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")
@router.new_client(client)
client.onopenblock.call
@ -75,31 +77,31 @@ end
describe Router do
include AMQP::Spec
include EventedSpec::EMSpec
message_factory = MessageFactory.new
amqp_before do
em_before do
@router = Router.new()
@router.start()
@router.start(30)
end
subject { @router }
amqp_after do
em_after do
@router.stop
end
describe "servicability" do
describe "serviceability" do
it "should start and stop", :mq => true do
em do
#em do
done
end
#end
end
it "should register for client events", :mq => true do
em do
#em do
client = double("client")
client.should_receive(:onopen)
client.should_receive(:onclose)
@ -109,13 +111,13 @@ describe Router do
@router.new_client(client)
done
end
#end
end
end
describe "topic routing helpers" do
it "create and delete user lookup set" do
em do
#em do
user = double(User)
user.should_receive(:id).any_number_of_times.and_return("1")
client = double("client")
@ -131,14 +133,14 @@ describe Router do
@router.user_context_lookup.length.should == 0
done
end
#end
end
end
describe "login" do
it "should not allow login of bogus user", :mq => true do
em do
#em do
TestClient = Class.new do
attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id
@ -181,42 +183,42 @@ describe Router do
client.onmsgblock.call login.to_s
done
end
#end
end
it "should allow login of valid user", :mq => true do
em do
#em do
@user = User.new(:name => "Example User", :email => "user@example.com",
:password => "foobar", :password_confirmation => "foobar")
@user.save
client1 = login(@router, @user, "foobar", "1")
done
end
#end
end
it "should allow music_session_join of valid user", :mq => true do
em do
#em do
user1 = FactoryGirl.create(:user) # in the music session
user2 = FactoryGirl.create(:user) # in the music session
user3 = FactoryGirl.create(:user) # not in the music session
music_session = FactoryGirl.create(:music_session, :creator => user1)
music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1")
music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2")
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "4")
music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "5")
# make a music_session and define two members
# create client 1, log him in, and log him in to music session
client1 = login(@router, user1, "foobar", "1")
login_music_session(@router, client1, music_session) \
login_music_session(@router, client1, music_session)
done
end
#end
end
it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do
em do
#em do
user1 = FactoryGirl.create(:user) # in the music session
user2 = FactoryGirl.create(:user) # in the music session
@ -232,15 +234,15 @@ describe Router do
# make a music_session and define two members
music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1")
music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2")
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "6")
music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "7")
done
end
#end
end
it "should allow two valid subscribers to communicate with p2p messages", :mq => true do
em do
#em do
user1 = FactoryGirl.create(:user) # in the music session
user2 = FactoryGirl.create(:user) # in the music session
@ -254,7 +256,7 @@ describe Router do
#login_music_session(@router, client2, music_session)
# by creating
music_session_member1 = FactoryGirl.create(:music_session_client, :user => user1, :music_session => music_session, :client_id => "1")
music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "8")
# now attempt to message p2p!
@ -268,10 +270,10 @@ describe Router do
## send ping to client 2
#client2.onmsgblock.call ping.to_s
#music_session_member2 = FactoryGirl.create(:music_session_client, :user => user2, :music_session => music_session, :client_id => "2")
#music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "2")
done
end
#end
end
end
end

View File

@ -3,7 +3,7 @@ require 'jam_db'
require 'spec_db'
require 'jam_websockets'
require 'timeout'
require 'amqp-spec/rspec'
require 'evented-spec'
jamenv = ENV['JAMENV']
jamenv ||= 'development'