* fixes for EventMachine initialization to work in ruby-only context (without web)

This commit is contained in:
Seth Call 2014-01-16 02:21:28 +00:00
parent 7d90425864
commit 01e50b45b3
6 changed files with 117 additions and 101 deletions

View File

@ -28,6 +28,7 @@ require "jam_ruby/lib/module_overrides"
require "jam_ruby/lib/s3_util"
require "jam_ruby/lib/s3_manager"
require "jam_ruby/lib/profanity"
require "jam_ruby/lib/em_helper.rb"
require "jam_ruby/resque/audiomixer"
require "jam_ruby/resque/scheduled/audiomixer_retry"
require "jam_ruby/mq_router"

View File

@ -0,0 +1,93 @@
require 'amqp'
require 'jam_ruby'
# Creates a connection to RabbitMQ
# On that single connection, a channel is created (which is a way to multiplex multiple queues/topics over the same TCP connection with rabbitmq)
# Then connections to the client_exchange and user_exchange are made, and put into the MQRouter static variables
# If this code completes (which implies that Rails can start to begin with, because this is in an initializer),
# then the Rails app itself is free to send messages over these exchanges
# Also starts websocket-gateway
module JamWebEventMachine
@@log = Logging.logger[JamWebEventMachine]
# starts amqp & eventmachine up first.
# and then calls your block.
# After the supplied block is done,
# waits until all EM tasks scheduled in the supplied block are done, or timeout
def self.run_wait_stop(timeout = 30, &blk)
JamWebEventMachine.start
thread = Thread.current
blk.call
# put our thread wake up event on the EM scheduler,
# meaning we go last (assuming that all EM tasks needed were scheduled in the blk)
EM.schedule do
thread.wakeup
end
sleep timeout
end
def self.run_em
EM.run do
# this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session'
$amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => APP_CONFIG.rabbitmq_host, :port => APP_CONFIG.rabbitmq_port)
$amqp_connection_manager.connect do |channel|
AMQP::Exchange.new(channel, :topic, "clients") do |exchange|
@@log.debug("#{exchange.name} is ready to go")
MQRouter.client_exchange = exchange
end
AMQP::Exchange.new(channel, :topic, "users") do |exchange|
@@log.debug("#{exchange.name} is ready to go")
MQRouter.user_exchange = exchange
end
end
end
end
def self.die_gracefully_on_signal
@@log.debug("*** die_gracefully_on_signal")
Signal.trap("INT") { EM.stop }
Signal.trap("TERM") { EM.stop }
end
def self.start
if defined?(PhusionPassenger)
@@log.debug("PhusionPassenger detected")
PhusionPassenger.on_event(:starting_worker_process) do |forked|
# for passenger, we need to avoid orphaned threads
if forked && EM.reactor_running?
@@log.debug("stopping EventMachine")
EM.stop
end
@@log.debug("starting EventMachine")
Thread.new do
run_em
end
die_gracefully_on_signal
end
elsif defined?(Unicorn)
@@log.debug("Unicorn detected--do nothing at initializer phase")
else
@@log.debug("Development environment detected")
Thread.abort_on_exception = true
# create a new thread separate from the Rails main thread that EventMachine can run on
Thread.new do
run_em
end
end
end
end

View File

@ -21,6 +21,14 @@ def app_config
ENV['AUDIOMIXER_PATH'] || audiomixer_workspace_path || "/var/lib/audiomixer/audiomixer/audiomixerapp"
end
def rabbitmq_host
"localhost"
end
def rabbitmq_port
5672
end
private
def audiomixer_workspace_path

View File

@ -62,9 +62,7 @@ gem 'resque'
gem 'resque-retry'
gem 'resque-failed-job-mailer'
gem 'resque-dynamic-queues'
gem 'quiet_assets', :group => :development
gem "bugsnag"
group :development, :test do

View File

