96 lines
4.3 KiB
Python
96 lines
4.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import werkzeug
|
|
from urllib2 import urlopen, Request, HTTPError
|
|
from odoo import api, fields, models
|
|
|
|
# Base URL and version segment used to build every Twitter REST endpoint below.
API_ENDPOINT = 'https://api.twitter.com'
API_VERSION = '1.1'

# Endpoint that exchanges the API key/secret for a bearer token.
REQUEST_TOKEN_URL = '%s/oauth2/token' % API_ENDPOINT
# Endpoint that lists the favorites of a given screen name.
REQUEST_FAVORITE_LIST_URL = '%s/%s/favorites/list.json' % (API_ENDPOINT, API_VERSION)

# Value passed as ``timeout`` to every ``urlopen`` call in this module.
URLOPEN_TIMEOUT = 10

_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WebsiteTwitter(models.Model):
    """Extend ``website`` with Twitter credentials and favorite-tweet fetching."""

    _inherit = 'website'

    # Credentials used to obtain a bearer token, and the account whose
    # favorites are fetched.
    twitter_api_key = fields.Char(string='Twitter API key', help='Twitter API Key')
    twitter_api_secret = fields.Char(string='Twitter API secret', help='Twitter API Secret')
    twitter_screen_name = fields.Char(string='Get favorites from this screen name')

    @api.model
    def _request(self, website, url, params=None):
        """Send an authenticated request to the Twitter API.

        :param website: website record whose API key/secret are used to
            obtain the bearer token
        :param url: endpoint URL to query
        :param params: optional mapping of query-string parameters; when
            truthy it is URL-encoded and appended to ``url``
        :return: the decoded JSON body of the response
        :raises HTTPError: re-raised after its code, message and body have
            been logged at debug level
        """
        access_token = self._get_access_token(website)
        if params:
            params = werkzeug.url_encode(params)
            url = url + '?' + params
        try:
            request = Request(url)
            request.add_header('Authorization', 'Bearer %s' % access_token)
            response = urlopen(request, timeout=URLOPEN_TIMEOUT)
            try:
                return json.load(response)
            finally:
                # Release the underlying connection even if decoding fails.
                response.close()
        except HTTPError as e:
            _logger.debug("Twitter API request failed with code: %r, msg: %r, content: %r",
                          e.code, e.msg, e.fp.read())
            raise

    @api.model
    def _refresh_favorite_tweets(self):
        """Fetch favorites for every fully configured website (called by cron job)."""
        website = self.env['website'].search([('twitter_api_key', '!=', False),
                                              ('twitter_api_secret', '!=', False),
                                              ('twitter_screen_name', '!=', False)])
        _logger.debug("Refreshing tweets for website IDs: %r", website.ids)
        website.fetch_favorite_tweets()

    @api.multi
    def fetch_favorite_tweets(self):
        """Fetch the favorites of each website's screen name and store new ones.

        Websites missing the API key, API secret or screen name are skipped.
        For the others, the favorites list is requested (restricted with
        ``since_id`` to tweets newer than the highest ``tweet_id`` already
        stored for that website and screen name) and every tweet that has no
        ``website.twitter.tweet`` record yet is created.

        :return: list of ``website.twitter.tweet`` record ids matching the
            tweets returned by the API for all processed websites, whether
            they already existed or were just created
        :raises HTTPError: propagated from :meth:`_request`
        """
        WebsiteTweets = self.env['website.twitter.tweet']
        tweet_ids = []
        for website in self:
            if not all((website.twitter_api_key, website.twitter_api_secret, website.twitter_screen_name)):
                _logger.debug("Skip fetching favorite tweets for unconfigured website %s", website)
                continue
            params = {'screen_name': website.twitter_screen_name}
            last_tweet = WebsiteTweets.search([('website_id', '=', website.id),
                                               ('screen_name', '=', website.twitter_screen_name)],
                                              limit=1, order='tweet_id desc')
            if last_tweet:
                params['since_id'] = int(last_tweet.tweet_id)
            _logger.debug("Fetching favorite tweets using params %r", params)
            response = self._request(website, REQUEST_FAVORITE_LIST_URL, params=params)
            for tweet_dict in response:
                tweet_id = tweet_dict['id']  # unsigned 64-bit snowflake ID
                # Use a separate name for the lookup result: rebinding
                # ``tweet_ids`` here would discard the ids accumulated so far.
                existing_ids = WebsiteTweets.search([('tweet_id', '=', tweet_id)]).ids
                if existing_ids:
                    tweet_ids.extend(existing_ids)
                    continue
                new_tweet = WebsiteTweets.create(
                    {
                        'website_id': website.id,
                        'tweet': json.dumps(tweet_dict),
                        'tweet_id': tweet_id,  # stored in NUMERIC PG field
                        'screen_name': website.twitter_screen_name,
                    })
                _logger.debug("Found new favorite: %r, %r", tweet_id, tweet_dict)
                tweet_ids.append(new_tweet.id)
        return tweet_ids

    def _get_access_token(self, website):
        """Obtain a bearer token.

        Posts ``grant_type=client_credentials`` to ``REQUEST_TOKEN_URL`` using
        HTTP Basic authentication built from the website's API key and secret.

        :param website: website record providing ``twitter_api_key`` and
            ``twitter_api_secret``
        :return: the ``access_token`` value of the JSON response
        """
        bearer_token_cred = '%s:%s' % (website.twitter_api_key, website.twitter_api_secret)
        encoded_cred = base64.b64encode(bearer_token_cred)
        request = Request(REQUEST_TOKEN_URL)
        request.add_header('Content-Type',
                           'application/x-www-form-urlencoded;charset=UTF-8')
        request.add_header('Authorization',
                           'Basic %s' % encoded_cred)
        request.add_data('grant_type=client_credentials')
        response = urlopen(request, timeout=URLOPEN_TIMEOUT)
        try:
            data = json.load(response)
        finally:
            # Release the underlying connection even if decoding fails.
            response.close()
        access_token = data['access_token']
        return access_token
|