* VRFS-997 - mix notifications

This commit is contained in:
Seth Call 2014-01-15 22:15:33 +00:00
parent dc781f04ad
commit 63d6f428b3
5 changed files with 62 additions and 43 deletions

View File

@ -475,8 +475,8 @@ module JamRuby
def recording_master_mix_complete(receiver_id, recording_id, band_id, msg, notification_id, created_at)
recording_master_mix_complete = Jampb::RecordingMasterMixComplete.new(
:band_invitation_id => invitation_id,
:photo_url => photo_url,
:recording_id => recording_id,
:band_id => band_id,
:msg => msg,
:notification_id => notification_id,
:created_at => created_at

View File

@ -77,6 +77,8 @@ class MQRouter
# sends a message to a user with no checking of permissions (RAW USAGE)
# this method deliberately has no database interactivity/active_record objects
def publish_to_user(user_id, user_msg)
@@log.warn "EM not running in publish_to_user" unless EM.reactor_running?
EM.schedule do
@@log.debug "publishing to user:#{user_id} from server"
# put it on the topic exchange for users

View File

@ -17,10 +17,14 @@ module JamRuby
:error_reason, :error_detail
def self.perform(mix_id, postback_output_url)
audiomixer = AudioMixer.new()
audiomixer.postback_output_url = postback_output_url
audiomixer.mix_id = mix_id
audiomixer.run
JamWebEventMachine.run_wait_stop do
audiomixer = AudioMixer.new()
audiomixer.postback_output_url = postback_output_url
audiomixer.mix_id = mix_id
audiomixer.run
end
end
def initialize

View File

@ -12,60 +12,79 @@ require 'bugsnag'
# Also starts websocket-gateway
module JamWebEventMachine
# starts amqp & eventmachine up first.
# and then calls your block.
# After the supplied block is done,
# waits until all EM tasks scheduled in the supplied block are done, or timeout
def self.run_wait_stop(timeout = 30, &blk)
JamWebEventMachine.start
thread = Thread.current
blk.call
# put our thread wake up event on the EM scheduler,
# meaning we go last (assuming that all EM tasks needed were scheduled in the blk)
EM.schedule do
thread.wakeup
end
sleep timeout
end
def self.run_em
EventMachine.error_handler{|e|
Bugsnag.notify(e)
}
EventMachine.error_handler { |e|
Bugsnag.notify(e)
}
EM.run do
# this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session'
$amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => Rails.application.config.rabbitmq_host, :port => Rails.application.config.rabbitmq_port)
$amqp_connection_manager.connect do |channel|
EM.run do
# this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session'
$amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => Rails.application.config.rabbitmq_host, :port => Rails.application.config.rabbitmq_port)
$amqp_connection_manager.connect do |channel|
AMQP::Exchange.new(channel, :topic, "clients") do |exchange|
Rails.logger.debug("#{exchange.name} is ready to go")
MQRouter.client_exchange = exchange
end
AMQP::Exchange.new(channel, :topic, "users") do |exchange|
Rails.logger.debug("#{exchange.name} is ready to go")
MQRouter.user_exchange = exchange
Rails.logger.debug "MQRouter.user_exchange = #{MQRouter.user_exchange}"
end
AMQP::Exchange.new(channel, :topic, "clients") do |exchange|
Rails.logger.debug("#{exchange.name} is ready to go")
MQRouter.client_exchange = exchange
end
if Rails.application.config.websocket_gateway_enable && !$rails_rake_task
AMQP::Exchange.new(channel, :topic, "users") do |exchange|
Rails.logger.debug("#{exchange.name} is ready to go")
MQRouter.user_exchange = exchange
end
end
Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port,
if Rails.application.config.websocket_gateway_enable && !$rails_rake_task
Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port,
:emwebsocket_debug => Rails.application.config.websocket_gateway_internal_debug,
:connect_time_stale => Rails.application.config.websocket_gateway_connect_time_stale,
:connect_time_expire => Rails.application.config.websocket_gateway_connect_time_expire }
end
end
end
end
def self.die_gracefully_on_signal
Rails.logger.debug("*** die_gracefully_on_signal")
Signal.trap("INT") { EM.stop }
Signal.trap("INT") { EM.stop }
Signal.trap("TERM") { EM.stop }
end
def self.start
if defined?(PhusionPassenger)
if defined?(PhusionPassenger)
Rails.logger.debug("PhusionPassenger detected")
PhusionPassenger.on_event(:starting_worker_process) do |forked|
# for passenger, we need to avoid orphaned threads
# for passenger, we need to avoid orphaned threads
if forked && EM.reactor_running?
Rails.logger.debug("stopping EventMachine")
Rails.logger.debug("stopping EventMachine")
EM.stop
end
Rails.logger.debug("starting EventMachine")
Thread.new {
run_em
}
Rails.logger.debug("starting EventMachine")
Thread.new do
run_em
end
die_gracefully_on_signal
end
elsif defined?(Unicorn)
@ -82,4 +101,5 @@ module JamWebEventMachine
end
end
JamWebEventMachine.start
JamWebEventMachine.start unless $rails_rake_task

View File

@ -1,7 +0,0 @@
if Rails.application.config.websocket_gateway_enable
# Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port,
# :emwebsocket_debug => Rails.application.config.websocket_gateway_internal_debug,
# :connect_time_stale => Rails.application.config.websocket_gateway_connect_time_stale,
# :connect_time_expire => Rails.application.config.websocket_gateway_connect_time_expire }
end