@ -1,3 +1,5 @@
require 'bugsnag'
# Load the rails application
require File.expand_path('../application', __FILE__)
@ -5,5 +7,9 @@ Mime::Type.register "audio/ogg", :audio_ogg
APP_CONFIG = Rails.application.config
EventMachine.error_handler { |e|
Bugsnag.notify(e)
}
# Initialize the rails application
SampleApp::Application.initialize!

View File

@ -1,105 +1,15 @@
require 'amqp'
require 'jam_ruby'
require 'bugsnag'
unless $rails_rake_task
# Creates a connection to RabbitMQ
JamWebEventMachine.start
# On that single connection, a channel is created (which is a way to multiplex multiple queues/topics over the same TCP connection with rabbitmq)
# Then connections to the client_exchange and user_exchange are made, and put into the MQRouter static variables
# If this code completes (which implies that Rails can start to begin with, because this is in an initializer),
# then the Rails app itself is free to send messages over these exchanges
if APP_CONFIG.websocket_gateway_enable && !$rails_rake_task
# Also starts websocket-gateway
module JamWebEventMachine
# starts amqp & eventmachine up first.
# and then calls your block.
# After the supplied block is done,
# waits until all EM tasks scheduled in the supplied block are done, or timeout
def self.run_wait_stop(timeout = 30, &blk)
JamWebEventMachine.start
thread = Thread.current
blk.call
# put our thread wake up event on the EM scheduler,
# meaning we go last (assuming that all EM tasks needed were scheduled in the blk)
EM.schedule do
thread.wakeup
end
sleep timeout
end
def self.run_em
EventMachine.error_handler { |e|
Bugsnag.notify(e)
}
EM.run do
# this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session'
$amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => Rails.application.config.rabbitmq_host, :port => Rails.application.config.rabbitmq_port)
$amqp_connection_manager.connect do |channel|
AMQP::Exchange.new(channel, :topic, "clients") do |exchange|
Rails.logger.debug("#{exchange.name} is ready to go")
MQRouter.client_exchange = exchange
end
AMQP::Exchange.new(channel, :topic, "users") do |exchange|
Rails.logger.debug("#{exchange.name} is ready to go")
MQRouter.user_exchange = exchange
end
end
if Rails.application.config.websocket_gateway_enable && !$rails_rake_task
Thread.new { JamWebsockets::Server.new.run :port => Rails.application.config.websocket_gateway_port,
:emwebsocket_debug => Rails.application.config.websocket_gateway_internal_debug,
:connect_time_stale => Rails.application.config.websocket_gateway_connect_time_stale,
:connect_time_expire => Rails.application.config.websocket_gateway_connect_time_expire }
end
end
end
def self.die_gracefully_on_signal
Rails.logger.debug("*** die_gracefully_on_signal")
Signal.trap("INT") { EM.stop }
Signal.trap("TERM") { EM.stop }
end
def self.start
if defined?(PhusionPassenger)
Rails.logger.debug("PhusionPassenger detected")
PhusionPassenger.on_event(:starting_worker_process) do |forked|
# for passenger, we need to avoid orphaned threads
if forked && EM.reactor_running?
Rails.logger.debug("stopping EventMachine")
EM.stop
end
Rails.logger.debug("starting EventMachine")
Thread.new do
run_em
end
die_gracefully_on_signal
end
elsif defined?(Unicorn)
Rails.logger.debug("Unicorn detected--do nothing at initializer phase")
else
Rails.logger.debug("Development environment detected")
Thread.abort_on_exception = true
# create a new thread separate from the Rails main thread that EventMachine can run on
Thread.new do
run_em
end
Thread.new do
JamWebsockets::Server.new.run(
:port => APP_CONFIG.websocket_gateway_port,
:emwebsocket_debug => APP_CONFIG.websocket_gateway_internal_debug,
:connect_time_stale => APP_CONFIG.websocket_gateway_connect_time_stale,
:connect_time_expire => APP_CONFIG.websocket_gateway_connect_time_expire)
end
end
end
JamWebEventMachine.start unless $rails_rake_task