Source code for sender
# -*- coding: utf-8 -*-
"""sender.py
Provides the Sender, which wraps Broker to send messages to the associated
communication partner.
"""
# The following import _must_ not be made, otherwise some python binding calls
# for broker distributed datastores do not work anymore. The import is left
# commented-out to avoid someone trapping in this issue.
# from __future__ import unicode_literals
import pybroker as broker
import logging
from time import sleep
[docs]class Sender(object):
"""The sender.
Sends Broker messages to an Broker endpoint.
"""
def __init__(self, master_address, master_port,
broker_endpoint, broker_topic,
connector_id):
"""Sender(master_address, port)
Initialises the Sender. The master_address/port are used to peer to the
corresponding Broker partner.
:param master_address: The address/hostname of the master bro
instance. (str)
:param master_port: The port to send to. (int)
:param broker_endpoint: The broker endpoint name to use for
connecting to the master. (str)
:param broker_topic: The broker topic to send to. (str)
:param connector_id: The connector ID. (str)
"""
self.log = logging.getLogger(self.__class__.__name__)
self.master_name = "{}:{}".format(master_address, master_port)
# these variables cannot be passed by reference -> "getter / setter"
self.master_status = broker.incoming_connection_status.tag_established
self.slave_status = broker.incoming_connection_status.tag_established
self.broker_topic = broker_topic
self.broker_endpoint = broker_endpoint
self.connector_id = connector_id
self.connector_to_master = broker.endpoint(broker_endpoint)
# Peer with broker endpoint of bro master instance
self.connector_to_master.peer(master_address, master_port, 1)
# dedicated endpoint for sending to slaves
self.connector_to_slave = broker.endpoint(connector_id)
# note: "connectors" is the name of the distributed datastore
self.balanced_slaves = broker.clone_create(self.connector_to_master,
"connectors", 1)
self.connector_to_slave_peering = None
self.current_slave = self._lookup_and_get_current_slave()
if self.current_slave:
self.connector_to_slave_peering = self._peer_connector_to_slave()
# TODO: provide a channel to accept commands (change config etc.)
[docs] def send(self, msg):
"""Send the Broker message to the peer.
:param msg: The message to be sent. (Broker message)
"""
msg.append(broker.data(self.connector_id))
try:
self._repeer_connector_to_slave()
if self.current_slave:
self.log.info("Sending to {}".format(self.current_slave))
self._send_to_bro(self.connector_to_slave, False, msg)
else:
self.log.warn("Not peered with any slave, falling back to "
"master: {}".format(self.master_name))
self._send_to_bro(self.connector_to_master, True, msg)
except Exception, e:
local_endpoint = self.current_slave or self.master_name
self.log.error("Error sending data from {} to {}. Exception: {}"
.format(self.broker_endpoint, local_endpoint, str(e)
))
def _send_to_bro(self, bro_endpoint, is_master, msg):
"""Send message if connection is established, log warning otherwise"""
if self._bro_connection_established(bro_endpoint, is_master):
bro_endpoint.send(self.broker_topic, msg)
else:
self.log.warn("Sending failed - no connection established!")
def _bro_connection_established(self, bro_endpoint, is_master):
"""Return True if connection to master is established."""
# get bro endpoint connection status
status = self.master_status if is_master else self.slave_status
ocs = bro_endpoint.outgoing_connection_status()
for m in ocs.want_pop(): # Returns message only on change
status = m.status
# set the bro endpoint connection status variable
if is_master:
self.master_status = status
else:
self.slave_status = status
return status == broker.incoming_connection_status.tag_established
def _lookup_and_get_current_slave(self):
"""Return the slave bro (name) that should be peered with"""
current_slave = None
if self.balanced_slaves:
try:
current_slave = self.balanced_slaves.lookup(
broker.data(self.broker_endpoint)).data().as_string()
self.log.debug("Lookup {} returns {}"
.format(self.broker_endpoint, current_slave))
except Exception, e:
self.log.error("Error looking up slave on connector '{}'. "
"Error: '{}'".format(
self.broker_endpoint, str(e)))
return current_slave
def _peer_connector_to_slave(self):
"""Return the peer"""
slave_ip = self.current_slave[len("bro-slave-"):].split(":")[0]
slave_port = self.current_slave[len("bro-slave-"):].split(":")[1]
return self.connector_to_slave.peer(slave_ip, int(slave_port), 1)
def _repeer_connector_to_slave(self):
"""Repeer the connector to the slave bro if necessary"""
current_slave = self._lookup_and_get_current_slave()
if current_slave != self.current_slave:
# update the receiver, so repeer
self.current_slave = current_slave
if self.current_slave:
self.log.info("Repeering with {}".format(self.current_slave))
if self.connector_to_slave_peering:
self.connector_to_slave.unpeer(
self.connector_to_slave_peering)
self.connector_to_slave_peering = \
self._peer_connector_to_slave()
sleep(0.1) # repeering may take a moment, make sure..
else:
self.log.warn("No slave peered anymore.")
self.connector_to_slave.unpeer(self.connector_to_slave_peering)