124 lines
3.6 KiB
Ruby
124 lines
3.6 KiB
Ruby
require 'amqp'
|
|
require 'jam_ruby'
|
|
|
|
# Creates a connection to RabbitMQ.

# On that single connection, a channel is created (a channel is a way to multiplex multiple queues/topics over the same TCP connection with RabbitMQ).
# Connections to the client, user and subscription exchanges are then made and stored in the MQRouter static variables.
# If this code completes (which implies that Rails can start to begin with, because this is in an initializer),
# then the Rails app itself is free to send messages over these exchanges.

# Also starts websocket-gateway
|
|
# Runs EventMachine on a background thread, opens the AMQP connection and
# publishes the "clients", "users" and "subscriptions" topic exchanges to
# MQRouter.
module JamWebEventMachine

  @@log = Logging.logger[JamWebEventMachine]

  # THIS WAS USED BY resque jobs needing EventMachine/AMQP, but it's no longer needed. It's useful code though
  #
  # Starts AMQP & EventMachine first (via JamWebEventMachine.run), then calls
  # the supplied block. After the block is done, waits until all EM tasks
  # scheduled in the block are done, or until +timeout+ seconds have elapsed,
  # and finally stops EventMachine.
  #
  # timeout - maximum number of seconds to wait for the reactor (default 30).
  # blk     - optional block that schedules work on EventMachine; when it is
  #           omitted the reactor is simply started, drained and stopped.
  def self.run_wait_stop(timeout = 30, &blk)
    JamWebEventMachine.run

    waiter = Thread.current
    drained = false
    lock = Mutex.new

    blk.call if blk

    # Put our wake-up event on the EM scheduler, meaning we go last
    # (assuming that all EM tasks needed were scheduled in the blk).
    # The flag is set under the lock so that a wake-up which fires before
    # this thread starts sleeping is not lost.
    EM.schedule do
      lock.synchronize { drained = true }
      waiter.wakeup
    end

    deadline = Time.now + timeout
    lock.synchronize do
      until drained
        remaining = deadline - Time.now
        break if remaining <= 0
        # Mutex#sleep releases the lock while sleeping and may return early,
        # hence the re-check of the flag and the deadline.
        lock.sleep(remaining)
      end
    end

    EM.stop
  end

  # Runs the EventMachine reactor on the current thread (EM.run blocks until
  # the reactor is stopped), so callers invoke this from a dedicated thread.
  #
  # calling_thread - optional Thread to wake up once the EM.run block has run.
  # semaphore      - optional Mutex; locked before the reactor starts and
  #                  unlocked from inside the EM.run block.
  # ran            - Hash whose :ran key is set to true inside the EM.run block.
  def self.run_em(calling_thread = nil, semaphore = nil, ran = {})
    semaphore.lock if semaphore

    EM.run do
      # this is global because we need to check elsewhere if we are currently connected to amqp before signalling success with some APIs, such as 'create session'
      $amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => APP_CONFIG.rabbitmq_host, :port => APP_CONFIG.rabbitmq_port)

      $amqp_connection_manager.connect do |channel|
        # topic exchange name => MQRouter attribute that receives the exchange
        exchange_targets = {
          "clients" => :client_exchange,
          "users" => :user_exchange,
          "subscriptions" => :subscription_exchange
        }

        exchange_targets.each do |exchange_name, router_attribute|
          AMQP::Exchange.new(channel, :topic, exchange_name) do |exchange|
            @@log.debug("#{exchange.name} is ready to go")
            MQRouter.public_send("#{router_attribute}=", exchange)
          end
        end
      end

      # Signal the waiting caller. This happens as soon as the connect has
      # been issued, not when the exchange callbacks above have fired.
      ran[:ran] = true
      semaphore.unlock if semaphore
      calling_thread.wakeup if calling_thread
    end
  end

  # Installs INT and TERM handlers that stop the EventMachine reactor.
  def self.die_gracefully_on_signal
    @@log.debug("*** die_gracefully_on_signal")

    %w[INT TERM].each do |signal_name|
      Signal.trap(signal_name) { EM.stop }
    end
  end

  # Starts run_em on a new thread and blocks the calling thread until the
  # EM.run block has executed, or until 10 seconds have passed, whichever
  # comes first. The return value is not meaningful.
  def self.run
    ran = {}
    semaphore = Mutex.new
    current = Thread.current

    Thread.new { run_em(current, semaphore, ran) }

    deadline = Time.now + 10
    semaphore.synchronize do
      # Mutex#sleep can return without an explicit wakeup, so keep waiting
      # until the flag is really set or the deadline has passed.
      until ran[:ran]
        remaining = deadline - Time.now
        break if remaining <= 0
        semaphore.sleep(remaining)
      end
    end
  end

  # Entry point: decides how to start EventMachine based on which app server
  # constant is defined.
  #
  # * PhusionPassenger: registers a :starting_worker_process hook that stops
  #   a running reactor in a forked worker, starts a fresh reactor thread and
  #   installs the signal handlers.
  # * Unicorn: nothing is done here.
  # * otherwise (logged as "Development environment"): enables
  #   Thread.abort_on_exception and starts the reactor via run.
  def self.start
    if defined?(PhusionPassenger)
      @@log.debug("PhusionPassenger detected")

      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        # for passenger, we need to avoid orphaned threads
        if forked && EM.reactor_running?
          @@log.debug("stopping EventMachine")
          EM.stop
        end

        @@log.debug("starting EventMachine")
        current = Thread.current
        Thread.new { run_em(current) }
        die_gracefully_on_signal
      end
    elsif defined?(Unicorn)
      @@log.debug("Unicorn detected--do nothing at initializer phase")
    else
      @@log.debug("Development environment detected")
      Thread.abort_on_exception = true

      # create a new thread separate from the Rails main thread that EventMachine can run on
      run
    end
  end
end
|
|
|
|
|