基于python实现腾讯QQ oauth2.0登录

因网站驴图结合社交化的需要,想让已有的微博用户可以直接登录网站。当时首先考虑了新浪微博,且 Liao Xuefeng 已使用 Python 编写了 python client sdk for sina weibo API OAuth2.0,所以使用新浪微博进行登录和发送微博等都很正常。但是上线时才发现,没有备案的域名实在很难通过审核,尤其服务器还在国外,所以只好投靠腾讯QQ。查了下相关的资料,QQ官网没有正式的 python sdk 用于 oauth 2.0 登录,好像有一个针对开放平台的 sdk(具体没有详细查看),但里面也没有登录相关的功能,所以只好基于 Liao Xuefeng 的新浪微博 API 进行了相关修改,使其符合腾讯的 API 标准。主要修改包括:scope 的添加,相关调用参数的完善(调用接口必须严格按照腾讯的 API 文档中的描述进行访问,否则会出现很多问题),完善上传图片微博(主要针对驴图网站的使用)等。

下面为对应的代码。这个实现主要针对驴图,但是应该也能满足大部分基本的 QQ OAuth 登录等需求,有需要的可以在此基础上进行修改。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__version__ = '1.0'
__author__ = 'H.J (higis.studio@gmail.com)'

'''
Python client SDK for QQ weibo API using OAuth 2. Used in http://www.lvtumap.com
Code adapted from the sina weibo SDK by Liao Xuefeng (askxuefeng@gmail.com)
'''

try:
    import json
except ImportError:
    import simplejson as json
import time,os
import urllib
import urllib2
import logging
import uuid

def _obj_hook(pairs):
    '''
    Build a JsonObject from a decoded JSON mapping.

    Each key of ``pairs`` is passed through ``str()`` and stored, together
    with its unchanged value, in a fresh JsonObject, which is returned.
    '''
    converted = JsonObject()
    for key in pairs:
        converted[str(key)] = pairs[key]
    return converted

class APIError(StandardError):
    '''
    Error raised when an API call reports a failure.

    The three constructor arguments are kept as the attributes
    ``error_code``, ``msg`` and ``request``; ``msg`` is also handed to the
    base exception so it appears in ``args``.
    '''
    def __init__(self, error_code, msg, request):
        super(APIError, self).__init__(msg)
        self.error_code, self.msg, self.request = error_code, msg, request

    def __str__(self):
        fields = (self.error_code, self.msg, self.request)
        return 'APIError: %s: %s, request: %s' % fields

class JsonObject(dict):
    '''
    A dict whose items can also be read and written as attributes.

    ``obj.name`` reads ``obj['name']`` and ``obj.name = v`` stores
    ``obj['name'] = v``.
    '''
    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails.  A missing item
        # must surface as AttributeError (not KeyError) so that
        # getattr(obj, name, default) and hasattr() follow the attribute
        # protocol.
        try:
            return self[attr]
        except KeyError:
            raise AttributeError("'JsonObject' object has no attribute '%s'" % attr)

    def __setattr__(self, attr, value):
        self[attr] = value

def _encode_params(**kw):
    '''
    Encode keyword arguments as a URL query string.

    Both keys and values are percent-encoded (``/`` is left as is): text is
    encoded as UTF-8 first, byte strings are used as they are, and any other
    value goes through ``str()``.  The pairs are joined with ``&`` in the
    dict's iteration order.  Returns ``''`` when no arguments are given.
    '''
    # Imported here so the function works whether ``quote`` lives in
    # ``urllib`` (Python 2) or ``urllib.parse`` (Python 3).
    try:
        from urllib import quote
    except ImportError:
        from urllib.parse import quote
    text_type = type(u'')

    def _quote(value):
        if isinstance(value, text_type):
            value = value.encode('utf-8')
        elif not isinstance(value, bytes):
            value = str(value)
        return quote(value)

    # Keys are escaped as well: an unescaped '&', '=' or space in a key
    # would corrupt the query string.
    return '&'.join('%s=%s' % (_quote(k), _quote(v)) for k, v in kw.items())

