Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import os 

2 

3from webapp import api 

4from webapp.api.exceptions import ( 

5 ApiResponseDecodeError, 

6 ApiResponseError, 

7 ApiResponseErrorList, 

8) 

9 

# Root of the Marketo REST instance every endpoint below is built on.
BASE_URL = "https://066-EOV-335.mktorest.com/"

# URL templates. The ``{...}`` placeholders are filled in with
# ``str.format`` by ``Marketo.request``; ``{token}`` is always supplied by
# the client, the remaining placeholders come from ``url_args``.

# Look up leads by e-mail address; only the ``id`` field is requested.
LEAD_BY_EMAIL = BASE_URL + (
    "rest/v1/leads.json?access_token={token}"
    "&filterType=email&filterValues={email}&fields=id"
)

# Fetch one lead by id, including its ``snapcraftnewsletter`` field.
LEAD_NEWSLETTER_SUBSCRIPTION = BASE_URL + (
    "rest/v1/lead/{lead_id}.json?access_token={token}"
    "&fields=id,email,snapcraftnewsletter"
)

# Leads collection endpoint (used with POST to update leads).
LEADS = BASE_URL + "rest/v1/leads.json?access_token={token}"

23 

24 

class Marketo:
    """Small client for the Marketo REST API.

    Handles authentication with client credentials, lead lookup by e-mail
    and reading/updating the ``snapcraftnewsletter`` field of a lead.

    An access token is fetched on first use and stored in ``self.token``.
    """

    # Session shared by every instance created without an explicit
    # ``api_session``. It is created on first use rather than at class
    # definition time, so importing this module has no side effects.
    _default_session = None

    def __init__(self, api_session=None):
        """Create a client.

        Args:
            api_session: object exposing ``get(url)`` and
                ``request(method=..., url=..., json=...)``. When omitted, a
                single ``api.requests.Session()`` is created lazily and
                shared between all default-constructed clients.
        """
        # NOTE(review): the original comment here said "Marketo isn't fast,
        # so give it plenty of time to make a connection, and respond", but
        # no timeout is configured in this class - presumably
        # api.requests.Session takes care of it; confirm.
        if api_session is None:
            if Marketo._default_session is None:
                Marketo._default_session = api.requests.Session()
            api_session = Marketo._default_session

        self.api_session = api_session
        self.token = None

    def _authenticate(self):
        """Fetch a new access token and store it in ``self.token``.

        Credentials are read from the ``MARKETO_CLIENT_ID`` and
        ``MARKETO_CLIENT_SECRET`` environment variables.

        Raises:
            KeyError: if either environment variable is not set.
            ApiResponseDecodeError, ApiResponseErrorList, ApiResponseError:
                see ``_process_response``.
        """
        client_id = os.environ["MARKETO_CLIENT_ID"]
        client_secret = os.environ["MARKETO_CLIENT_SECRET"]
        auth_url = BASE_URL + (
            "identity/oauth/token?grant_type=client_credentials&"
            f"client_id={client_id}&client_secret={client_secret}"
        )
        auth_response = self.api_session.get(auth_url)
        body = self._process_response(auth_response)
        self.token = body["access_token"]

    def _process_response(self, response):
        """Decode a response and turn failures into API exceptions.

        Args:
            response: object with ``json()``, ``ok`` and ``status_code``.

        Returns:
            The decoded JSON body.

        Raises:
            ApiResponseDecodeError: the body is not valid JSON.
            ApiResponseErrorList: the response is not ``ok`` and the body
                carries ``"success": false`` with an ``errors`` list.
            ApiResponseError: any other response that is not ``ok``.
        """
        try:
            body = response.json()
        except ValueError as decode_error:
            raise ApiResponseDecodeError(
                "JSON decoding failed: {}".format(decode_error)
            ) from decode_error

        if response.ok:
            return body

        # Only a JSON object can carry the "success"/"errors" envelope;
        # anything else falls through to the generic error below.
        if isinstance(body, dict) and body.get("success") is False:
            errors = body["errors"]
            raise ApiResponseErrorList(
                "The api returned a list of errors",
                errors[0]["code"],
                # A real list (not a one-shot iterator) so the messages
                # can be read more than once by whoever handles the error.
                [error["message"] for error in errors],
            )

        raise ApiResponseError(f"{response.status_code} - {body}", 500)

    def request(self, method, url, url_args=None, json=None):
        """Send an authenticated request and return the decoded body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: URL template containing ``{token}`` plus any placeholders
                named in ``url_args``.
            url_args: optional mapping of extra values for the template.
            json: optional JSON-serialisable payload.

        Returns:
            The decoded JSON body (see ``_process_response``).

        Raises:
            Whatever ``_authenticate`` and ``_process_response`` raise.
        """
        url_args = {} if url_args is None else url_args

        if not self.token:
            self._authenticate()

        def send():
            # The URL is formatted at send time so that a retry picks up
            # the refreshed token, and the payload is always included.
            return self.api_session.request(
                method=method,
                url=url.format(token=self.token, **url_args),
                json=json,
            )

        response = send()

        # If the request failed because the token expired, authenticate
        # again and retry once.
        # NOTE(review): Marketo documents 601/602 as error codes inside the
        # response body - confirm they reach us as HTTP status codes.
        if response.status_code in (601, 602):
            self._authenticate()
            response = send()

        return self._process_response(response)

    def get_user(self, email):
        """Return the first lead matching ``email``, or ``None``.

        ``None`` is returned when the response has no ``result`` key or the
        result list is empty.
        """
        response = self.request(
            method="GET", url=LEAD_BY_EMAIL, url_args={"email": email}
        )
        leads = response["result"] if "result" in response else None
        return leads[0] if leads else None

    def get_newsletter_subscription(self, lead_id):
        """Return the lead record for ``lead_id``, or ``{}`` if none.

        The record is requested with the ``id``, ``email`` and
        ``snapcraftnewsletter`` fields. An empty dict is returned when the
        response has no ``result`` key or the result list is empty.
        """
        response = self.request(
            method="GET",
            url=LEAD_NEWSLETTER_SUBSCRIPTION,
            url_args={"lead_id": lead_id},
        )
        leads = response["result"] if "result" in response else None
        return leads[0] if leads else {}

    def set_newsletter_subscription(self, lead_email, newsletter_status):
        """Set ``snapcraftnewsletter`` for the lead with ``lead_email``.

        Args:
            lead_email: e-mail address identifying the lead.
            newsletter_status: any value; its truthiness is what is sent.

        Returns:
            The decoded JSON body of the update request.
        """
        payload = {
            "action": "updateOnly",
            "asyncProcessing": False,
            "input": [
                {
                    "email": lead_email,
                    "snapcraftnewsletter": bool(newsletter_status),
                }
            ],
        }
        return self.request(method="POST", url=LEADS, json=payload)