Skip to content

Commit 5f64fd2

Browse files
epologeebformamartijnversluis
committed
Implement S2 protocol library
Co-authored-by: Bob Forma <bob@stekker.com> Co-authored-by: Martijn Versluis <martijn@stekker.com>
1 parent daf5a24 commit 5f64fd2

78 files changed

Lines changed: 6104 additions & 3 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ json = handshake.to_json
118118
```
119119
┌─────────────────────┐ WebSocket ┌─────────────────┐
120120
│ Your Application │◄─────────────────────►│ CEM Server │
121-
│ (Resource Manager) │ │ │
121+
│ (Resource Manager) │ ← Your custom logic │ │
122122
└──────────┬──────────┘ └─────────────────┘
123123
124124
125125
┌─────────────────────┐
126126
│ S2::Connection │ ← Manages WebSocket lifecycle
127127
│ S2::Session │ ← Handles protocol state
128-
│ S2::MessageHandler │ ← Your custom logic
128+
│ S2::MessageHandler │
129129
└─────────────────────┘
130130
```
131131

lib/s2.rb

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
require "active_support"
2+
require "active_support/core_ext/class/attribute"
3+
require "active_support/core_ext/hash/indifferent_access"
4+
require "active_support/core_ext/numeric/time"
5+
require "active_support/core_ext/object/blank"
6+
require "active_support/callbacks"
7+
require "active_support/concern"
8+
require "active_support/notifications"
9+
require "active_support/rescuable"
10+
require "async"
11+
require "async/http/endpoint"
12+
require "async/notification"
13+
require "async/queue"
14+
require "async/websocket/client"
15+
require "dry-struct"
16+
require "dry-types"
17+
require "json"
18+
require "logger"
19+
require "securerandom"
20+
21+
require_relative "s2/version"
22+
23+
module S2
24+
class << self
25+
attr_writer :logger, :message_handler_class, :supported_protocol_versions
26+
27+
def logger
28+
@logger ||= Logger.new(nil)
29+
end
30+
31+
def message_handler_class
32+
@message_handler_class ||= S2::MessageHandler
33+
end
34+
35+
def supported_protocol_versions
36+
@supported_protocol_versions ||= ["0.0.2-beta"]
37+
end
38+
end
39+
40+
class MessageHandler # rubocop:disable Lint/EmptyClass
41+
end
42+
end
43+
44+
require_relative "s2/messages"
45+
require_relative "s2/message_factory"
46+
require_relative "s2/message_sender"
47+
require_relative "s2/message_handler/base_error"
48+
require_relative "s2/message_handler/dispatching"
49+
require_relative "s2/message_handler/replying"
50+
require_relative "s2/message_handler/error_handling"
51+
require_relative "s2/message_handler/asserting"
52+
53+
require_relative "s2/message_handler"
54+
require_relative "s2/session"
55+
require_relative "s2/connection"

