* websocket-gateway updated to support p2p session
This commit is contained in:
commit
1bbf3248b2
|
|
@ -1,18 +1,18 @@
|
|||
module JamWebsockets
|
||||
module JamWebsockets
|
||||
class ClientContext
|
||||
|
||||
attr_accessor :user, :client, :msg_count, :session
|
||||
|
||||
def initialize(user, client)
|
||||
@user = user
|
||||
@client = client
|
||||
@msg_count = 0
|
||||
@session = nil
|
||||
@client = client
|
||||
@msg_count = 0
|
||||
@session = nil
|
||||
end
|
||||
|
||||
def to_s
|
||||
return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]"
|
||||
end
|
||||
def to_s
|
||||
return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]"
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ end
|
|||
|
||||
module JamWebsockets
|
||||
|
||||
|
||||
class Router
|
||||
|
||||
attr_accessor :user_context_lookup, :session_context_lookup
|
||||
|
|
@ -80,14 +79,18 @@ module JamWebsockets
|
|||
|
||||
if deleted.nil?
|
||||
@log.warn "unable to delete #{client_id} from client_lookup"
|
||||
elsif deleted == client
|
||||
elsif deleted != client
|
||||
# put it back--this is only possible if add_client hit the 'old connection' path
|
||||
# so in other words if this happens:
|
||||
# add_client(1, clientX)
|
||||
# add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time
|
||||
# remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash
|
||||
@client_lookup[client_id] = client
|
||||
@log.debug "putting back client into @client_lookup for #{client_id} #{client.inspect}"
|
||||
else
|
||||
@log.debug "cleaned up @client_lookup for #{client_id}"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def add_user(context)
|
||||
|
|
@ -144,38 +147,42 @@ module JamWebsockets
|
|||
|
||||
# register topic for user messages and session messages
|
||||
def register_topics
|
||||
@users_exchange = @channel.exchange('users', :type => :topic)
|
||||
@sessions_exchange = @channel.exchange('sessions', :type => :topic)
|
||||
@clients_exchange = @channel.exchange('clients', :type => :topic)
|
||||
|
||||
######################## USER MESSAGING ###########################
|
||||
|
||||
# create user exchange
|
||||
@users_exchange = @channel.exchange('users', :type => :topic)
|
||||
# create user messaging topic
|
||||
@user_topic = @channel.queue("", :auto_delete => true)
|
||||
@user_topic.bind(@users_exchange, :routing_key => "user.#")
|
||||
@user_topic.purge
|
||||
|
||||
# TODO: alert friends
|
||||
|
||||
# subscribe for any messages to users
|
||||
|
||||
@user_subscription = @user_topic.subscribe(:ack => false)
|
||||
|
||||
# this code serves as a callback that dequeues messages and processes them
|
||||
@user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
||||
begin
|
||||
routing_key = headers.envelope.routing_key
|
||||
user_id = routing_key["user.".length..-1]
|
||||
@sempahore.synchronize do
|
||||
|
||||
@semaphore.synchronize do
|
||||
contexts = @user_context_lookup[user_id]
|
||||
|
||||
unless contexts.nil?
|
||||
if !contexts.nil?
|
||||
|
||||
@log.debug "received user-directed message for session: #{user_id}"
|
||||
@log.debug "received user-directed message for user: #{user_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
contexts.each do |context|
|
||||
EM.schedule do
|
||||
@log.debug "sending user message to #{context}"
|
||||
send_to_client(context.client, msg)
|
||||
end
|
||||
end
|
||||
else
|
||||
@log.debug "Context is null"
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -185,12 +192,20 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
######################## SESSION MESSAGING ###########################
|
||||
|
||||
# create session exchange
|
||||
@sessions_exchange = @channel.exchange('sessions', :type => :topic)
|
||||
|
||||
# create session messaging topic
|
||||
@session_topic = @channel.queue("", :auto_delete => true)
|
||||
@session_topic.bind(@sessions_exchange, :routing_key => "session.#")
|
||||
@session_topic.purge
|
||||
|
||||
# subscribe for any messages to session
|
||||
@session_subscription = @session_topic.subscribe(:ack => false)
|
||||
|
||||
# this code serves as a callback that dequeues messages and processes them
|
||||
@session_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg|
|
||||
begin
|
||||
routing_key = headers.envelope.routing_key
|
||||
|
|
@ -202,11 +217,10 @@ module JamWebsockets
|
|||
|
||||
@log.debug "received session-directed message for session: #{session_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
# ok, its very odd to have your own message that you sent bounce back to you.
|
||||
# ok, its very odd to have your own message that you sent bounce back to you.
|
||||
# In one small favor to the client, we purposefully disallow messages a client
|
||||
# sent from bouncing back to itself.
|
||||
# sent from bouncing back to itself.
|
||||
properties = headers.properties unless headers.nil?
|
||||
inner_headers = properties.headers unless properties.nil?
|
||||
origin_client_id = inner_headers["client_id"]
|
||||
|
|
@ -214,7 +228,10 @@ module JamWebsockets
|
|||
# counter-intuitively, even though a string is passed in when you send the header, an (apparently) auto-generated class is sent back which, if you to_s, returns the original value
|
||||
origin_client_id = origin_client_id.to_s unless origin_client_id.nil?
|
||||
|
||||
@log.debug "message received from client #{origin_client_id}"
|
||||
|
||||
@log.debug "session message received from client #{origin_client_id}"
|
||||
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
contexts.each do |context|
|
||||
if context.client.client_id != origin_client_id
|
||||
EM.schedule do
|
||||
|
|
@ -232,6 +249,10 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
############## CLIENT MESSAGING ###################
|
||||
|
||||
@clients_exchange = @channel.exchange('clients', :type => :topic)
|
||||
|
||||
@client_topic = @channel.queue("", :auto_delete => true)
|
||||
@client_topic.bind(@clients_exchange, :routing_key => "client.#")
|
||||
@client_topic.purge
|
||||
|
|
@ -245,19 +266,23 @@ module JamWebsockets
|
|||
@semaphore.synchronize do
|
||||
client = @client_lookup[client_id]
|
||||
|
||||
properties = headers.properties unless headers.nil?
|
||||
inner_headers = properties.headers unless properties.nil?
|
||||
origin_client_id = inner_headers["client_id"]
|
||||
|
||||
@log.debug "p2p message received from client #{origin_client_id} to client #{client_id}"
|
||||
|
||||
unless client.nil?
|
||||
msg = Jampb::ClientMessage.parse(msg)
|
||||
|
||||
properties = headers.properties unless headers.nil?
|
||||
inner_headers = properties.headers unless properties.nil?
|
||||
origin_client_id = inner_headers["client_id"]
|
||||
|
||||
@log.debug "p2p message received from client #{origin_client_id} to client #{client_id}"
|
||||
EM.schedule do
|
||||
@log.debug "sending p2p message to #{client_id}"
|
||||
send_to_client(client, msg)
|
||||
end
|
||||
else
|
||||
@log.debug "p2p message unroutable to disconnected client #{client_id}"
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
|
|
@ -268,60 +293,7 @@ module JamWebsockets
|
|||
end
|
||||
end
|
||||
|
||||
def send_to_client(client, msg)
|
||||
if client.encode_json
|
||||
client.send(msg.to_json.to_s)
|
||||
else
|
||||
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
|
||||
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup()
|
||||
# shutdown topic listeners and mq connection
|
||||
begin
|
||||
if !@user_subscription.nil? && @user_subscription.active?
|
||||
@log.debug "cleaning up user subscription"
|
||||
@user_subscription.cancel
|
||||
@user_subscription.shutdown!
|
||||
end
|
||||
|
||||
if !@session_subscription.nil? && @session_subscription.active?
|
||||
@log.debug "cleaning up session subscription"
|
||||
@session_subscription.cancel
|
||||
@session_subscription.shutdown!
|
||||
end
|
||||
|
||||
if !@client_subscription.nil? && @client_subscription.active?
|
||||
@log.debug "cleaning up client subscription"
|
||||
@client_subscription.cancel
|
||||
@client_subscription.shutdown!
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
||||
end
|
||||
|
||||
@thread_pool.shutdown
|
||||
|
||||
if !@channel.nil?
|
||||
@channel.close
|
||||
end
|
||||
|
||||
if !@connection.nil?
|
||||
@connection.close
|
||||
end
|
||||
|
||||
# tear down each individual client
|
||||
@clients.each do |client, context|
|
||||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
|
||||
def stop
|
||||
@log.info "shutdown"
|
||||
cleanup
|
||||
end
|
||||
|
||||
def new_client(client)
|
||||
|
||||
|
|
@ -371,7 +343,7 @@ module JamWebsockets
|
|||
|
||||
begin
|
||||
if client.encode_json
|
||||
#example: {"type":"LOGIN", "route_to":"server", "login" : {"username":"hi"}}
|
||||
#example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}}
|
||||
parse = JSON.parse(msg)
|
||||
pb_msg = Jampb::ClientMessage.json_create(parse)
|
||||
self.route(pb_msg, client)
|
||||
|
|
@ -409,10 +381,122 @@ module JamWebsockets
|
|||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
}
|
||||
end
|
||||
|
||||
def add_user(client_context)
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
|
||||
if user_contexts.nil?
|
||||
user_contexts = Set.new
|
||||
@user_context_lookup[client_context.user.id] = user_contexts
|
||||
end
|
||||
|
||||
user_contexts.add(client_context)
|
||||
end
|
||||
|
||||
def remove_user(client_context)
|
||||
user_contexts = @user_context_lookup[client_context.user.id]
|
||||
|
||||
if user_contexts.nil?
|
||||
@log.warn "user can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of user contexts
|
||||
user_contexts.delete(client_context)
|
||||
|
||||
# if last user context, delete entire set (memory leak concern)
|
||||
if user_contexts.length == 0
|
||||
@user_context_lookup.delete(client_context.user.id)
|
||||
end
|
||||
|
||||
client_context.user = nil
|
||||
end
|
||||
end
|
||||
|
||||
def add_session(client_context)
|
||||
session_contexts = @session_context_lookup[client_context.session.id]
|
||||
|
||||
if session_contexts.nil?
|
||||
session_contexts = Set.new
|
||||
@session_context_lookup[client_context.session.id] = session_contexts
|
||||
end
|
||||
|
||||
session_contexts.add(client_context)
|
||||
end
|
||||
|
||||
def remove_session(client_context)
|
||||
session_contexts = @session_context_lookup[client_context.session.id]
|
||||
|
||||
if session_contexts.nil?
|
||||
@log.warn "session can not be removed #{client_context}"
|
||||
else
|
||||
# delete the context from set of session contexts
|
||||
session_contexts.delete(client_context)
|
||||
|
||||
# if last session context, delete entire set (memory leak concern)
|
||||
if session_contexts.length == 0
|
||||
@session_context_lookup.delete(client_context.session.id)
|
||||
end
|
||||
|
||||
client_context.session = nil
|
||||
end
|
||||
end
|
||||
|
||||
def send_to_client(client, msg)
|
||||
@log.debug "SEND TO CLIENT START"
|
||||
if client.encode_json
|
||||
client.send(msg.to_json.to_s)
|
||||
else
|
||||
# this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent
|
||||
client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s)
|
||||
end
|
||||
@log.debug "SEND TO CLIENT STOP"
|
||||
end
|
||||
|
||||
def cleanup()
|
||||
# shutdown topic listeners and mq connection
|
||||
begin
|
||||
if !@user_subscription.nil? && @user_subscription.active?
|
||||
@log.debug "cleaning up user subscription"
|
||||
@user_subscription.cancel
|
||||
@user_subscription.shutdown!
|
||||
end
|
||||
|
||||
if !@session_subscription.nil? && @session_subscription.active?
|
||||
@log.debug "cleaning up session subscription"
|
||||
@session_subscription.cancel
|
||||
@session_subscription.shutdown!
|
||||
end
|
||||
|
||||
if !@client_subscription.nil? && @client_subscription.active?
|
||||
@log.debug "cleaning up client subscription"
|
||||
@client_subscription.cancel
|
||||
@client_subscription.shutdown!
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@log.debug "unable to cancel subscription on cleanup: #{e}"
|
||||
end
|
||||
|
||||
@thread_pool.shutdown
|
||||
|
||||
if !@channel.nil?
|
||||
@channel.close
|
||||
end
|
||||
|
||||
if !@connection.nil?
|
||||
@connection.close
|
||||
end
|
||||
|
||||
# tear down each individual client
|
||||
@clients.each do |client, context|
|
||||
cleanup_client(client)
|
||||
end
|
||||
end
|
||||
|
||||
def stop
|
||||
@log.info "shutdown"
|
||||
cleanup
|
||||
end
|
||||
|
||||
# removes all resources associated with a client
|
||||
|
|
@ -425,19 +509,28 @@ module JamWebsockets
|
|||
@log.debug "cleaning up pending client #{client}"
|
||||
else
|
||||
|
||||
@log.debug "cleanup up logged-in client #{client}"
|
||||
|
||||
remove_client(client.client_id, client)
|
||||
|
||||
context = @clients.delete(client)
|
||||
|
||||
if !context.nil?
|
||||
|
||||
# remove this connection from the database
|
||||
if !context.user.nil? && !context.client.nil?
|
||||
JamRuby::Connection.delete_all "user_id = '#{context.user.id}' AND client_id = '#{context.client.client_id}'"
|
||||
end
|
||||
|
||||
send_friend_update(context.user, false, context.client)
|
||||
|
||||
remove_user(context)
|
||||
|
||||
if !context.session.nil?
|
||||
remove_session(context)
|
||||
end
|
||||
else
|
||||
@log.debug "skipping duplicate cleanup attempt of authorized client"
|
||||
@log.debug "skipping duplicate cleanup attempt of logged-in client"
|
||||
end
|
||||
|
||||
end
|
||||
|
|
@ -469,13 +562,13 @@ module JamWebsockets
|
|||
|
||||
elsif @message_factory.session_directed? client_msg
|
||||
|
||||
session = client_msg.route_to[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
|
||||
handle_session_directed(session, client_msg, client)
|
||||
session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1]
|
||||
handle_session_directed(session_id, client_msg, client)
|
||||
|
||||
elsif @message_factory.user_directed? client_msg
|
||||
|
||||
user = client_msg.route_to[MessageFactory::USER_PREFIX_TARGET.length..-1]
|
||||
handle_user_directed(user, client_msg, client)
|
||||
user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1]
|
||||
handle_user_directed(user_id, client_msg, client)
|
||||
|
||||
else
|
||||
raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}"
|
||||
|
|
@ -514,7 +607,7 @@ module JamWebsockets
|
|||
client_id = login.client_id if login.value_for_tag(4)
|
||||
|
||||
# you don't have to supply client_id in login--if you don't, we'll generate one
|
||||
if client_id.nil?
|
||||
if client_id.nil? || client_id.empty?
|
||||
# give a unique ID to this client. This is used to prevent session messages
|
||||
# from echoing back to the sender, for instance.
|
||||
client_id = UUIDTools::UUID.random_create.to_s
|
||||
|
|
@ -534,23 +627,56 @@ module JamWebsockets
|
|||
login_ack = @message_factory.login_ack(remote_ip, client_id, user.remember_token)
|
||||
send_to_client(client, login_ack)
|
||||
|
||||
# remove from pending_queue
|
||||
@semaphore.synchronize do
|
||||
# remove from pending_queue
|
||||
@pending_clients.delete(client)
|
||||
|
||||
# add a tracker for this user
|
||||
context = ClientContext.new(user, client)
|
||||
@clients[client] = context
|
||||
add_user(context)
|
||||
add_client(client_id, client)
|
||||
add_client(client_id, client) # TODO
|
||||
|
||||
# log this connection in the database
|
||||
connection = JamRuby::Connection.new(:user => user, :client_id => client.client_id)
|
||||
@log.debug "Created connection => #{connection.user}, #{connection.client_id}"
|
||||
|
||||
if connection.save
|
||||
send_friend_update(user, true, context.client)
|
||||
end
|
||||
end
|
||||
else
|
||||
raise SessionError, 'invalid login'
|
||||
end
|
||||
end
|
||||
|
||||
def send_friend_update(user, online, client)
|
||||
@log.debug "sending friend update for user #{user} online = #{online}"
|
||||
|
||||
if !user.nil? && user.friends.exists?
|
||||
@log.debug "user has friends - sending friend updates"
|
||||
|
||||
# create the friend_update message
|
||||
friend_update_msg = @message_factory.friend_update(user.id, online)
|
||||
|
||||
# send the friend_update to each friend that has active connections
|
||||
user.friends.each do |friend|
|
||||
@log.debug "sending friend update message to #{friend}"
|
||||
|
||||
handle_user_directed(friend.id, friend_update_msg, client)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def handle_heartbeat(heartbeat, client)
|
||||
# todo: manage staleness
|
||||
context = @clients[client]
|
||||
@log.debug "updating timestamp for user #{context}"
|
||||
connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id)
|
||||
|
||||
unless connection.nil?
|
||||
connection.updated_at = DateTime.now
|
||||
connection.save
|
||||
end
|
||||
end
|
||||
|
||||
def handle_join_music_session(join_music_session, client)
|
||||
|
|
@ -561,6 +687,7 @@ module JamWebsockets
|
|||
|
||||
begin
|
||||
session = access_music_session(session_id, context.user)
|
||||
|
||||
@log.debug "user #{context} joining new session #{session}"
|
||||
@semaphore.synchronize do
|
||||
old_session = context.session
|
||||
|
|
@ -627,7 +754,6 @@ module JamWebsockets
|
|||
else
|
||||
raise SessionError, 'no login data was found in Login message'
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def access_music_session(music_session_id, user)
|
||||
|
|
@ -697,9 +823,12 @@ module JamWebsockets
|
|||
@clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}})
|
||||
end
|
||||
|
||||
def handle_user_directed(user, client_msg, client)
|
||||
def handle_user_directed(user_id, client_msg, client)
|
||||
|
||||
raise SessionError, 'not implemented'
|
||||
@log.debug "publishing to user #{user_id} from client_id #{client.client_id}"
|
||||
|
||||
# put it on the topic exchange for users
|
||||
@users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
require 'em-websocket'
|
||||
|
||||
module JamWebsockets
|
||||
|
||||
class Server
|
||||
|
||||
def initialize(options={})
|
||||
|
|
@ -16,17 +17,17 @@ module JamWebsockets
|
|||
|
||||
@log.info "starting server #{host}:#{port}"
|
||||
|
||||
@router.start
|
||||
@router.start
|
||||
|
||||
# if you don't do this, the app won't exit unless you kill -9
|
||||
at_exit do
|
||||
@log.info "cleaning up server"
|
||||
@router.cleanup
|
||||
end
|
||||
# if you don't do this, the app won't exit unless you kill -9
|
||||
at_exit do
|
||||
@log.info "cleaning up server"
|
||||
@router.cleanup
|
||||
end
|
||||
|
||||
EventMachine.run {
|
||||
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => options[:port], :debug => options[:emwebsocket_debug]) do |ws|
|
||||
@log.info "new client #{ws}"
|
||||
@log.info "new client #{ws}"
|
||||
@router.new_client(ws)
|
||||
end
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,8 @@ require 'thread'
|
|||
LoginClient = Class.new do
|
||||
attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id
|
||||
|
||||
def initiaize()
|
||||
|
||||
def initialize()
|
||||
|
||||
end
|
||||
|
||||
|
|
@ -41,12 +42,12 @@ def login(router, user, password, client_id)
|
|||
message_factory = MessageFactory.new
|
||||
client = LoginClient.new
|
||||
|
||||
login_ack = message_factory.login_ack("127.0.0.1", client_id)
|
||||
login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token)
|
||||
|
||||
router.should_receive(:send_to_client).with(client, login_ack)
|
||||
client.should_receive(:onclose)
|
||||
client.should_receive(:onerror)
|
||||
client.should_receive(:request).and_return({"query" => {"pb" => "true"}})
|
||||
client.should_receive(:request).and_return({ "query" => { "pb" => "true" } })
|
||||
client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")
|
||||
|
||||
@router.new_client(client)
|
||||
|
|
@ -134,7 +135,7 @@ describe Router do
|
|||
|
||||
attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id
|
||||
|
||||
def initiaize()
|
||||
def initialize()
|
||||
|
||||
end
|
||||
|
||||
|
|
@ -182,7 +183,6 @@ describe Router do
|
|||
client1 = login(@router, @user, "foobar", "1")
|
||||
end
|
||||
|
||||
|
||||
it "should allow music_session_join of valid user", :mq => true do
|
||||
|
||||
user1 = FactoryGirl.create(:user) # in the music session
|
||||
|
|
|
|||
Loading…
Reference in New Issue