diff --git a/.rvmrc b/.rvmrc index a5e31abc5..435cd9286 100644 --- a/.rvmrc +++ b/.rvmrc @@ -1,2 +1 @@ -rvm use jruby-head@websockets --create -#rvm use ruby-1.9.3@websockets --create +rvm use ruby-1.9.3@websockets --create diff --git a/Gemfile b/Gemfile index 682b2ce94..9ceeece71 100644 --- a/Gemfile +++ b/Gemfile @@ -4,17 +4,13 @@ source 'https://rubygems.org' workspace = ENV["WORKSPACE"] || "~/workspace" gem 'uuidtools', '2.1.2' gem 'bcrypt-ruby', '3.0.1' -gem 'jruby-openssl' gem 'ruby-protocol-buffers', '1.2.2' gem 'jam_ruby', :path => "#{workspace}/jam-ruby" gem 'jampb', :path => "#{workspace}/jam-pb/target/ruby/jampb" gem 'em-websocket'#, :path=> "#{workspace}/em-websocket-jam" -gem 'hot_bunnies', '1.3.8' +gem 'amqp' gem 'activerecord', '3.2.7' gem 'logging' -#gem 'em-http-request' -gem 'activerecord-jdbc-adapter' -gem 'activerecord-jdbcpostgresql-adapter' group :development do gem 'pry' @@ -30,5 +26,4 @@ group :test do gem 'guard', '>= 0.10.0' gem 'guard-rspec', '>= 0.7.3' gem 'pg_migrate','0.1.5' #:path => "#{workspace}/pg_migrate_ruby" - gem 'guard-jruby-rspec' end diff --git a/config/database.yml b/config/database.yml index 518037a56..c1ad9e417 100644 --- a/config/database.yml +++ b/config/database.yml @@ -1,5 +1,5 @@ test: - adapter: jdbcpostgresql + adapter: postgresql database: jam_websockets_test host: localhost port: 5432 @@ -10,7 +10,7 @@ test: encoding: unicode development: - adapter: jdbcpostgresql + adapter: postgresql database: jam host: localhost port: 5432 diff --git a/lib/jam_websockets/router.rb b/lib/jam_websockets/router.rb index 82fa6f2b2..55fcdd099 100644 --- a/lib/jam_websockets/router.rb +++ b/lib/jam_websockets/router.rb @@ -1,12 +1,10 @@ require 'pry' require 'set' -require 'hot_bunnies' +require 'amqp' require 'thread' require 'json' require 'eventmachine' -import java.util.concurrent.Executors - include Jampb # add new field to client connection @@ -24,7 +22,7 @@ module JamWebsockets attr_accessor :user_context_lookup - def initialize(options={}) + def initialize() @log = Logging.logger[self] @pending_clients = Set.new # clients that have connected to server, but not logged in. @clients = {} # clients that have logged in @@ -36,21 +34,19 @@ module JamWebsockets @message_factory = JamRuby::MessageFactory.new @semaphore = Mutex.new @user_topic = nil - @user_subscription = nil @client_topic = nil - @client_subscription = nil @thread_pool = nil end - def start(options = {}) + def start(options={:host => "localhost", :port => 5432}) @log.info "startup" begin - @thread_pool = Executors.new_fixed_thread_pool(8) - @connection = HotBunnies.connect(:host => options[:host], :port => options[:port]) - @channel = @connection.create_channel - @channel.prefetch = 10 + + @connection = AMQP.connect(:host => options[:host]) + @channel = AMQP::Channel.new(@connection) + #@channel.prefetch = 10 register_topics rescue => e @@ -123,17 +119,14 @@ module JamWebsockets ######################## USER MESSAGING ########################### # create user exchange - @users_exchange = @channel.exchange('users', :type => :topic) + @users_exchange = @channel.topic('users') # create user messaging topic @user_topic = @channel.queue("", :auto_delete => true) @user_topic.bind(@users_exchange, :routing_key => "user.#") @user_topic.purge # subscribe for any messages to users - @user_subscription = @user_topic.subscribe(:ack => false) - - # this code serves as a callback that dequeues messages and processes them - @user_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + @user_topic.subscribe(:ack => false) do |headers, msg| begin routing_key = headers.envelope.routing_key user_id = routing_key["user.".length..-1] @@ -166,15 +159,14 @@ module JamWebsockets ############## CLIENT MESSAGING ################### - @clients_exchange = @channel.exchange('clients', :type => :topic) + @clients_exchange = @channel.topic('clients') @client_topic = @channel.queue("", :auto_delete => true) @client_topic.bind(@clients_exchange, :routing_key => "client.#") @client_topic.purge # subscribe for any p2p messages to a client - @client_subscription = @client_topic.subscribe(:ack => false) - @client_subscription.each(:blocking => false, :executor => @threadpool) do |headers, msg| + @client_topic.subscribe(:ack => false) do |headers, msg| begin routing_key = headers.envelope.routing_key client_id = routing_key["client.".length..-1] @@ -307,24 +299,6 @@ module JamWebsockets def cleanup() # shutdown topic listeners and mq connection - begin - if !@user_subscription.nil? && @user_subscription.active? - @log.debug "cleaning up user subscription" - @user_subscription.cancel - @user_subscription.shutdown! - end - - if !@client_subscription.nil? && @client_subscription.active? - @log.debug "cleaning up client subscription" - @client_subscription.cancel - @client_subscription.shutdown! - end - - rescue => e - @log.debug "unable to cancel subscription on cleanup: #{e}" - end - - @thread_pool.shutdown if !@channel.nil? @channel.close diff --git a/spec/factories.rb b/spec/factories.rb index a5e6f4c1e..2618d0bc0 100644 --- a/spec/factories.rb +++ b/spec/factories.rb @@ -14,7 +14,7 @@ FactoryGirl.define do sequence(:description) { |n| "Jam Session #{n}" } end - factory :music_session_client, :class => JamRuby::MusicSessionClient do + factory :connection, :class => JamRuby::Connection do ip_address "1.1.1.1" end end diff --git a/spec/spec_db.rb b/spec/spec_db.rb index 7ac708075..1f2eb905d 100644 --- a/spec/spec_db.rb +++ b/spec/spec_db.rb @@ -2,25 +2,11 @@ class SpecDb TEST_DB_NAME="jam_websockets_test" - def self.recreate_database(db_config) - recreate_database_jdbc(db_config) - end - - def self.recreate_database_jdbc(db_config) - original = db_config["database"] - db_config["database"] = "postgres" - ActiveRecord::Base.establish_connection(db_config) - ActiveRecord::Base.connection.execute("DROP DATABASE IF EXISTS #{TEST_DB_NAME}") - ActiveRecord::Base.connection.execute("CREATE DATABASE #{TEST_DB_NAME}") - JamDb::Migrator.new.migrate(:dbname => TEST_DB_NAME) - db_config["database"] = original - end - - def self.recreate_database_pg - - conn = PG::Connection.open("dbname=postgres") + def self.recreate_database + conn = PG::Connection.open("dbname=postgres user=postgres password=postgres host=localhost") conn.exec("DROP DATABASE IF EXISTS #{TEST_DB_NAME}") conn.exec("CREATE DATABASE #{TEST_DB_NAME}") - JamDb::Migrator.new.migrate(:dbname => TEST_DB_NAME) + JamDb::Migrator.new.migrate(:dbname => TEST_DB_NAME, :user => "postgres", :password => "postgres", :host => "localhost") end + end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d1c8af6a1..203534124 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -23,7 +23,7 @@ Logging.logger.root.appenders = Logging.appenders.stdout # recreate test database and migrate it db_config = YAML::load(File.open('config/database.yml'))["test"] -SpecDb::recreate_database(db_config) +SpecDb::recreate_database() # initialize ActiveRecord's db connection ActiveRecord::Base.establish_connection(db_config)