def _encode_multipart(**kw):
    '''
    Build a multipart/form-data body.

    Each keyword argument becomes one part.  A value that has a ``read``
    attribute is treated as a file: its content is read, its ``name``
    attribute (if any) supplies the filename, and the content type is
    guessed from the filename extension.  Any other value is sent as a plain
    form field; text is encoded as UTF-8 and non-string values go through
    ``str()``.

    Returns a ``(body, boundary)`` tuple where ``body`` is a byte string and
    ``boundary`` is the boundary text, derived from the current time in
    milliseconds, to put in the Content-Type header.
    '''
    text_type = type(u'')

    def _to_bytes(value):
        # Every piece is converted to bytes before joining so that binary
        # file content is never mixed with text.
        if isinstance(value, bytes):
            return value
        if not isinstance(value, text_type):
            value = str(value)
            if isinstance(value, bytes):
                # str() already produced a byte string.
                return value
        return value.encode('utf-8')

    boundary = '----------%s' % hex(int(time.time() * 1000))
    parts = []
    for field, value in kw.items():
        parts.append('--%s' % boundary)
        if hasattr(value, 'read'):
            # file-like object:
            filename = getattr(value, 'name', '')
            if not isinstance(filename, (bytes, text_type)):
                # e.g. an integer file descriptor; no usable filename.
                filename = ''
            ext = ''
            dot = filename.rfind('.')
            if dot != -1:
                ext = filename[dot:].lower()
            content = value.read()
            parts.append('Content-Disposition: form-data; name="%s"; filename="%s"'
                         % (field, os.path.basename(filename)))
            # The trailing CRLF yields the blank line that ends the headers.
            parts.append('Content-Type: %s\r\n' % _guess_content_type(ext))
            parts.append(content)
        else:
            parts.append('Content-Disposition: form-data; name="%s"\r\n' % field)
            parts.append(value)
    parts.append('--%s--\r\n' % boundary)
    return b'\r\n'.join(_to_bytes(part) for part in parts), boundary