lib/s2/connection.rb

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
module S2
2+
class Connection
3+
INITIAL_BACKOFF = 5.seconds
4+
MAX_BACKOFF = 1.hour
5+
6+
delegate :state,
7+
to: :@session,
8+
allow_nil: true
9+
10+
attr_reader :connected_at, :status
11+
12+
def initialize(resource_id:, task:, ws_url:)
13+
@connected_at = nil
14+
@queue = nil
15+
@resource_id = resource_id
16+
@session = nil
17+
@task = task
18+
@ws_url = ws_url
19+
@status = :initialized
20+
@stopping = false
21+
@backoff = INITIAL_BACKOFF
22+
@endpoint = Async::HTTP::Endpoint.parse(
23+
ws_url,
24+
alpn_protocols: Async::HTTP::Protocol::HTTP11.names,
25+
)
26+
end
27+
28+
def connect
29+
until @stopping
30+
begin
31+
connect_and_run
32+
@backoff = INITIAL_BACKOFF
33+
rescue StandardError => e
34+
ActiveSupport::Notifications.instrument("connection_errored.session.s2", exception: e) unless @stopping
35+
ensure
36+
break if @stopping
37+
38+
S2.logger.info "[#{self.class.name}] [#{@resource_id}] Reconnecting in #{@backoff}s..."
39+
@status = :reconnecting
40+
sleep @backoff
41+
@backoff = [@backoff * 2, MAX_BACKOFF].min
42+
end
43+
end
44+
end
45+
46+
def disconnect
47+
@status = :disconnecting
48+
@stopping = true
49+
@session&.stop
50+
51+
begin
52+
@queue&.close
53+
rescue StandardError
54+
# ignore
55+
end
56+
57+
@status = :disconnected
58+
end
59+
60+
def send_message(message)
61+
@queue.push(message)
62+
end
63+
64+
private
65+
66+
def connect_and_run
67+
connect_websocket do |ws|
68+
@queue = Async::Queue.new
69+
@session = S2::Session.new(resource_id: @resource_id, ws:, queue: @queue)
70+
@connected_at = Time.current
71+
@status = :connected
72+
send_handshake
73+
@session.start
74+
ensure
75+
@session = nil
76+
@queue = nil
77+
@status = :disconnected
78+
end
79+
end
80+
81+
def connect_websocket(&)
82+
@status = :connecting
83+
84+
Async::WebSocket::Client.connect(@endpoint) do |ws|
85+
ActiveSupport::Notifications.instrument(
86+
"connected.session.s2",
87+
resource_id: @resource_id,
88+
url: @ws_url,
89+
)
90+
91+
yield ws
92+
ensure
93+
ActiveSupport::Notifications.instrument(
94+
"disconnected.session.s2",
95+
resource_id: @resource_id,
96+
)
97+
end
98+
end
99+
100+
def send_handshake
101+
handshake = S2::Messages::Handshake.new(
102+
message_id: SecureRandom.uuid,
103+
message_type: S2::Messages::HandshakeMessageType::Handshake,
104+
role: S2::Messages::EnergyManagementRole::Rm,
105+
supported_protocol_versions: S2.supported_protocol_versions,
106+
)
107+
108+
send_message(handshake)
109+
end
110+
end
111+
end

lib/s2/message_factory.rb

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
module S2
2+
class MessageFactory
3+
MESSAGE_TYPE_TO_MESSAGE_CLASS = {
4+
"FRBC.ActuatorStatus" => S2::Messages::FRBCActuatorStatus,
5+
"FRBC.FillLevelTargetProfile" => S2::Messages::FRBCFillLevelTargetProfile,
6+
"FRBC.Instruction" => S2::Messages::FRBCInstruction,
7+
"FRBC.LeakageBehaviour" => S2::Messages::FRBCLeakageBehaviour,
8+
"FRBC.StorageStatus" => S2::Messages::FRBCStorageStatus,
9+
"FRBC.SystemDescription" => S2::Messages::FRBCSystemDescription,
10+
"FRBC.TimerStatus" => S2::Messages::FRBCTimerStatus,
11+
"FRBC.UsageForecast" => S2::Messages::FRBCUsageForecast,
12+
"Handshake" => S2::Messages::Handshake,
13+
"HandshakeResponse" => S2::Messages::HandshakeResponse,
14+
"InstructionStatusUpdate" => S2::Messages::InstructionStatusUpdate,
15+
"PowerForecast" => S2::Messages::PowerForecast,
16+
"PowerMeasurement" => S2::Messages::PowerMeasurement,
17+
"ReceptionStatus" => S2::Messages::ReceptionStatus,
18+
"ResourceManagerDetails" => S2::Messages::ResourceManagerDetails,
19+
"RevokeObject" => S2::Messages::RevokeObject,
20+
"SelectControlType" => S2::Messages::SelectControlType,
21+
"SessionRequest" => S2::Messages::SessionRequest,
22+
}.freeze
23+
24+
class BaseError < StandardError
25+
attr_reader :message_id
26+
27+
def initialize(message, message_id = nil)
28+
super(message)
29+
@message_id = message_id
30+
end
31+
end
32+
33+
class InvalidMessageFormat < BaseError; end
34+
class MissingMessageType < BaseError; end
35+
class UnsupportedMessageType < BaseError; end
36+
class InvalidMessagePayload < BaseError; end
37+
38+
class << self
39+
def create_message(data)
40+
message = data.is_a?(String) ? parse_json(data) : data
41+
message_type = message["message_type"]
42+
message_id = message["message_id"]
43+
44+
raise(MissingMessageType, "Message type not provided", message_id) if message_type.blank?
45+
46+
message_class = MESSAGE_TYPE_TO_MESSAGE_CLASS[message_type]
47+
48+
if message_class.nil?
49+
raise(
50+
UnsupportedMessageType,
51+
"Message type not supported: #{message_type}",
52+
message_id,
53+
)
54+
end
55+
56+
build_message(message_class, message)
57+
end
58+
59+
private
60+
61+
def parse_json(data)
62+
JSON.parse(data)
63+
rescue JSON::ParserError
64+
raise(InvalidMessageFormat, "Invalid JSON")
65+
end
66+
67+
def build_message(message_class, message)
68+
message_class.from_dynamic!(message)
69+
rescue KeyError => e
70+
raise(InvalidMessagePayload, "Field missing in payload: #{e.key}", message["message_id"])
71+
end
72+
end
73+
end
74+
end

