diff --git a/websocket-gateway/.gitignore b/websocket-gateway/.gitignore new file mode 100644 index 000000000..f2cd712b0 --- /dev/null +++ b/websocket-gateway/.gitignore @@ -0,0 +1,24 @@ +*.gem +*.rbc +.bundle +.config +.yardoc +Gemfile.lock +InstalledFiles +_yardoc +coverage +doc/ +lib/bundler/man +pkg +rdoc +spec/reports +test/tmp +test/version_tmp +tmp +log/* +.idea +*~ +*.swp +*.iml +target +vendor diff --git a/websocket-gateway/.pg_migrate b/websocket-gateway/.pg_migrate new file mode 100644 index 000000000..1a32f454a --- /dev/null +++ b/websocket-gateway/.pg_migrate @@ -0,0 +1 @@ +up.connopts=dbname:jam diff --git a/websocket-gateway/.ruby-gemset b/websocket-gateway/.ruby-gemset new file mode 100644 index 000000000..14774b465 --- /dev/null +++ b/websocket-gateway/.ruby-gemset @@ -0,0 +1 @@ +websockets diff --git a/websocket-gateway/.ruby-version b/websocket-gateway/.ruby-version new file mode 100644 index 000000000..abf2ccea0 --- /dev/null +++ b/websocket-gateway/.ruby-version @@ -0,0 +1 @@ +ruby-2.0.0-p247 diff --git a/websocket-gateway/Gemfile b/websocket-gateway/Gemfile new file mode 100644 index 000000000..75ebedf18 --- /dev/null +++ b/websocket-gateway/Gemfile @@ -0,0 +1,57 @@ +#ruby=1.9.3-p327 +source 'https://rubygems.org' +unless ENV['LOCAL_DEV'] == '1' + source 'https://jamjam:blueberryjam@int.jamkazam.com/gems/' +end + +# Look for $WORKSPACE, otherwise use "workspace" as dev path. +workspace = ENV["WORKSPACE"] || "~/workspace" +devenv = ENV["BUILD_NUMBER"].nil? # Jenkins sets a build number environment variable + +if devenv + gem 'jam_db', :path=> "#{workspace}/jam-db/target/ruby_package" + gem 'jampb', :path => "#{workspace}/jam-pb/target/ruby/jampb" + gem 'jam_ruby', :path => "#{workspace}/jam-ruby" +else + gem 'jam_db' + gem 'jampb' + gem 'jam_ruby' +end + + +gem 'uuidtools', '2.1.2' +gem 'bcrypt-ruby', '3.0.1' +gem 'ruby-protocol-buffers', '1.2.2' +gem 'em-websocket', '>=0.4.0' #, :path=> "#{workspace}/em-websocket-jam" +gem 'amqp' +gem 'activerecord', '3.2.7' +gem 'logging' +gem 'will_paginate' +gem 'actionmailer' +gem 'sendgrid' +gem 'rb-readline' +gem 'aasm', '3.0.16' +gem 'carrierwave' +gem 'devise' +gem 'postgres-copy' +gem 'aws-sdk' + +group :development do + gem 'pry' +end + +group :test do + gem 'cucumber' + gem 'rspec' + gem 'factory_girl' + #gem 'spork', '0.9.0' + gem 'database_cleaner', '0.7.0' + gem 'guard', '>= 0.10.0' + gem 'guard-rspec', '>= 0.7.3' + gem 'pg_migrate','0.1.11' #:path => "#{workspace}/pg_migrate_ruby" + gem 'evented-spec' +end + +group :package do + gem 'fpm' +end diff --git a/websocket-gateway/Guardfile b/websocket-gateway/Guardfile new file mode 100644 index 000000000..e84dab66b --- /dev/null +++ b/websocket-gateway/Guardfile @@ -0,0 +1,2 @@ +interactor :simple +guard 'jruby-rspec', :spec_paths => ["spec"] diff --git a/websocket-gateway/README.md b/websocket-gateway/README.md new file mode 100644 index 000000000..cd61f2a75 --- /dev/null +++ b/websocket-gateway/README.md @@ -0,0 +1,12 @@ +TODO & DESIGN LIMITATIONS +========================= + +* !!!! lock up multi-threaded unsafe data structures + +* The rabbitmq connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy) +* The database connection isn't pooled. Throughput limitation (but could be resolved by just starting more instances of JamWebsocket behind Haproxy) +* We make just one user topic registration and session registration for all users/sessions. If ever we had 10 of servers, it could be wasteful. It just depends on how fast the bogus messaging can be ignored +* The database connection is pooled. +* The user model is stored in memory, meaning periodically it should be reloaded from the database (in case a user was marked inactive and you want them knocked out of the system) +* The user could easily join to multiple sessions. Currently, though, the ClientContext object only tracks one jam session topic subscription. This is minimial to change. +* peek logic not implemented on server for protoc messages; this could be done to save cost of deserialization and serialization for session/user directed messages diff --git a/websocket-gateway/Rakefile b/websocket-gateway/Rakefile new file mode 100644 index 000000000..f57ae68a8 --- /dev/null +++ b/websocket-gateway/Rakefile @@ -0,0 +1,2 @@ +#!/usr/bin/env rake +require "bundler/gem_tasks" diff --git a/websocket-gateway/bin/websocket_gateway b/websocket-gateway/bin/websocket_gateway new file mode 100755 index 000000000..19cf17391 --- /dev/null +++ b/websocket-gateway/bin/websocket_gateway @@ -0,0 +1,41 @@ +#!/usr/bin/env ruby + +# establish database connection before including JamRuby +require 'active_record' +bin_dir = File.expand_path(File.dirname(__FILE__)) + +app_config_file = File.join(bin_dir, '..', 'config', 'application.yml') +db_config_file = File.join(bin_dir, '..', 'config', 'database.yml') +jamenv = ENV['JAMENV'] +jamenv ||= 'development' +config = YAML::load(File.open(app_config_file))[jamenv] +db_config = YAML::load(File.open(db_config_file))[jamenv] + +ActiveRecord::Base.establish_connection(db_config) + + +# now bring in the Jam code +require 'jam_websockets' + +include JamWebsockets + +# run some method + + +if config["verbose"] + Logging.logger.root.level = :debug +else + Logging.logger.root.level = :info +end + +if jamenv == "production" + one_meg = 1024 * 1024 + Logging.logger.root.appenders = Logging.appenders.rolling_file("log/#{jamenv}.log", :truncate=>true, :age=>'daily', :size=>one_meg, :keep=>20) +else + Logging.logger.root.appenders = Logging.appenders.stdout +end + +Server.new.run(:port => config["port"], + :emwebsocket_debug => config["emwebsocket_debug"], + :connect_time_stale => config["connect_time_stale"], + :connect_time_expire => config["connect_time_expire"]) diff --git a/websocket-gateway/bin/websocket_gateway.sh b/websocket-gateway/bin/websocket_gateway.sh new file mode 100755 index 000000000..affad8beb --- /dev/null +++ b/websocket-gateway/bin/websocket_gateway.sh @@ -0,0 +1,4 @@ +#!/bin/sh +# this script is used only in development + +bundle exec ruby -Ilib bin/websocket_gateway $* diff --git a/websocket-gateway/bin/websocket_gateway_win.sh b/websocket-gateway/bin/websocket_gateway_win.sh new file mode 100755 index 000000000..b6c741b4e --- /dev/null +++ b/websocket-gateway/bin/websocket_gateway_win.sh @@ -0,0 +1,10 @@ +#!/bin/sh +# this script is used only in development + +# Look for $WORKSPACE, otherwise use "workspace" as dev path. +if [ -z "$WORKSPACE" ]; then + WORKSPACE="~/workspace" +fi + + +jruby -I"$WORKSPACE/jam-ruby/lib" -I"$WORKSPACE/jam-pb/target/ruby/jampb/lib" -I"$WORKSPACE/jam-db/target/ruby_package/lib" -Ilib bin/websocket_gateway $* diff --git a/websocket-gateway/build b/websocket-gateway/build new file mode 100755 index 000000000..0cb842ea4 --- /dev/null +++ b/websocket-gateway/build @@ -0,0 +1,56 @@ +#!/bin/bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +# 'target' is the output directory +rm -rf $DIR/target +mkdir $DIR/target +mkdir $DIR/target/deb + +# put all dependencies into vendor/bundle +rm -rf vendor/bundle +echo "updating dependencies" + +bundle install --path vendor/bundle +bundle update + + +if [ -z $SKIP_TESTS ]; then + echo "running rspec tests" + bundle exec rspec + + if [ "$?" = "0" ]; then + echo "tests completed" + else + echo "tests failed." + exit 1 + fi +fi + +if [ -n "$PACKAGE" ]; then + if [ -z "$BUILD_NUMBER" ]; then + echo "BUILD NUMBER is not defined" + exit 1 + fi + + type -P dpkg-architecture > /dev/null + + if [ "$?" = "0" ]; then + ARCH=`dpkg-architecture -qDEB_HOST_ARCH` + else + echo "WARN: unable to determine architecture." + ARCH=`all` + fi + + set -e + # cache all gems local, and tell bundle to use local gems only + bundle install --path vendor/bundle --local + + # create debian using fpm + bundle exec fpm -s dir -t deb -p target/deb/websocket-gateway_0.1.${BUILD_NUMBER}_${ARCH}.deb -n "websocket-gateway" -v "0.1.$BUILD_NUMBER" --prefix /var/lib/websocket-gateway --after-install $DIR/script/package/post-install.sh --before-install $DIR/script/package/pre-install.sh --before-remove $DIR/script/package/pre-uninstall.sh --after-remove $DIR/script/package/post-uninstall.sh Gemfile lib bin vendor .bundle config script + +fi + + +echo "build complete" + diff --git a/websocket-gateway/config/application.yml b/websocket-gateway/config/application.yml new file mode 100644 index 000000000..038b59167 --- /dev/null +++ b/websocket-gateway/config/application.yml @@ -0,0 +1,19 @@ +Defaults: &defaults + connect_time_stale: 30 + connect_time_expire: 60 + +development: + port: 6767 + verbose: true + emwebsocket_debug: false + <<: *defaults + +test: + port: 6769 + verbose: true + <<: *defaults + +production: + port: 6767 + verbose: false + <<: *defaults diff --git a/websocket-gateway/config/database.yml b/websocket-gateway/config/database.yml new file mode 100644 index 000000000..435b92b3f --- /dev/null +++ b/websocket-gateway/config/database.yml @@ -0,0 +1,32 @@ +test: + adapter: postgresql + database: jam_websockets_test + host: localhost + port: 5432 + pool: 3 + username: postgres + password: postgres + timeout: 2000 + encoding: unicode + +development: + adapter: postgresql + database: jam + host: localhost + port: 5432 + pool: 3 + username: postgres + password: postgres + timeout: 2000 + encoding: unicode + +production: + adapter: postgresql + database: jam + host: localhost + port: 5432 + pool: 3 + username: postgres + password: postgres + timeout: 2000 + encoding: unicode diff --git a/websocket-gateway/features/login.feature b/websocket-gateway/features/login.feature new file mode 100644 index 000000000..31f025966 --- /dev/null +++ b/websocket-gateway/features/login.feature @@ -0,0 +1,8 @@ +Feature: Login + In order to protect the internal cloud from malicious actors, login is required to remain connected and send commands as a particular user. + + Scenario: Login with bad credentials and be bounced + Given I supply bad credentials in Login command + When I send the command + Then the result should be an error seen and a closed connection + diff --git a/websocket-gateway/jam_websockets.gemspec b/websocket-gateway/jam_websockets.gemspec new file mode 100644 index 000000000..79146ce0b --- /dev/null +++ b/websocket-gateway/jam_websockets.gemspec @@ -0,0 +1,17 @@ +# -*- encoding: utf-8 -*- +require File.expand_path('../lib/jam_websockets/version', __FILE__) + +Gem::Specification.new do |gem| + gem.authors = ["Seth Call"] + gem.email = ["sethcall@gmail.com"] + gem.description = %q{websocket server for clients} + gem.summary = %q{websocket server for clients} + gem.homepage = "" + + gem.files = `git ls-files`.split($\) + gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) } + gem.test_files = gem.files.grep(%r{^(test|spec|features)/}) + gem.name = "jam_websockets" + gem.require_paths = ["lib"] + gem.version = JamWebsockets::VERSION +end diff --git a/websocket-gateway/jenkins b/websocket-gateway/jenkins new file mode 100755 index 000000000..5dc8f4c18 --- /dev/null +++ b/websocket-gateway/jenkins @@ -0,0 +1,58 @@ +#!/bin/bash + +GEM_SERVER=http://localhost:9000/gems +DEB_SERVER=http://localhost:9010/apt-`uname -p` + +echo "starting build..." +./build + +if [ "$?" = "0" ]; then + echo "build succeeded" + + # generate gem version based on jenkins build number + if [ -z $BUILD_NUMBER ]; then + BUILD_NUMBER="1" + fi + VERSION="0.0.${BUILD_NUMBER}" + echo "packaging gem jam_websockets-$VERSION" + cat > lib/jam_websockets/version.rb << EOF +module JamWebsockets + VERSION = "$VERSION" +end +EOF + + gem build jam_websockets.gemspec + + GEMNAME="jam_websockets-${VERSION}.gem" + + echo "publishing gem" + curl -f -T $GEMNAME $GEM_SERVER/$GEMNAME + + if [ "$?" != "0" ]; then + echo "gem publish failed" + exit 1 + fi + echo "done publishing gem" + + if [ ! -z "$PACKAGE" ]; then + echo "publishing ubuntu package (.deb)" + DEBPATH=`find target/deb -name *.deb` + DEBNAME=`basename $DEBPATH` + + curl -f -T $DEBPATH $DEB_SERVER/$DEBNAME + + if [ "$?" != "0" ]; then + echo "deb publish failed" + exit 1 + fi + echo "done publishing deb" + + + fi +else + echo "build failed" + exit 1 +fi + + + diff --git a/websocket-gateway/lib/jam_websockets.rb b/websocket-gateway/lib/jam_websockets.rb new file mode 100644 index 000000000..f57f55206 --- /dev/null +++ b/websocket-gateway/lib/jam_websockets.rb @@ -0,0 +1,14 @@ +require "logging" +require "jam_ruby" +require "jam_websockets/version" +require "jam_websockets/session_error" +require "jam_websockets/permission_error" +require "jam_websockets/client_context" +require "jam_websockets/message" +require "jam_websockets/router" +require "jam_websockets/server" + +include JamRuby +module JamWebsockets + # Your code goes here... +end diff --git a/websocket-gateway/lib/jam_websockets/client_context.rb b/websocket-gateway/lib/jam_websockets/client_context.rb new file mode 100644 index 000000000..c3f302778 --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/client_context.rb @@ -0,0 +1,28 @@ + module JamWebsockets + class ClientContext + + attr_accessor :user, :client, :msg_count, :session, :sent_bad_state_previously + + def initialize(user, client) + @user = user + @client = client + @msg_count = 0 + @session = nil + @sent_bad_state_previously = false + end + + def to_s + return "Client[user:#{@user} client:#{@client} msgs:#{@msg_count} session:#{@session}]" + end + + def hash + @client.hash + end + + def ==(o) + o.class == self.class && o.client == @client + end + alias_method :eql?, :== + + end +end diff --git a/websocket-gateway/lib/jam_websockets/message.rb b/websocket-gateway/lib/jam_websockets/message.rb new file mode 100644 index 000000000..91c877a8f --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/message.rb @@ -0,0 +1,71 @@ +require 'json' +require 'protocol_buffers' +require 'protocol_buffers/compiler' + +class ProtocolBuffers::Message + + def to_json(*args) + + json = to_json_ready_object() + + json.to_json(*args) + end + + def to_json_ready_object() + hash = {} + + # simpler version, includes all fields in the output, using the default + # values if unset. also includes empty repeated fields as empty arrays. + # fields.each do |tag, field| + # hash[field.name] = value_for_tag(field.tag) + # end + + # prettier output, only includes non-empty repeated fields and set fields + fields.each do |tag, field| + if field.repeated? + value = value_for_tag(field.tag) + hash[field.name] = value unless value.empty? + else + if value_for_tag?(field.tag) + value = value_for_tag(field.tag) + if field.instance_of? ProtocolBuffers::Field::EnumField # if value is enum, resolve string value as ruby const + hash[field.name] = field.value_to_name[value] + else + proxy_class = field.instance_variable_get(:@proxy_class) + if proxy_class.nil? + hash[field.name] = value + else + hash[field.name] = value.to_json_ready_object + end + end + end + end + end + + return hash + end + + def self.json_create(hash) + # initialize takes a hash of { attribute_name => value } so you can just + # pass the hash into the constructor. but we're supposed to be showing off + # reflection, here. plus, that raises an exception if there is an unknown + # key in the hash. + # new(hash) + + message = new + fields.each do |tag, field| + if value = hash[field.name.to_s] + if value.instance_of? Hash # if the value is a Hash, descend down into PB hierachy + inner_class = field.instance_variable_get(:@proxy_class) + value = inner_class.json_create(value) + message.set_value_for_tag(field.tag, value) + elsif field.instance_of? ProtocolBuffers::Field::EnumField # if value is enum, resolve string value as ruby const + message.set_value_for_tag(field.tag, field.instance_variable_get(:@proxy_enum).const_get(value)) + else + message.set_value_for_tag(field.tag, value) + end + end + end + message + end +end diff --git a/websocket-gateway/lib/jam_websockets/permission_error.rb b/websocket-gateway/lib/jam_websockets/permission_error.rb new file mode 100644 index 000000000..83a0af45a --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/permission_error.rb @@ -0,0 +1,4 @@ +class PermissionError < Exception + +end + diff --git a/websocket-gateway/lib/jam_websockets/router.rb b/websocket-gateway/lib/jam_websockets/router.rb new file mode 100644 index 000000000..7c2681498 --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/router.rb @@ -0,0 +1,705 @@ +require 'set' +require 'amqp' +require 'thread' +require 'json' +require 'eventmachine' + +include Jampb + +# add new field to client connection +module EventMachine + module WebSocket + class Connection < EventMachine::Connection + attr_accessor :encode_json, :client_id # client_id is uuid we give to each client to track them as we like + end + end +end + +module JamWebsockets + + class Router + + attr_accessor :user_context_lookup + + def initialize() + @log = Logging.logger[self] + @pending_clients = Set.new # clients that have connected to server, but not logged in. + @clients = {} # clients that have logged in + @user_context_lookup = {} # lookup a set of client_contexts by user_id + @client_lookup = {} # lookup a client by client_id + @amqp_connection_manager = nil + @users_exchange = nil + @message_factory = JamRuby::MessageFactory.new + @semaphore = Mutex.new + @user_topic = nil + @client_topic = nil + @thread_pool = nil + @heartbeat_interval = nil + + end + + def start(connect_time_stale, options={:host => "localhost", :port => 5672}, &block) + + @log.info "startup" + + @heartbeat_interval = connect_time_stale / 2 + + begin + @amqp_connection_manager = AmqpConnectionManager.new(true, 4, :host => options[:host], :port => options[:port]) + @amqp_connection_manager.connect do |channel| + register_topics(channel) + block.call + end + + rescue => e + @log.error "unable to initialize #{e.to_s}" + cleanup + raise e + end + + @log.info "started" + end + + def add_client(client_id, client_context) + @client_lookup[client_id] = client_context + end + + def remove_client(client_id, client) + deleted = @client_lookup.delete(client_id) + + if deleted.nil? + @log.warn "unable to delete #{client_id} from client_lookup" + elsif deleted.client != client + # put it back--this is only possible if add_client hit the 'old connection' path + # so in other words if this happens: + # add_client(1, clientX) + # add_client(1, clientY) # but clientX is essentially defunct - this could happen due to a bug in client, or EM doesn't notify always of connection close in time + # remove_client(1, clientX) -- this check maintains that clientY stays as the current client in the hash + @client_lookup[client_id] = deleted + @log.debug "putting back client into @client_lookup for #{client_id} #{client.inspect}" + else + @log.debug "cleaned up @client_lookup for #{client_id}" + end + + end + + def add_user(context) + user_contexts = @user_context_lookup[context.user.id] + if user_contexts.nil? + user_contexts = Hash.new + @user_context_lookup[context.user.id] = user_contexts + end + + user_contexts[context.client] = context + end + + def remove_user(client_context) + user_contexts = @user_context_lookup[client_context.user.id] + + if user_contexts.nil? + @log.warn "user can not be removed #{client_context}" + else + # delete the context from set of user contexts + user_contexts.delete(client_context.client) + + # if last user context, delete entire set (memory leak concern) + if user_contexts.length == 0 + @user_context_lookup.delete(client_context.user.id) + end + + client_context.user = nil + end + end + + # register topic for user messages and session messages + def register_topics(channel) + + ######################## USER MESSAGING ########################### + + # create user exchange + @users_exchange = channel.topic('users') + # create user messaging topic + @user_topic = channel.queue("", :auto_delete => true) + @user_topic.bind(@users_exchange, :routing_key => "user.#") + @user_topic.purge + + # subscribe for any messages to users + @user_topic.subscribe(:ack => false) do |headers, msg| + begin + routing_key = headers.routing_key + user_id = routing_key["user.".length..-1] + + @semaphore.synchronize do + contexts = @user_context_lookup[user_id] + + if !contexts.nil? + + @log.debug "received user-directed message for user: #{user_id}" + + msg = Jampb::ClientMessage.parse(msg) + + contexts.each do |client_id, context| + EM.schedule do + @log.debug "sending user message to #{context}" + send_to_client(context.client, msg) + end + end + else + @log.debug "Can't route message: no user connected with id #{user_id}" + end + end + + rescue => e + @log.error "unhandled error in messaging to client" + @log.error e + end + end + + MQRouter.user_exchange = @users_exchange + + ############## CLIENT MESSAGING ################### + + @clients_exchange = channel.topic('clients') + + @client_topic = channel.queue("", :auto_delete => true) + @client_topic.bind(@clients_exchange, :routing_key => "client.#") + @client_topic.purge + + # subscribe for any p2p messages to a client + @client_topic.subscribe(:ack => false) do |headers, msg| + begin + routing_key = headers.routing_key + client_id = routing_key["client.".length..-1] + @semaphore.synchronize do + client_context = @client_lookup[client_id] + client = client_context.client + + msg = Jampb::ClientMessage.parse(msg) + + @log.debug "client-directed message received from #{msg.from} to client #{client_id}" + + unless client.nil? + + EM.schedule do + @log.debug "sending client-directed down websocket to #{client_id}" + send_to_client(client, msg) + end + else + @log.debug "client-directed message unroutable to disconnected client #{client_id}" + end + end + rescue => e + @log.error "unhandled error in messaging to client" + @log.error e + end + end + + MQRouter.client_exchange = @clients_exchange + end + + + def new_client(client) + @semaphore.synchronize do + @pending_clients.add(client) + end + + # default to using json instead of pb + client.encode_json = true + + client.onopen { |handshake| + #binding.pry + @log.debug "client connected #{client}" + + # check for '?pb' or '?pb=true' in url query parameters + query_pb = handshake.query["pb"] + + if !query_pb.nil? && (query_pb == "" || query_pb == "true") + client.encode_json = false + end + + } + + client.onclose { + @log.debug "Connection closed" + stale_client(client) + } + + client.onerror { |error| + if error.kind_of?(EM::WebSocket::WebSocketError) + @log.error "websockets error: #{error}" + else + @log.error "generic error: #{error} #{error.backtrace}" + end + + cleanup_client(client) + client.close_websocket + } + + client.onmessage { |msg| + @log.debug("msg received") + + # TODO: set a max message size before we put it through PB? + # TODO: rate limit? + + pb_msg = nil + + begin + if client.encode_json + #example: {"type":"LOGIN", "target":"server", "login" : {"username":"hi"}} + parse = JSON.parse(msg) + pb_msg = Jampb::ClientMessage.json_create(parse) + self.route(pb_msg, client) + else + pb_msg = Jampb::ClientMessage.parse(msg.to_s) + self.route(pb_msg, client) + end + rescue SessionError => e + @log.info "ending client session deliberately due to malformed client behavior. reason=#{e}" + begin + # wrap the message up and send it down + error_msg = @message_factory.server_rejection_error(e.to_s) + send_to_client(client, error_msg) + ensure + client.close_websocket + cleanup_client(client) + end + rescue PermissionError => e + @log.info "permission error. reason=#{e.to_s}" + @log.info e + + # wrap the message up and send it down + error_msg = @message_factory.server_permission_error(pb_msg.message_id, e.to_s) + send_to_client(client, error_msg) + rescue => e + @log.error "ending client session due to server programming or runtime error. reason=#{e.to_s}" + @log.error e + + begin + # wrap the message up and send it down + error_msg = @message_factory.server_generic_error(e.to_s) + send_to_client(client, error_msg) + ensure + client.close_websocket + cleanup_client(client) + end + end + } + end + + + def send_to_client(client, msg) + @log.debug "SEND TO CLIENT (#{@message_factory.get_message_type(msg)})" + if client.encode_json + client.send(msg.to_json.to_s) + else + # this is so odd that this is necessary from an API perspective. but searching through the source code... it's all I could find in em-websocket for allowing a binary message to be sent + client.instance_variable_get(:@handler).send_frame(:binary, msg.to_s) + end + end + + def cleanup() + # shutdown topic listeners and mq connection + + unless @amqp_connection_manager.nil? + @amqp_connection_manager.disconnect + end + + # tear down each individual client + @clients.each do |client, context| + cleanup_client(client) + end + end + + def stop + @log.info "shutdown" + cleanup + end + + # caused a client connection to be marked stale + def stale_client(client) + if cid = client.client_id + ConnectionManager.active_record_transaction do |connection_manager| + music_session_id = connection_manager.flag_connection_stale_with_client_id(cid) + # update the session members, letting them know this client went stale + context = @client_lookup[client.client_id] + music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil? + Notification.send_musician_session_stale(music_session, client.client_id, context.user) unless music_session.nil? + end + end + end + + def cleanup_clients_with_ids(client_ids) + # @log.debug("*** cleanup_clients_with_ids: client_ids = #{client_ids.inspect}") + client_ids.each do |cid| + + client_context = @client_lookup[cid] + self.cleanup_client(client_context.client) unless client_context.nil? + + # remove this connection from the database + ConnectionManager.active_record_transaction do |mgr| + mgr.delete_connection(cid) { |conn, count, music_session_id, user_id| + Notification.send_friend_update(user_id, false, conn) if count == 0 + music_session = MusicSession.find_by_id(music_session_id) unless music_session_id.nil? + user = User.find_by_id(user_id) unless user_id.nil? + Notification.send_musician_session_depart(music_session, cid, user) unless music_session.nil? || user.nil? + } + end + end + end + + # removes all resources associated with a client + def cleanup_client(client) + @semaphore.synchronize do + # @log.debug("*** cleanup_clients: client = #{client}") + pending = @pending_clients.delete?(client) + + if !pending.nil? + @log.debug "cleaning up not-logged-in client #{client}" + else + + @log.debug "cleanup up logged-in client #{client}" + + remove_client(client.client_id, client) + + context = @clients.delete(client) + + if !context.nil? + remove_user(context) + else + @log.debug "skipping duplicate cleanup attempt of logged-in client" + end + + end + end + end + + def route(client_msg, client) + message_type = @message_factory.get_message_type(client_msg) + + raise SessionError, "unknown message type received: #{client_msg.type}" if message_type.nil? + + @log.debug("msg received #{message_type}") + + raise SessionError, 'client_msg.route_to is null' if client_msg.route_to.nil? + + if @pending_clients.include? client and client_msg.type != ClientMessage::Type::LOGIN + # this client has not logged in and is trying to send a non-login message + raise SessionError, "must 'Login' first" + end + + if @message_factory.server_directed? client_msg + + handle_server_directed(client_msg, client) + + elsif @message_factory.client_directed? client_msg + + to_client_id = client_msg.route_to[MessageFactory::CLIENT_TARGET_PREFIX.length..-1] + handle_client_directed(to_client_id, client_msg, client) + + elsif @message_factory.session_directed? client_msg + + session_id = client_msg.target[MessageFactory::SESSION_TARGET_PREFIX.length..-1] + handle_session_directed(session_id, client_msg, client) + + elsif @message_factory.user_directed? client_msg + + user_id = client_msg.target[MessageFactory::USER_PREFIX_TARGET.length..-1] + handle_user_directed(user_id, client_msg, client) + + else + raise SessionError, "client_msg.route_to is unknown type: #{client_msg.route_to}" + end + + end + + def handle_server_directed(client_msg, client) + # @log.info("*** handle_server_directed(#{client_msg.inspect}, #{client})") + + if client_msg.type == ClientMessage::Type::LOGIN + + handle_login(client_msg.login, client) + + elsif client_msg.type == ClientMessage::Type::HEARTBEAT + + handle_heartbeat(client_msg.heartbeat, client_msg.message_id, client) + + else + raise SessionError, "unknown message type '#{client_msg.type}' for #{client_msg.route_to}-directed message" + end + end + + def handle_login(login, client) + username = login.username if login.value_for_tag(1) + password = login.password if login.value_for_tag(2) + token = login.token if login.value_for_tag(3) + client_id = login.client_id if login.value_for_tag(4) + reconnect_music_session_id = login.client_id if login.value_for_tag(5) + + @log.info("*** handle_login: token=#{token}; client_id=#{client_id}") + connection = nil + reconnected = false + + # you don't have to supply client_id in login--if you don't, we'll generate one + if client_id.nil? || client_id.empty? + # give a unique ID to this client. This is used to prevent session messages + # from echoing back to the sender, for instance. + client_id = UUIDTools::UUID.random_create.to_s + else + # check if there's a connection for the client... if it's stale, reconnect it + if connection = JamRuby::Connection.find_by_client_id(client_id) + # FIXME: I think connection table needs to updated within connection_manager + # otherwise this would be 1 line of code (connection.connect!) + + music_session_upon_reentry = connection.music_session + + ConnectionManager.active_record_transaction do |connection_manager| + music_session_id, reconnected = connection_manager.reconnect(connection, reconnect_music_session_id) + context = @client_lookup[client_id] + if music_session_id.nil? + # if this is a reclaim of a connection, but music_session_id comes back null, then we need to check if this connection was IN a music session before. + # if so, then we need to tell the others in the session that this user is now departed + Notification.send_musician_session_depart(music_session_upon_reentry, client.client_id, context.user) unless context.nil? || music_session_upon_reentry.nil? || music_session_upon_reentry.destroyed? + else + music_session = MusicSession.find_by_id(music_session_id) + Notification.send_musician_session_fresh(music_session, client.client_id, context.user) unless context.nil? + end + + end if connection.stale? + end + # if there's a client_id but no connection object, create new client_id + client_id = UUIDTools::UUID.random_create.to_s if !connection + end + + client.client_id = client_id + + user = valid_login(username, password, token, client_id) + + if !user.nil? + @log.debug "user #{user} logged in" + + # respond with LOGIN_ACK to let client know it was successful + remote_ip = extract_ip(client) + + + @semaphore.synchronize do + # remove from pending_queue + @pending_clients.delete(client) + + # add a tracker for this user + context = ClientContext.new(user, client) + @clients[client] = context + add_user(context) + add_client(client_id, context) + + unless connection + # log this connection in the database + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.create_connection(user.id, client.client_id, remote_ip) do |conn, count| + if count == 1 + Notification.send_friend_update(user.id, true, conn) + end + end + end + end + login_ack = @message_factory.login_ack(remote_ip, + client_id, + user.remember_token, + @heartbeat_interval, + connection.try(:music_session_id), + reconnected) + send_to_client(client, login_ack) + end + else + raise SessionError, 'invalid login' + end + end + + # TODO: deprecated; jam_ruby has routine inspired by this + def send_friend_update(user, online, client) + @log.debug "sending friend update for user #{user} online = #{online}" + + if !user.nil? && user.friends.exists? + @log.debug "user has friends - sending friend updates" + + # create the friend_update message + friend_update_msg = @message_factory.friend_update(user.id, online) + + # send the friend_update to each friend that has active connections + user.friends.each do |friend| + @log.debug "sending friend update message to #{friend}" + + handle_user_directed(friend.id, friend_update_msg, client) + end + end + end + + def handle_heartbeat(heartbeat, heartbeat_message_id, client) + unless context = @clients[client] + @log.warn "*** WARNING: unable to find context due to heartbeat from client: #{client.client_id}; calling cleanup" + cleanup_client(client) + else + connection = Connection.find_by_user_id_and_client_id(context.user.id, context.client.client_id) + if connection.nil? + @log.warn "*** WARNING: unable to find connection due to heartbeat from client: #{context}; calling cleanup_client" + cleanup_client(client) + else + connection.touch + + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.reconnect(connection, connection.music_session_id) + end if connection.stale? + end + + heartbeat_ack = @message_factory.heartbeat_ack() + + send_to_client(client, heartbeat_ack) + + # send errors to clients in response to heartbeats if rabbitmq is down + if !@amqp_connection_manager.connected? + error_msg = @message_factory.server_bad_state_error(heartbeat_message_id, "messaging system down") + context.sent_bad_state_previously = true + send_to_client(client, error_msg) + return + elsif context.sent_bad_state_previously + context.sent_bad_state_previously = false + recovery_msg = @message_factory.server_bad_state_recovered(heartbeat_message_id) + send_to_client(client, recovery_msg) + end + end + end + + def valid_login(username, password, token, client_id) + + if !token.nil? && token != '' + @log.debug "logging in via token" + # attempt login with token + user = JamRuby::User.find_by_remember_token(token) + + if user.nil? + @log.debug "no user found with token #{token}" + return nil + else + @log.debug "#{user} login via token" + return user + end + + elsif !username.nil? and !password.nil? + + @log.debug "logging in via user/pass '#{username}' '#{password}'" + # attempt login with username and password + user = User.find_by_email(username) + + if !user.nil? && user.valid_password?(password) + @log.debug "#{user} login via password" + return user + else + @log.debug "#{username} login failure" + return nil + end + else + raise SessionError, 'no login data was found in Login message' + end + end + + def access_music_session(music_session_id, user) + music_session = MusicSession.find_by_id(music_session_id) + + if music_session.nil? + raise SessionError, 'specified session not found' + end + + if !music_session.access? user + raise SessionError, 'not allowed to join the specified session' + end + + return music_session + end + + # client_id = the id of the client being accessed + # client = the current client + def access_p2p(client_id, user, msg) + + return nil + # ping_request and ping_ack messages are special in that they are simply allowed + if msg.type == ClientMessage::Type::PING_REQUEST || msg.type == ClientMessage::Type::PING_ACK + return nil + end + + client_connection = Connection.find_by_client_id(client_id) + + if client_connection.nil? + raise PermissionError, 'specified client not found' + end + + if !client_connection.access_p2p? user + raise SessionError, 'not allowed to message this client' + end + end + + + def handle_client_directed(to_client_id, client_msg, client) + context = @clients[client] + + # by not catching any exception here, a PermissionError will be thrown if this isn't valid + # if for some reason the client is trying to send to a client that it doesn't + # belong to + access_p2p(to_client_id, context.user, client_msg) + + # populate routing data + client_msg.from = client.client_id + + @log.debug "publishing to client #{to_client_id} from client_id #{client.client_id}" + + # put it on the topic exchange for clients + @clients_exchange.publish(client_msg.to_s, :routing_key => "client.#{to_client_id}", :properties => {:headers => {"client_id" => client.client_id}}) + end + + def handle_user_directed(user_id, client_msg, client) + + @log.debug "publishing to user #{user_id} from client_id #{client.client_id}" + + # put it on the topic exchange for users + @users_exchange.publish(client_msg.to_s, :routing_key => "user.#{user_id}") + end + + def handle_session_directed(session_id, client_msg, client) + context = @clients[client] + + user_publish_to_session(session_id, context.user, client_msg, :client_id => client.client_id) + end + + # sends a message to a session on behalf of a user + # if this is originating in the context of a client, it should be specified as :client_id => "value" + # client_msg should be a well-structure message (jam-pb message) + def user_publish_to_session(music_session_id, user, client_msg, sender = {:client_id => ""}) + music_session = access_music_session(music_session_id, user) + + # gather up client_ids in the session + client_ids = music_session.music_session_clients.map { |client| client.client_id }.reject { |client_id| client_id == sender[:client_id] } + + publish_to_session(music_session.id, client_ids, client_msg.to_s, sender) + end + + + # sends a message to a session with no checking of permissions + # this method deliberately has no database interactivity/active_record objects + def publish_to_session(music_session_id, client_ids, client_msg, sender = {:client_id => ""}) + + EM.schedule do + sender_client_id = sender[:client_id] + + # iterate over each person in the session, and send a p2p message + client_ids.each do |client_id| + + @@log.debug "publishing to session:#{music_session_id} client:#{client_id} from client:#{sender_client_id}" + # put it on the topic exchange3 for clients + self.class.client_exchange.publish(client_msg, :routing_key => "client.#{music_session_id}") + end + end + end + + def extract_ip(client) + return Socket.unpack_sockaddr_in(client.get_peername)[1] + end + end +end diff --git a/websocket-gateway/lib/jam_websockets/server.rb b/websocket-gateway/lib/jam_websockets/server.rb new file mode 100644 index 000000000..795e8455d --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/server.rb @@ -0,0 +1,87 @@ +require 'em-websocket' + +module JamWebsockets + + class Server + + def initialize(options={}) + @log = Logging.logger[self] + @count=0 + @router = Router.new + end + + def run(options={}) + + host = "0.0.0.0" + port = options[:port] + connect_time_stale = options[:connect_time_stale].to_i + connect_time_expire = options[:connect_time_expire].to_i + + @log.info "starting server #{host}:#{port} with staleness_time=#{connect_time_stale}; reconnect time = #{connect_time_expire}" + + EventMachine.run do + @router.start(connect_time_stale) do + # take stale off the expire limit because the call to stale will + # touch the updated_at column, adding an extra stale limit to the expire time limit + # expire_time = connect_time_expire > connect_time_stale ? connect_time_expire - connect_time_stale : connect_time_expire + expire_time = connect_time_expire + start_connection_expiration(expire_time) + start_connection_flagger(connect_time_stale) + + start_websocket_listener(host, port, options[:emwebsocket_debug]) + end + + # if you don't do this, the app won't exit unless you kill -9 + at_exit do + @log.info "cleaning up server" + @router.cleanup + end + + end + end + + def start_websocket_listener(listen_ip, port, emwebsocket_debug) + EventMachine::WebSocket.start(:host => listen_ip, :port => port, :debug => emwebsocket_debug) do |ws| + @log.info "new client #{ws}" + @router.new_client(ws) + end + end + + def start_connection_expiration(stale_max_time) + # one cleanup on startup + expire_stale_connections(stale_max_time) + + EventMachine::PeriodicTimer.new(stale_max_time) do + expire_stale_connections(stale_max_time) + end + + end + + def expire_stale_connections(stale_max_time) + client_ids = [] + ConnectionManager.active_record_transaction do |connection_manager| + client_ids = connection_manager.stale_connection_client_ids(stale_max_time) + end + # @log.debug("*** expire_stale_connections(#{stale_max_time}): client_ids = #{client_ids.inspect}") + @router.cleanup_clients_with_ids(client_ids) + end + + def start_connection_flagger(flag_max_time) + # one cleanup on startup + flag_stale_connections(flag_max_time) + + EventMachine::PeriodicTimer.new(flag_max_time/2) do + flag_stale_connections(flag_max_time) + end + end + + def flag_stale_connections(flag_max_time) + # @log.debug("*** flag_stale_connections: fires each #{flag_max_time} seconds") + ConnectionManager.active_record_transaction do |connection_manager| + connection_manager.flag_stale_connections(flag_max_time) + end + end + + end + +end diff --git a/websocket-gateway/lib/jam_websockets/session_error.rb b/websocket-gateway/lib/jam_websockets/session_error.rb new file mode 100644 index 000000000..4d8a82166 --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/session_error.rb @@ -0,0 +1,4 @@ +class SessionError < Exception + +end + diff --git a/websocket-gateway/lib/jam_websockets/version.rb b/websocket-gateway/lib/jam_websockets/version.rb new file mode 100644 index 000000000..67a2193ef --- /dev/null +++ b/websocket-gateway/lib/jam_websockets/version.rb @@ -0,0 +1,3 @@ +module JamWebsockets + VERSION = "0.0.1" +end diff --git a/websocket-gateway/log/.gitkeep b/websocket-gateway/log/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/websocket-gateway/migrate.sh b/websocket-gateway/migrate.sh new file mode 100755 index 000000000..04b31b1bc --- /dev/null +++ b/websocket-gateway/migrate.sh @@ -0,0 +1,3 @@ + + +bundle exec jam_db up --connopts=dbname:jam host:localhost user:postgres password:postgres --verbose diff --git a/websocket-gateway/script/package/passenger.sh b/websocket-gateway/script/package/passenger.sh new file mode 100755 index 000000000..810673e30 --- /dev/null +++ b/websocket-gateway/script/package/passenger.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +export BUILD_NUMBER=1 + +exec "/usr/local/rbenv/versions/1.9.3-p327/bin/ruby" "$@" + diff --git a/websocket-gateway/script/package/post-install.sh b/websocket-gateway/script/package/post-install.sh new file mode 100755 index 000000000..fb09f8d7f --- /dev/null +++ b/websocket-gateway/script/package/post-install.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +set -eu + +NAME="websocket-gateway" + +USER="$NAME" +GROUP="$NAME" + +# copy upstart file +cp /var/lib/$NAME/script/package/$NAME.conf /etc/init/$NAME.conf + +mkdir -p /var/lib/$NAME/log + +chown -R $USER:$GROUP /var/lib/$NAME diff --git a/websocket-gateway/script/package/post-uninstall.sh b/websocket-gateway/script/package/post-uninstall.sh new file mode 100755 index 000000000..976157e2e --- /dev/null +++ b/websocket-gateway/script/package/post-uninstall.sh @@ -0,0 +1,27 @@ +#!/bin/sh + + + +NAME="websocket-gateway" + +set -e +if [ "$1" = "remove" ] +then + set +e + # stop the process, if any is found. we don't want this failing to cause an error, though. + stop websocket-gateway + set -e + + if [ -f /etc/init/websocket-gateway.conf ]; then + rm /etc/init/websocket-gateway.conf + fi +fi + +if [ "$1" = "purge" ] +then + if [ -d /var/lib/$NAME ]; then + rm -rf /var/lib/$NAME + fi + + userdel $NAME +fi diff --git a/websocket-gateway/script/package/pre-install.sh b/websocket-gateway/script/package/pre-install.sh new file mode 100755 index 000000000..2c12b2e5c --- /dev/null +++ b/websocket-gateway/script/package/pre-install.sh @@ -0,0 +1,36 @@ +#!/bin/sh + +set -eu + +NAME="websocket-gateway" + +HOME="/var/lib/$NAME" +USER="$NAME" +GROUP="$NAME" + +# if NIS is used, then errors can occur but be non-fatal +if which ypwhich >/dev/null 2>&1 && ypwhich >/dev/null 2>&1 +then + set +e +fi + +if ! getent group "$GROUP" >/dev/null +then + addgroup --system "$GROUP" >/dev/null +fi + +# creating user if it isn't already there +if ! getent passwd "$USER" >/dev/null +then + adduser \ + --system \ + --home $HOME \ + --shell /bin/false \ + --disabled-login \ + --ingroup "$GROUP" \ + --gecos "$USER" \ + "$USER" >/dev/null +fi + +# NISno longer a possible problem; stop ignoring errors +set -e diff --git a/websocket-gateway/script/package/pre-uninstall.sh b/websocket-gateway/script/package/pre-uninstall.sh new file mode 100755 index 000000000..e69de29bb diff --git a/websocket-gateway/script/package/upstart-run.sh b/websocket-gateway/script/package/upstart-run.sh new file mode 100755 index 000000000..af5cbee66 --- /dev/null +++ b/websocket-gateway/script/package/upstart-run.sh @@ -0,0 +1,17 @@ +#!/bin/bash -l + +# default config values +BUILD_NUMBER=1 + +CONFIG_FILE="/etc/websocket-gateway/upstart.conf" +if [ -e "$CONFIG_FILE" ]; then + . "$CONFIG_FILE" +fi + +# I don't like doing this, but the next command (bundle exec) retouches/generates +# the gemfile. This unfortunately means the next debian update doesn't update this file. +# Ultimately this means an old Gemfile.lock is left behind for a new package, +# and bundle won't run because it thinks it has the wrong versions of gems +rm -f Gemfile.lock + +BUILD_NUMBER=$BUILD_NUMBER JAMENV=production exec bundle exec ruby -Ilib bin/websocket_gateway diff --git a/websocket-gateway/script/package/websocket-gateway.conf b/websocket-gateway/script/package/websocket-gateway.conf new file mode 100755 index 000000000..c51febf4b --- /dev/null +++ b/websocket-gateway/script/package/websocket-gateway.conf @@ -0,0 +1,7 @@ +description "websocket-gateway" + +start on startup +start on runlevel [2345] +stop on runlevel [016] + +exec start-stop-daemon --start --chdir /var/lib/websocket-gateway --exec /var/lib/websocket-gateway/script/package/upstart-run.sh diff --git a/websocket-gateway/spec/factories.rb b/websocket-gateway/spec/factories.rb new file mode 100644 index 000000000..3522a5e5f --- /dev/null +++ b/websocket-gateway/spec/factories.rb @@ -0,0 +1,48 @@ +FactoryGirl.define do + factory :user, :class => JamRuby::User do + sequence(:email) { |n| "person_#{n}@example.com"} + sequence(:first_name) { |n| "Person" } + sequence(:last_name) { |n| "#{n}" } + password "foobar" + password_confirmation "foobar" + email_confirmed true + musician true + city "Apex" + state "NC" + country "USA" + terms_of_service true + + + factory :admin do + admin true + end + + before(:create) do |user| + user.musician_instruments << FactoryGirl.build(:musician_instrument, user: user) + end + end + + factory :music_session, :class => JamRuby::MusicSession do + sequence(:description) { |n| "Jam Session #{n}" } + fan_chat true + fan_access true + approval_required false + musician_access true + legal_terms true + end + + factory :connection, :class => JamRuby::Connection do + ip_address "1.1.1.1" + as_musician true + end + + factory :instrument, :class => JamRuby::Instrument do + description { |n| "Instrument #{n}" } + end + + factory :musician_instrument, :class=> JamRuby::MusicianInstrument do + instrument { Instrument.find('electric guitar') } + proficiency_level 1 + priority 0 + end +end diff --git a/websocket-gateway/spec/jam_websockets/client_context_spec.rb b/websocket-gateway/spec/jam_websockets/client_context_spec.rb new file mode 100644 index 000000000..522e77b10 --- /dev/null +++ b/websocket-gateway/spec/jam_websockets/client_context_spec.rb @@ -0,0 +1,14 @@ +require 'spec_helper' + +describe ClientContext do + + let(:context) {ClientContext.new({}, "client1")} + + describe 'hashing' do + it "hash correctly" do + set = Set.new + set.add?(context).should eql(set) + set.add?(context).should be_nil + end + end +end diff --git a/websocket-gateway/spec/jam_websockets/router_spec.rb b/websocket-gateway/spec/jam_websockets/router_spec.rb new file mode 100644 index 000000000..61193a26f --- /dev/null +++ b/websocket-gateway/spec/jam_websockets/router_spec.rb @@ -0,0 +1,283 @@ +require 'spec_helper' +require 'thread' + +LoginClient = Class.new do + attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id + + + def initialize() + + end + + def onopen(&block) + @onopenblock = block + end + + def onmessage(&block) + @onmsgblock = block + end + + def close(&block) + @oncloseblock = block + end + + def close_websocket() + + end + + def send(msg) + puts msg + end + + def get_peername + return "\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" # 37643, "localhost" + end + +end + + +# does a login and returns client +def login(router, user, password, client_id) + + message_factory = MessageFactory.new + client = LoginClient.new + + login_ack = message_factory.login_ack("127.0.0.1", client_id, user.remember_token, 15, nil, false) + + router.should_receive(:send_to_client) do |*args| + args.count.should == 2 + args[0].should == client + args[1].is_a?(Jampb::ClientMessage).should be_true + end + router.should_receive(:extract_ip).at_least(:once).with(client).and_return("127.0.0.1") + client.should_receive(:onclose) + client.should_receive(:onerror) + + #client.should_receive(:get_peername).and_return("\x00\x02\x93\v\x7F\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00") + + @router.new_client(client) + handshake = double("handshake") + handshake.should_receive(:query).and_return({ "pb" => "true" }) + client.onopenblock.call handshake + + # create a login message, and pass it into the router via onmsgblock.call + login = message_factory.login_with_user_pass(user.email, password, :client_id => client_id) + + # first log in + client.onmsgblock.call login.to_s + + # then join music session + return client +end + +# currently commented out; we have deprecated logging in for jam sessions via websocket-gateway; +# use rest API instead (or direct db access with factory-girl) +def login_music_session(router, client, music_session) + #message_factory = MessageFactory.new + #login_music_session = message_factory.login_music_session(music_session.id) + #login_ack = message_factory.login_music_session_ack(false, nil); + #router.should_receive(:send_to_client).with(client, login_ack) + #client.onmsgblock.call login_music_session.to_s +end + + +describe Router do + include EventedSpec::EMSpec + + message_factory = MessageFactory.new + + em_before do + @router = Router.new() + end + + subject { @router } + + em_after do + + end + + + describe "serviceability" do + it "should start and stop", :mq => true do + #em do + done + #end + end + + it "should register for client events", :mq => true do + #em do + client = double("client") + client.should_receive(:onopen) + client.should_receive(:onclose) + client.should_receive(:onerror) + client.should_receive(:onmessage) + client.should_receive(:encode_json=) + + @router.new_client(client) + done + #end + end + end + + describe "topic routing helpers" do + it "create and delete user lookup set" do + #em do + user = double(User) + user.should_receive(:id).any_number_of_times.and_return("1") + client = double("client") + context = ClientContext.new(user, client) + + @router.user_context_lookup.length.should == 0 + + @router.add_user(context) + + @router.user_context_lookup.length.should == 1 + + @router.remove_user(context) + + @router.user_context_lookup.length.should == 0 + done + #end + end + end + + + describe "login" do + it "should not allow login of bogus user", :mq => true do + #em do + TestClient = Class.new do + + attr_accessor :onmsgblock, :onopenblock, :encode_json, :client_id + + def initialize() + + end + + def onopen(&block) + @onopenblock = block + end + + def onmessage(&block) + @onmsgblock = block + end + + def close_websocket() + end + + def close() + end + end + + client = TestClient.new + + error_msg = message_factory.server_rejection_error("invalid login") + + @router.should_receive(:send_to_client).with(client, error_msg) + client.should_receive(:close_websocket) + client.should_receive(:onclose) + client.should_receive(:onerror) + + + @router.new_client(client) + handshake = double("handshake") + handshake.should_receive(:query).and_return({"pb" => "true"}) + client.onopenblock.call handshake + + # create a login message, and pass it into the router via onmsgblock.call + login = message_factory.login_with_user_pass("baduser@example.com", "foobar") + + client.onmsgblock.call login.to_s + done + #end + end + + it "should allow login of valid user", :mq => true do + #em do + @user = FactoryGirl.create(:user, + :password => "foobar", :password_confirmation => "foobar") + client1 = login(@router, @user, "foobar", "1") + done + #end + end + + it "should allow music_session_join of valid user", :mq => true do + #em do + user1 = FactoryGirl.create(:user) # in the music session + user2 = FactoryGirl.create(:user) # in the music session + user3 = FactoryGirl.create(:user) # not in the music session + + music_session = FactoryGirl.create(:music_session, :creator => user1) + + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "4") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "5") + + # make a music_session and define two members + + # create client 1, log him in, and log him in to music session + client1 = login(@router, user1, "foobar", "1") + login_music_session(@router, client1, music_session) + done + #end + end + + it "should allow two valid subscribers to communicate with session-directed messages", :mq => true do + #em do + user1 = FactoryGirl.create(:user) # in the music session + user2 = FactoryGirl.create(:user) # in the music session + + music_session = FactoryGirl.create(:music_session, :creator => user1) + + + # create client 1, log him in, and log him in to music session + client1 = login(@router, user1, "foobar", "1") + login_music_session(@router, client1, music_session) + + client2 = login(@router, user2, "foobar", "2") + login_music_session(@router, client2, music_session) + + # make a music_session and define two members + + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "6") + music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "7") + + done + #end + end + + it "should allow two valid subscribers to communicate with p2p messages", :mq => true do + #em do + user1 = FactoryGirl.create(:user) # in the music session + user2 = FactoryGirl.create(:user) # in the music session + + music_session = FactoryGirl.create(:music_session, :creator => user1) + + # create client 1, log him in, and log him in to music session + client1 = login(@router, user1, "foobar", "1") + #login_music_session(@router, client1, music_session) + + client2 = login(@router, user2, "foobar", "2") + #login_music_session(@router, client2, music_session) + + # by creating + music_session_member1 = FactoryGirl.create(:connection, :user => user1, :music_session => music_session, :client_id => "8") + + # now attempt to message p2p! + + # first test: user 2 should be able to send a ping message to user 1, even though he isn't in the same session yet + + # create a login message, and pass it into the router via onmsgblock.call + ping = message_factory.ping_request("1", "2") + + #@router.should_receive(:send_to_client) #.with(client1, ping) + + ## send ping to client 2 + #client2.onmsgblock.call ping.to_s + + #music_session_member2 = FactoryGirl.create(:connection, :user => user2, :music_session => music_session, :client_id => "2") + + done + #end + end + end +end + diff --git a/websocket-gateway/spec/spec_db.rb b/websocket-gateway/spec/spec_db.rb new file mode 100644 index 000000000..1f2eb905d --- /dev/null +++ b/websocket-gateway/spec/spec_db.rb @@ -0,0 +1,12 @@ +class SpecDb + + TEST_DB_NAME="jam_websockets_test" + + def self.recreate_database + conn = PG::Connection.open("dbname=postgres user=postgres password=postgres host=localhost") + conn.exec("DROP DATABASE IF EXISTS #{TEST_DB_NAME}") + conn.exec("CREATE DATABASE #{TEST_DB_NAME}") + JamDb::Migrator.new.migrate(:dbname => TEST_DB_NAME, :user => "postgres", :password => "postgres", :host => "localhost") + end + +end diff --git a/websocket-gateway/spec/spec_helper.rb b/websocket-gateway/spec/spec_helper.rb new file mode 100644 index 000000000..cf25b462a --- /dev/null +++ b/websocket-gateway/spec/spec_helper.rb @@ -0,0 +1,108 @@ +require 'active_record' +require 'jam_db' +require 'spec_db' + +# recreate test database and migrate it +db_config = YAML::load(File.open('config/database.yml'))["test"] + +SpecDb::recreate_database() +# initialize ActiveRecord's db connection +ActiveRecord::Base.establish_connection(db_config) + + +require 'jam_websockets' +require 'timeout' +require 'evented-spec' + +jamenv = ENV['JAMENV'] +jamenv ||= 'development' + +fn = "#{File.dirname(__FILE__)}/../config/application.yml" +puts "*** spec_helper.rb: fn=#{fn}; #{File.exists?(fn)}; #{jamenv}" + +ff = File.open("#{File.dirname(__FILE__)}/../config/application.yml",'r') +config = YAML::load(ff)[jamenv] +puts "*** spec_helper.rb: jamenv=#{jamenv}; config = #{config}" + +if config["verbose"] + Logging.logger.root.level = :debug +else + Logging.logger.root.level = :info +end + +Logging.logger.root.appenders = Logging.appenders.stdout + + +require 'jam_ruby' +require 'jampb' +require 'rubygems' +#require 'spork' +require 'database_cleaner' +require 'factory_girl' +require 'factories' + +include JamRuby +include JamWebsockets +include Jampb + + +#uncomment the following line to use spork with the debugger +#require 'spork/ext/ruby-debug' + + +#Spork.prefork do + # Loading more in this block will cause your tests to run faster. However, + # if you change any configuration or code from libraries loaded here, you'll + # need to restart spork for it take effect. +# This file is copied to spec/ when you run 'rails generate rspec:install' + #ENV["RAILS_ENV"] ||= 'test' + #require File.expand_path("../../config/environment", __FILE__) + require 'rspec/autorun' + #require 'rspec/rails' +# This file was generated by the `rspec --init` command. Conventionally, all +# specs live under a `spec` directory, which RSpec adds to the `$LOAD_PATH`. +# Require this file using `require "spec_helper"` to ensure that it is only +# loaded once. +# +# See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration + RSpec.configure do |config| + config.treat_symbols_as_metadata_keys_with_true_values = true + config.run_all_when_everything_filtered = true + config.filter_run :focus + + config.before(:each) do + DatabaseCleaner.start + end + + config.after(:each) do + DatabaseCleaner.clean + end + + config.before(:suite) do + DatabaseCleaner.strategy = :truncation, {:except => %w[instruments genres] } + DatabaseCleaner.clean_with(:truncation, {:except => %w[instruments genres] }) + end + + #config.after(:each) do + # ActiveRecord::Base.connection.execute('select truncate_tables()') + # # DatabaseCleaner.clean + # end + + # If you're not using ActiveRecord, or you'd prefer not to run each of your + # examples within a transaction, remove the following line or assign false + # instead of true. + #config.use_transactional_fixtures = true + + # Run specs in random order to surface order dependencies. If you find an + # order dependency and want to debug it, you can fix the order by providing + # the seed, which is printed after each run. + # --seed 1234 + config.order = 'random' + end +#end + + +#Spork.each_run do + # This code will be run each time you run your specs. + +#end