"""
Here we implement :class:`DialogflowEsConnector`, an implementation of
:class:`Connector` that allows Agents to operate on Dialogflow ES.
"""
import os
import logging
import tempfile
from dataclasses import dataclass, field
from typing import Set, Dict, Union, Iterable, Type
import google.auth.credentials
from google.cloud.dialogflow_v2.types import TextInput, QueryInput, EventInput
from google.cloud.dialogflow_v2.services.sessions import SessionsClient
from google.cloud.dialogflow_v2.services.agents import AgentsClient
from google.cloud.dialogflow_v2 import types as pb
from intents import Agent, Intent, LanguageCode, FulfillmentContext, FulfillmentResult
from intents.model.relations import intent_relations
from intents.language_codes import ensure_language_code
from intents.connectors.interface import Connector, Prediction, FulfillmentRequest, WebhookConfiguration, deserialize_intent_parameters
from intents.connectors.dialogflow_es.auth import resolve_credentials
from intents.connectors.dialogflow_es.util import dict_to_protobuf
from intents.connectors.dialogflow_es import webhook
from intents.connectors.dialogflow_es import names as df_names
from intents.connectors.dialogflow_es import entities as df_entities
from intents.connectors.dialogflow_es import export as df_export
from intents.connectors.dialogflow_es.prediction import PredictionBody, DetectIntentBody, WebhookRequestBody, intent_responses
logger = logging.getLogger(__name__)
RICH_RESPONSE_PLATFORMS = ["telegram", "facebook", "slack", "line", "hangouts"]
# Dialogflow makes use of Protobuffer for its data structures, and protobuf may be
# tricky to deal with. For instance, `MessageToDict` will convert snake_case to
# lowerCamelCase, so API is documented snake_case, protobuf is snake_case,
# REST dialogflow results are camelCase, protobuf converted to dict is camelCase
# (unless you use a flag, in that case it could also be snake_case) 💀 This is one
# of the reasons this library exists.
[docs]@dataclass
class DialogflowPrediction(Prediction):
"""
This is an implementation of :class:`~intents.connectors.interface.Prediction`
that comes from Dialogflow. It adds a `df_response` field, through which the
full Dialogflow prediction payload can be accessed. Note that this is a tool
for debugging: relying on Dialogflow data in your business logic is likely
to make it harder to connect your Agent to different platforms.
`DialogflowPredictions` are produced internally by
:class:`DialogflowEsConnector`, and are returned by its :meth:`~DialogflowEsConnector.predict` and
:meth:`~DialogflowEsConnector.trigger` methods.
Args:
intent: An instance of the predicted Intent
confidence: Dialogflow's confidence on its prediction
fulfillment_messages: A map of Intent Responses, as they were
returned by the Service
fulfillment_text: A plain-text version of the response
df_response: Raw Dialogflow response data. It is advisable not to rely
on this is production, if you want to keep cross-service compatibility
"""
df_response: DetectIntentBody = field(default=False, repr=False)
[docs]class DialogflowEsConnector(Connector):
"""
This is an implementation of :class:`~intents.connectors.interface.Connector`
that enables Agents to work as Dialogflow projects.
An Agent can be connected to Dialogflow by providing its :class:`~intents.model.agent.Agent`
class and service account credentials for the the Google Cloud project
that hosts the Dialogflow ES agent:
.. code-block:: python
from example_agent import ExampleAgent
from intents.connectors import DialogflowEsConnector
df = DialogflowEsConnector('/path/to/your/service-account-credentials.json', ExampleAgent)
The Connector can now be used, mainly to
* Export the Agent with :meth:`DialogflowEsConnector.export`
* Predict an utterance with :meth:`DialogflowEsConnector.predict`
* Trigger an Intent with :meth:`DialogflowEsConnector.trigger`
Args:
google_credentials: Path to service account JSON credentials, or a Credentials object
agent_cls: The Agent to connect
default_session: An arbitrary string to identify the conversation during
predictions. If None, Connector will generate a random string
default_language: Default language to use during predictions. If None, Connector
will use the Agent's firs defined language.
rich_platforms: Platforms to include when exporting Rich response messages
webhook_configuration: Webhook connection parameters
"""
entity_mappings = df_entities.MAPPINGS
rich_platforms: Iterable[str]
webhook_configuration: WebhookConfiguration
_credentials: google.auth.credentials.Credentials
_session_client: SessionsClient
_need_context_set: Set[type(Intent)]
_intents_by_context: Dict[str, type(Intent)]
def __init__(
self,
google_credentials: Union[str, google.auth.credentials.Credentials],
agent_cls: type(Agent),
default_session: str=None,
default_language: Union[LanguageCode, str]=None,
rich_platforms: Iterable[str]=("telegram",),
webhook_configuration: WebhookConfiguration=None
):
super().__init__(agent_cls, default_session=default_session,
default_language=default_language)
self._credentials = resolve_credentials(google_credentials)
assert all([p in RICH_RESPONSE_PLATFORMS for p in rich_platforms])
self._session_client = SessionsClient(credentials=self._credentials)
self.rich_platforms = rich_platforms
self.webhook_configuration = webhook_configuration
self._need_context_set = _build_need_context_set(agent_cls)
self._intents_by_context = _build_intents_by_context(agent_cls)
@property
def gcp_project_id(self) -> str:
"""
Return the Google Cloud Project ID that is associated with the current Connection
"""
return self._credentials.project_id
[docs] def export(self, destination: str):
agent_name = 'py-' + self.agent_cls.__name__
return df_export.export(self, destination, agent_name)
[docs] def upload(self):
agents_client = AgentsClient(credentials=self._credentials)
with tempfile.TemporaryDirectory() as tmp_dir:
export_path = os.path.join(tmp_dir, 'agent.zip')
self.export(export_path)
with open(export_path, 'rb') as f:
agent_content = f.read()
restore_request = pb.RestoreAgentRequest(
parent=f"projects/{self.gcp_project_id}",
agent_content=agent_content
)
agents_client.restore_agent(request=restore_request)
[docs] def predict(self, message: str, session: str = None, language: Union[LanguageCode, str] = None) -> DialogflowPrediction:
if not session:
session = self.default_session
if not language:
language = self.default_language
language = ensure_language_code(language)
text_input = TextInput(text=message, language_code=language.value)
query_input = QueryInput(text=text_input)
session_path = self._session_client.session_path(
self.gcp_project_id, session)
df_result = self._session_client.detect_intent(
session=session_path,
query_input=query_input
)
df_response = DetectIntentBody(df_result)
return self._df_body_to_prediction(df_response)
[docs] def trigger(self, intent: Intent, session: str=None, language: Union[LanguageCode, str]=None) -> DialogflowPrediction:
if not session:
session = self.default_session
if not language:
language = self.default_language
language = ensure_language_code(language)
intent_name = intent.name
event_name = df_names.event_name(intent.__class__)
event_parameters = {}
for param_name, param_metadata in intent.parameter_schema.items():
param_mapping = df_entities.MAPPINGS[param_metadata.entity_cls]
if param_name in intent.__dict__:
param_value = intent.__dict__[param_name]
event_parameters[param_name] = param_mapping.to_service(param_value)
logger.info("Triggering event '%s' in session '%s' with parameters: %s",
event_name, session, event_parameters)
if not event_parameters:
event_parameters = {}
event_input = EventInput(
name=event_name,
parameters=dict_to_protobuf(event_parameters),
language_code=language.value
)
query_input = QueryInput(event=event_input)
session_path = self._session_client.session_path(
self.gcp_project_id, session)
df_result = self._session_client.detect_intent(
session=session_path,
query_input=query_input
)
df_response = DetectIntentBody(df_result)
return self._df_body_to_prediction(df_response)
[docs] def fulfill(self, fulfillment_request: FulfillmentRequest) -> dict:
webhook_body = WebhookRequestBody(fulfillment_request.body)
intent = self._df_body_to_intent(webhook_body)
context = self._df_body_to_fulfillment_context(webhook_body)
fulfillment_result = FulfillmentResult.ensure(intent.fulfill(context))
logger.debug("Returning fulfillment result: %s", fulfillment_result)
if fulfillment_result:
return webhook.fulfillment_result_to_response(fulfillment_result, context)
return {}
def _df_body_to_fulfillment_context(self, df_body: DetectIntentBody) -> DialogflowPrediction:
return FulfillmentContext(
confidence=df_body.queryResult.intentDetectionConfidence,
fulfillment_messages=intent_responses(df_body),
fulfillment_text=df_body.queryResult.fulfillmentText,
language=LanguageCode(df_body.queryResult.languageCode)
)
def _df_body_to_prediction(self, df_body: DetectIntentBody) -> DialogflowPrediction:
return DialogflowPrediction(
intent=self._df_body_to_intent(df_body),
confidence=df_body.queryResult.intentDetectionConfidence,
fulfillment_messages=intent_responses(df_body),
fulfillment_text=df_body.queryResult.fulfillmentText,
df_response=df_body.detect_intent
)
def _df_body_to_intent(
self,
df_body: PredictionBody,
build_related_cls: Type[Intent]=None,
visited_intents: Set[Type[Intent]]=None
) -> Intent:
"""
Convert a Dialogflow prediction response into an instance of
:class:`Intent`.
This method is recursive on intent relations. When an intent has a
:meth:`~intents.model.relations.follow` field, that field must be filled
with an instance of the followed intent; in this case
:meth:`_df_body_to_intent` will call itself passing the parent intent
class as `build_related_cls`, to force building that intent from the
same `df_body`; contexts and parameters will be checked for consistency.
Args:
df_body: A Dialogflow Response
build_related_cls: Force to build the related intent instead of
the predicted one
visited_intents: This is used internally to prevent recursion loops
"""
if not visited_intents:
visited_intents = set()
contexts, context_parameters = df_body.contexts()
# Slot filling in progress
# TODO: also check queryResult.cancelsSlotFilling
# if "__system_counters__" in contexts:
if not df_body.queryResult.allRequiredParamsPresent:
logger.warning("Prediction doesn't have values for all required parameters. "
"Slot filling may be in progress, but this is not modeled yet: "
"Intent object will be None")
return None
if build_related_cls:
# TODO: adjust lifespan
intent_cls = build_related_cls
df_parameters = {
p_name: p.value for p_name, p in context_parameters.items()
if p_name in intent_cls.parameter_schema
}
else:
intent_name = df_body.intent_name
intent_cls: Intent = self.agent_cls._intents_by_name.get(intent_name)
if not intent_cls:
raise ValueError(f"Prediction returned intent '{intent_name}', " +
"but this was not found in Agent definition. Make sure to restore a latest " +
"Agent export from `services.dialogflow_es.export.export()`. If the problem " +
"persists, please file a bug on the Intents repository.")
df_parameters = df_body.intent_parameters
visited_intents.add(intent_cls)
parameter_dict = deserialize_intent_parameters(df_parameters, intent_cls, self.entity_mappings)
related_intents_dict = {}
for rel in intent_relations(intent_cls).follow:
if rel.target_cls in visited_intents:
raise ValueError(f"Loop detected: {rel.target_cls} was already visited. Make sure "
"your Agent has no circular dependencies")
related_intent = self._df_body_to_intent(df_body, rel.target_cls, visited_intents)
related_intent.lifespan = df_body.context_lifespans.get(df_names.context_name(rel.target_cls), 0)
related_intents_dict[rel.field_name] = related_intent
result = intent_cls(**parameter_dict, **related_intents_dict)
result.lifespan = df_body.context_lifespans.get(df_names.context_name(intent_cls), 0)
return result
def _intent_needs_context(self, intent: Intent) -> bool:
return intent in self._need_context_set
def _build_need_context_set(agent_cls: type(Agent)) -> Set[Intent]:
"""
Return a list of intents that need to spawn a context, based on their
relations:
* If intent *B* follows intent *A*, then intent *A* needs to spawn a context
"""
result = set()
for intent in agent_cls.intents:
related = intent_relations(intent)
for rel in related.follow:
result.add(rel.target_cls)
return result
def _build_intents_by_context(agent_cls: Type[Agent]) -> Dict[str, Type[Intent]]:
result = {}
for intent_cls in agent_cls.intents:
context_name = df_names.context_name(intent_cls)
if context_name in result:
raise ValueError(f"Intents '{intent_cls.name}' and '{result[context_name].name}' " +
"have ambiguous context name. This is a bug: please file an issue on the " +
"Intents repo. Quick fix: change the name of one of the two intents.")
result[context_name] = intent_cls
return result