lib/s2/message_handler.rb

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
class S2::MessageHandler
2+
include ActiveSupport::Callbacks
3+
include Asserting
4+
include Dispatching
5+
include ErrorHandling
6+
include Replying
7+
8+
define_callbacks :handle
9+
reply_with :@message_sender
10+
11+
STATUSES = [
12+
:websocket_connected,
13+
:initialized,
14+
].freeze
15+
16+
class << self
17+
def before_handle(method_name = nil, &blk)
18+
set_callback :handle, :before, method_name || wrap_callback_block(blk)
19+
end
20+
21+
def after_handle(method_name = nil, &blk)
22+
set_callback :handle, :after, method_name || wrap_callback_block(blk)
23+
end
24+
25+
def around_handle(&)
26+
set_callback :handle, :around do |_, inner|
27+
instance_exec(Fiber[:s2_payload], -> { inner.call }, &)
28+
end
29+
end
30+
31+
private
32+
33+
def wrap_callback_block(blk)
34+
raise ArgumentError, "block required" unless blk
35+
36+
proc do
37+
instance_exec(Fiber[:s2_payload], &blk)
38+
end
39+
end
40+
end
41+
42+
attr_reader :state
43+
44+
def initialize(message_sender:, state: { status: :websocket_connected })
45+
@message_sender = message_sender
46+
@state = state
47+
end
48+
49+
def handle_message(payload)
50+
Fiber[:s2_payload] = payload
51+
52+
run_callbacks :handle do
53+
message = S2::MessageFactory.create_message(payload)
54+
dispatch_message(message)
55+
end
56+
rescue StandardError => e
57+
rescue_with_handler(e) || raise
58+
end
59+
60+
protected
61+
62+
def update_state(**kwargs)
63+
@state.merge!(kwargs)
64+
end
65+
66+
def update_status(new_status)
67+
raise ArgumentError, "Invalid status: #{new_status}" unless STATUSES.include?(new_status)
68+
69+
update_state status: new_status
70+
end
71+
end
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
module S2::MessageHandler::Asserting
2+
extend ActiveSupport::Concern
3+
4+
class PermanentError < S2::MessageHandler::BaseError; end
5+
class InvalidContent < S2::MessageHandler::BaseError; end
6+
7+
included do
8+
include S2::MessageHandler::ErrorHandling
9+
10+
rescue_from InvalidContent do |error|
11+
reply to: error.message_id, status: :invalid_content, diagnostic_label: error.message
12+
end
13+
14+
rescue_from PermanentError do |error|
15+
reply to: error.message_id, status: :permanent_error, diagnostic_label: error.message
16+
end
17+
end
18+
19+
def assert_state!(message, **expected)
20+
expected.each do |key, value|
21+
actual = @state[key]
22+
next if actual == value
23+
24+
raise(
25+
PermanentError.new(
26+
"Invalid state: expected #{key} to be '#{value}', got '#{actual}'",
27+
message.message_id,
28+
),
29+
)
30+
end
31+
end
32+
33+
def assert_attribute!(message, **expected)
34+
expected.each do |key, value|
35+
actual = message.public_send(key)
36+
next if actual == value
37+
38+
raise(
39+
PermanentError.new(
40+
"Invalid attribute: expected #{key} to be '#{value}', got '#{actual}'",
41+
message.message_id,
42+
),
43+
)
44+
end
45+
end
46+
end
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
class S2::MessageHandler::BaseError < StandardError
2+
attr_reader :message_id
3+
4+
def initialize(message, message_id)
5+
super(message)
6+
7+
@message_id = message_id
8+
end
9+
end

0 commit comments

Comments
 (0)