# Content types for the image extensions the uploader recognises.
_CONTENT_TYPES = { '.png': 'image/png', '.gif': 'image/gif', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.jpe': 'image/jpeg' }

def _guess_content_type(ext):
    '''
    Return the content type for a lower-case extension such as ``'.png'``,
    or ``'application/octet-stream'`` when the extension is not known.
    '''
    return _CONTENT_TYPES.get(ext, 'application/octet-stream')

# Request kinds passed as the ``method`` argument of _http_call.
_HTTP_GET, _HTTP_POST, _HTTP_UPLOAD = 0, 1, 2

def _http_get(api_url, client=None, **kw):
    '''Log the URL, then issue the request through _http_call as a GET.'''
    logging.info('GET %s', api_url)
    result = _http_call(api_url, _HTTP_GET, client, **kw)
    return result

def _http_post(api_url, client=None, **kw):
    '''Log the URL, then issue the request through _http_call as a POST.'''
    logging.info('POST %s', api_url)
    result = _http_call(api_url, _HTTP_POST, client, **kw)
    return result

def _http_upload(api_url, client=None, **kw):
    '''Log the URL, then issue the request through _http_call as an upload.'''
    logging.info('MULTIPART POST %s', api_url)
    result = _http_call(api_url, _HTTP_UPLOAD, client, **kw)
    return result

def _http_call(api_url, method, client, **kw):
    '''
    Send an HTTP request and return the decoded reply.

    Parameters:
        api_url: target URL; it may already end in ``?`` or carry a query.
        method: _HTTP_POST and _HTTP_UPLOAD send the parameters as a
            multipart/form-data body; any other value sends them in the
            URL query string (GET).
        client: object providing ``client_id``, ``access_token`` and
            ``openid``; when not None these are added to the parameters as
            ``oauth_consumer_key``, ``access_token`` and ``openid`` (an
            explicit keyword argument of the same name wins).
        **kw: API parameters.

    Returns the parsed JSON (objects become JsonObject) when the reply is
    JSON, optionally wrapped as ``callback( ... );``.  Otherwise returns the
    reply text with surrounding whitespace removed.

    Raises APIError when the JSON object carries a non-zero ``errcode``.
    '''
    params = {}
    if client is not None:
        params['oauth_consumer_key'] = client.client_id
        params['access_token'] = client.access_token
        params['openid'] = client.openid
    params.update(kw)

    boundary = None
    if method == _HTTP_UPLOAD or method == _HTTP_POST:
        http_url = api_url
        http_body, boundary = _encode_multipart(**params)
    else:
        http_body = None
        http_url = api_url
        query = _encode_params(**params)
        if query:
            # Pick the separator from what the URL already contains, so the
            # parameters are never glued onto a previous value.
            if api_url.endswith('?') or api_url.endswith('&'):
                separator = ''
            elif '?' in api_url:
                separator = '&'
            else:
                separator = '?'
            http_url = '%s%s%s' % (api_url, separator, query)

    req = urllib2.Request(http_url, data=http_body)
    if boundary:
        req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
    try:
        resp = urllib2.urlopen(req)
        try:
            body = resp.read()
        finally:
            resp.close()
    except urllib2.HTTPError as e:
        # Error replies still carry a body worth decoding.
        body = e.read()

    text = body.strip()
    if text.startswith('callback'):
        # Unwrap "callback( {...} );" without touching the payload itself.
        opening = text.find('(')
        closing = text.rfind(')')
        if opening != -1 and closing > opening:
            text = text[opening + 1:closing].strip()

    try:
        r = json.loads(text, object_hook=_obj_hook)
    except ValueError:
        # Not JSON (e.g. "access_token=...&expires_in=..."): hand back text.
        return text
    if isinstance(r, dict) and r.get('errcode', 0) != 0:
        raise APIError(r['errcode'], r.get('msg', ''), http_url)
    return r

class HttpObject(object):
    '''
    Turns attribute access into API calls for one request kind.

    ``obj.a__b(**kw)`` calls _http_call on the client's ``api_url`` followed
    by ``a/b`` (every ``__`` in the attribute name becomes ``/``), using the
    method and client given at construction.
    '''

    def __init__(self, client, method):
        self.client = client
        self.method = method

    def __getattr__(self, attr):
        path = attr.replace('__', '/')

        def wrap(**kw):
            owner = self.client
            # Refuse to call the API without a live token.
            if owner.is_expires():
                raise APIError('10019', 'expired_token', attr)
            target = '%s%s' % (owner.api_url, path)
            return _http_call(target, self.method, owner, **kw)

        return wrap

class APIClient(object):
    '''
    API client using synchronized invocation.

    ``client.get.a__b(**kw)``, ``client.post.a__b(**kw)`` and
    ``client.upload.a__b(**kw)`` call the API path ``a/b``; an attribute not
    found on the client itself is looked up on ``client.get``.
    '''
    # Types treated as text when inspecting a reply.
    _STRING_TYPES = (bytes, type(u''))

    def __init__(self, app_key, app_secret, redirect_uri=None,
                 clientip='127.0.0.1', response_type='code',
                 domain='graph.qq.com'):
        # NOTE: clientip is accepted but not used by this class.
        self.client_id = app_key
        self.client_secret = app_secret
        self.redirect_uri = redirect_uri
        self.response_type = response_type
        self.auth_url = 'https://%s/oauth2.0/' % domain
        self.api_url = 'https://%s/' % domain
        self.access_token = None
        self.openid = None
        self.expires = 0.0
        self.get = HttpObject(self, _HTTP_GET)
        self.post = HttpObject(self, _HTTP_POST)
        self.upload = HttpObject(self, _HTTP_UPLOAD)
        self.state = uuid.uuid4()
        self.scope = 'get_info,add_t,add_pic_t'

    def set_access_token(self, access_token, openid, expires_in):
        '''
        Store the credentials used by later API calls.  ``expires_in`` is the
        expiry time in seconds since the epoch, as compared by is_expires().
        '''
        self.access_token = str(access_token)
        self.openid = openid
        self.expires = float(expires_in)

    def _resolve_redirect(self, redirect_uri):
        '''Return the given redirect URI or the client default; raise APIError if neither is set.'''
        redirect = redirect_uri if redirect_uri else self.redirect_uri
        if not redirect:
            raise APIError('10013', 'Parameter absent: redirect_uri', 'OAuth2 request')
        return redirect

    @staticmethod
    def _parse_pairs(body):
        '''Parse ``k=v&k2=v2`` text into a JsonObject; items without ``=`` are skipped.'''
        pairs = {}
        for item in body.split('&'):
            if '=' in item:
                # Split once only, so a value may itself contain '='.
                key, value = item.split('=', 1)
                pairs[key] = value
        return _obj_hook(pairs)

    def _token_from_body(self, body, request):
        '''
        Turn a token-endpoint reply into a JsonObject whose ``expires_in`` is
        an absolute epoch time.  Raises APIError when the reply is a JSON
        error object or lacks ``access_token`` / ``expires_in``.
        '''
        if isinstance(body, dict):
            # The endpoint answered with JSON instead of a token string;
            # presumably an error object -- report its fields.
            raise APIError(body.get('error', body.get('ret')),
                           body.get('error_description', body.get('msg', '')),
                           request)
        token = self._parse_pairs(body)
        if 'access_token' not in token or 'expires_in' not in token:
            raise APIError('invalid_response',
                           'Unexpected token response: %s' % body, request)
        token['expires_in'] = int(token['expires_in']) + int(time.time())
        return token

    def get_authorize_url(self, redirect_uri=None):
        '''
        Return the authorize URL the user should be redirected to.

        Raises APIError when no redirect URI is available.
        '''
        redirect = self._resolve_redirect(redirect_uri)
        query = _encode_params(client_id=self.client_id,
                               response_type='code',
                               scope=self.scope,
                               # Sent so the callback can be matched to this client.
                               state=self.state,
                               redirect_uri=redirect)
        return '%s%s?%s' % (self.auth_url, 'authorize', query)

    def request_openid(self, access_token):
        '''
        return openid {"client_id":"YOUR_APPID","openid":"YOUR_OPENID"}

        A reply that is plain ``k=v&...`` text is parsed into a JsonObject;
        any other reply is returned as received.
        '''
        body = _http_get('%s%s?' % (self.auth_url, 'me'),
                         access_token=access_token)
        if not isinstance(body, self._STRING_TYPES):
            return body
        return self._parse_pairs(body)

    def request_access_token(self, code, redirect_uri=None, ):
        '''
        return access token as object: {"access_token":"your-access-token","expires_in":12345678}, expires_in is standard unix-epoch-time

        Raises APIError when no redirect URI is available or the endpoint
        does not return a token.
        '''
        redirect = self._resolve_redirect(redirect_uri)
        endpoint = '%s%s?' % (self.auth_url, 'token')
        body = _http_get(endpoint,
                         grant_type='authorization_code',
                         client_id=self.client_id,
                         redirect_uri=redirect,
                         client_secret=self.client_secret,
                         code=code, state=self.state)
        return self._token_from_body(body, endpoint)

    def refresh_token(self, refresh_token):
        '''
        Exchange a refresh token for a new access token; the result has the
        same shape as that of request_access_token().

        Raises APIError when the endpoint does not return a token.
        '''
        # Uses the same "token" endpoint as the code exchange, with
        # grant_type=refresh_token.
        endpoint = '%s%s?' % (self.auth_url, 'token')
        body = _http_get(endpoint,
                         client_id=self.client_id,
                         client_secret=self.client_secret,
                         refresh_token=refresh_token,
                         grant_type='refresh_token')
        return self._token_from_body(body, endpoint)

    def is_expires(self):
        '''Return True when no access token is set or its expiry time has passed.'''
        return not self.access_token or time.time() > self.expires

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  If 'get' itself is missing
        # (instance not initialised yet), delegating would recurse forever.
        if attr == 'get':
            raise AttributeError(attr)
        return getattr(self.get, attr)

 

本文固定链接: http://www.higis.org/2012/11/23/qq-oauth2/ | Hi,GIS


该日志由 H.J 于2012年11月23日发表在 程序 分类下, 你可以发表评论,并在保留原文地址及作者的情况下引用到你的网站或博客。
原创文章转载请注明: 基于python实现腾讯QQ oauth2.0登录 | Hi,GIS
关键字: , , , ,

基于python实现腾讯QQ oauth2.0登录:目前有1 条留言

发表评论

快捷键:Ctrl+Enter