Source code for ipfshttpclient.client.pubsub
import typing as ty
from . import base
class SubChannel:
"""Wrapper for a pubsub subscription object that allows for easy
closing of subscriptions.
"""
def __init__(self, sub):
self.__sub = sub # type: str
def read_message(self):
return next(self.__sub)
def __iter__(self):
return self.__sub
def close(self):
self.__sub.close()
def __enter__(self):
return self
def __exit__(self, *a):
self.close()
class Section(base.SectionBase):
@base.returns_single_item(base.ResponseBase)
def ls(self, **kwargs: base.CommonArgs):
"""Lists subscribed topics by name
This method returns data that contains a list of
all topics the user is subscribed to. In order
to subscribe to a topic ``pubsub.sub`` must be called.
.. code-block:: python
# subscribe to a channel
>>> with client.pubsub.sub("hello") as sub:
... client.pubsub.ls()
{
'Strings' : ["hello"]
}
Returns
-------
dict
+---------+-------------------------------------------------+
| Strings | List of topic the IPFS daemon is subscribbed to |
+---------+-------------------------------------------------+
"""
return self._client.request('/pubsub/ls', decoder='json', **kwargs)
@base.returns_single_item(base.ResponseBase)
def peers(self, topic: ty.Optional[str] = None, **kwargs: base.CommonArgs):
"""Lists the peers we are pubsubbing with
Lists the IDs of other IPFS users who we
are connected to via some topic. Without specifying
a topic, IPFS peers from all subscribed topics
will be returned in the data. If a topic is specified
only the IPFS id's of the peers from the specified
topic will be returned in the data.
.. code-block:: python
>>> client.pubsub.peers()
{'Strings':
[
'QmPbZ3SDgmTNEB1gNSE9DEf4xT8eag3AFn5uo7X39TbZM8',
'QmQKiXYzoFpiGZ93DaFBFDMDWDJCRjXDARu4wne2PRtSgA',
...
'QmepgFW7BHEtU4pZJdxaNiv75mKLLRQnPi1KaaXmQN4V1a'
]
}
## with a topic
# subscribe to a channel
>>> with client.pubsub.sub('hello') as sub:
... client.pubsub.peers(topic='hello')
{'String':
[
'QmPbZ3SDgmTNEB1gNSE9DEf4xT8eag3AFn5uo7X39TbZM8',
...
# other peers connected to the same channel
]
}
Parameters
----------
topic
The topic to list connected peers of
(defaults to None which lists peers for all topics)
Returns
-------
dict
+---------+-------------------------------------------------+
| Strings | List of PeerIDs of peers we are pubsubbing with |
+---------+-------------------------------------------------+
"""
args = (topic,) if topic is not None else ()
return self._client.request('/pubsub/peers', args, decoder='json', **kwargs)
@base.returns_no_item
def publish(self, topic: str, payload: str, **kwargs: base.CommonArgs):
"""Publish a message to a given pubsub topic
Publishing will publish the given payload (string) to
everyone currently subscribed to the given topic.
All data (including the ID of the publisher) is automatically
base64 encoded when published.
.. code-block:: python
# publishes the message 'message' to the topic 'hello'
>>> client.pubsub.publish('hello', 'message')
[]
Parameters
----------
topic
Topic to publish to
payload
Data to be published to the given topic
Returns
-------
list
An empty list
"""
args = (topic, payload)
return self._client.request('/pubsub/pub', args, decoder='json', **kwargs)
def subscribe(self, topic: str, discover: bool = False, **kwargs: base.CommonArgs):
"""Subscribes to mesages on a given topic
Subscribing to a topic in IPFS means anytime
a message is published to a topic, the subscribers
will be notified of the publication.
The connection with the pubsub topic is opened and read.
The Subscription returned should be used inside a context
manager to ensure that it is closed properly and not left
hanging.
.. code-block:: python
>>> sub = client.pubsub.subscribe('testing')
>>> with client.pubsub.subscribe('testing') as sub:
... # publish a message 'hello' to the topic 'testing'
... client.pubsub.publish('testing', 'hello')
... for message in sub:
... print(message)
... # Stop reading the subscription after
... # we receive one publication
... break
{'from': '<base64encoded IPFS id>',
'data': 'aGVsbG8=',
'topicIDs': ['testing']}
# NOTE: in order to receive published data
# you must already be subscribed to the topic at publication
# time.
Parameters
----------
topic
Name of a topic to subscribe to
discover
Try to discover other peers subscibed to the same topic
(defaults to False)
Returns
-------
:class:`SubChannel`
Generator wrapped in a context manager that maintains a
connection stream to the given topic.
"""
args = (topic, discover)
return SubChannel(self._client.request('/pubsub/sub', args, stream=True, decoder='json'))