Users API for my Twitch game

I’ve been working on this game for a few weekends. Here is the Flask API I will be using to create users, update their information, and load them for use in my game. This will integrate with Unity and Twitch. Users are easy to create: each one is automatically given an id and is stored with a username and password, which I would like to share in the future with my website, which will display user stats.

This project is a work in progress and these files will likely change regularly. At the time of this post, all endpoints work and examples can be found in the comments for each method.

See methods below for instructions to call

GET

http://127.0.0.1:5000/api/users/ http://127.0.0.1:5000/api/users/<string:username> http://127.0.0.1:5000/api/users/<int:id>

POST

http://127.0.0.1:5000/api/users http://127.0.0.1:5000/api/users/update/

#!/usr/bin/env python
import os
from flask import Flask, abort, request, jsonify, g, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_httpauth import HTTPBasicAuth
from passlib.apps import custom_app_context as pwd_context
from itsdangerous import (TimedJSONWebSignatureSerializer
                        as Serializer, BadSignature, SignatureExpired)
import json

## initialization
app = Flask(__name__)
# Secret used to sign auth tokens. Prefer the SECRET_KEY environment variable
# so a deployment does not have to use the key committed in source; the
# fallback keeps local development working exactly as before.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY', 'the quick brown fox jumps over the lazy dog')
# SQLite database file used through Flask-SQLAlchemy.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite'
# NOTE(review): this flag is presumably honored only by older
# Flask-SQLAlchemy releases; the handlers below also commit explicitly.
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True

## extensions
db = SQLAlchemy(app)
auth = HTTPBasicAuth()


class User(db.Model):
    """A player account: credentials plus one XP counter per stat."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(32), unique=True, nullable=False)
    # only display this using auth methods requiring password to login
    email = db.Column(db.String(120), unique=True)
    # only store password hashes using hash_password() method
    # NOTE(review): passlib hashes may be longer than 64 characters; confirm
    # the column length before moving to a database that enforces it.
    password_hash = db.Column(db.String(64))

    # STATS #
    meleeAtkXp = db.Column(db.Integer, default=0)
    magicAtkXp = db.Column(db.Integer, default=0)
    rangeAtkXp = db.Column(db.Integer, default=0)

    healthXp = db.Column(db.Integer, default=0)
    shieldXp = db.Column(db.Integer, default=0)
    evasionXp = db.Column(db.Integer, default=0)
    sizeXp = db.Column(db.Integer, default=0)
    blockXp = db.Column(db.Integer, default=0)
    ResistXp = db.Column(db.Integer, default=0)
    speedXp = db.Column(db.Integer, default=0)

    def hash_password(self, password):
        """Hash a plain-text password and store the hash on this user.

        Called when a new user registers and when a user changes their
        password. The plain-text password itself is never stored.
        """
        # CryptContext.hash() is the current name of the deprecated encrypt().
        self.password_hash = pwd_context.hash(password)

    def verify_password(self, password):
        """Return True if ``password`` matches the stored hash, else False.

        Called whenever the user provides credentials that need validating.
        """
        return pwd_context.verify(password, self.password_hash)

    def generate_auth_token(self, expiration=600):
        """Return a signed token that carries this user's id.

        ``expiration`` is passed to the serializer as ``expires_in``.
        """
        s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id})

    @staticmethod
    def verify_auth_token(token):
        """Return the user a token was issued for, or None if it is unusable."""
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None    # valid token, but expired
        except BadSignature:
            return None    # invalid token
        return User.query.get(data['id'])

    def update_password(self, password, new_password):
        """Replace the password after checking the current one.

        Returns None when the password was changed. When ``password`` does
        not match the stored hash, nothing is changed and an error message
        string is returned instead.
        """
        if not self.verify_password(password):
            return("Password change failed, password hash did not match for user: {}".format(self.username))
        self.hash_password(new_password)
        print("Updated password for {}".format(self.username))
        return None


# This callback must live at module level: defined inside the class it would
# replace User.verify_password(self, password), which it calls itself.
@auth.verify_password
def verify_password(username_or_token, password):
    """HTTP basic-auth callback: accept either a token or username/password.

    On success the authenticated user is stored on ``g.user`` and True is
    returned; otherwise False is returned.
    """
    # first try to authenticate by token
    user = User.verify_auth_token(username_or_token)
    if not user:
        # try to authenticate with username/password
        user = User.query.filter_by(username=username_or_token).first()
        if not user or not user.verify_password(password):
            return False
    g.user = user
    return True

Update a user's email and/or password by supplying username, password, new_password, and email key-value pairs.

Example: curl -i -X POST -H "Content-Type: application/json" -d '{"username":"kicksent","password":"python", "new_password": "python3", "email":"email@address.com"}' http://127.0.0.1:5000/api/users/update/

@app.route('/api/users/update/', methods=['POST'])
def update_user():
    """Update an existing user's password and/or email from a JSON body.

    Expected keys: ``username`` (required); ``password`` together with
    ``new_password`` to change the password; ``email`` to change the email.

    Aborts with 400 when the username is missing or unknown, and with 403
    when the supplied current password does not match. On success responds
    with the username and status 201.
    """
    # A missing or non-object JSON body is treated as "no fields supplied"
    # so the request fails with the 400 below instead of an AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    username = payload.get('username')
    if username is None:
        abort(400, "Username not specified")    # missing arguments
    user = User.query.filter_by(username=username).first()
    if user is None:
        # cannot update a user that does not exist
        abort(400, ("User does not already exists in database."))

    password = payload.get('password')
    # Read the *new* password from its own key; reading 'password' twice
    # meant the password was re-hashed to the same value and never changed.
    new_password = payload.get('new_password')
    if new_password and password:
        # update_password() returns an error message when the current
        # password is wrong; surface it instead of reporting success.
        error = user.update_password(password, new_password)
        if error:
            abort(403, error)

    # NOTE(review): the email is changed without any password check;
    # confirm whether that is intended.
    email = payload.get('email')
    if email:
        user.email = email
    print("updated user {}".format(user.username))

    db.session.commit()
    return (jsonify({'username': user.username}), 201,
            {'Location': url_for('update_user', id=user.id, _external=True)})

Create a new user by supplying json in a post request with username and password key value pairs.

Example: curl -i -X POST -H "Content-Type: application/json" -d '{"username":"kicksent","password":"python"}' http://127.0.0.1:5000/api/users

@app.route('/api/users', methods=['POST'])
def new_user():
    """Create a user from a JSON body with ``username`` and ``password``.

    Aborts with 400 when either field is missing and with 409 when the
    username is already taken. On success responds with the username,
    status 201 and a ``Location`` header for the new user.
    """
    # A missing or non-object JSON body is treated as "no fields supplied"
    # so the request fails with the 400 below instead of an AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        abort(400, "Username or password not specified")    # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(409, "User: {} already exists in database.".format(
            username))    # existing user

    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    # Location points at the created user, not back at this POST endpoint.
    return (jsonify({'username': user.username}), 201,
            {'Location': url_for('get_user_by_id', id=user.id, _external=True)})

GET list all usernames

http://127.0.0.1:5000/api/users/

@app.route('/api/users/', methods=['GET'])
def get_users():
    """Return the usernames of all users as ``{"users": [...]}``.

    Aborts with 400 when there are no users.
    """
    users = User.query.all()
    if not users:
        abort(400, "No users found.")
    data = {'users': [item.username for item in users]}
    print(data)
    # Status and headers belong outside jsonify(); passed as extra arguments
    # they were serialized into the body as a JSON array. 200 is the status
    # clients already received, because the intended 201 never took effect.
    return (jsonify(data), 200,
            {'Location': url_for('get_users', _external=True)})

GET user by username

Returns the user's username and stats as JSON

http://127.0.0.1:5000/api/users/kicksent

@app.route('/api/users/<string:username>')
def get_user_by_username(username):
    """Return a greeting and the XP stats of the user named ``username``.

    Aborts with 400 when no such user exists.
    """
    user = User.query.filter_by(username=username).first()
    if not user:
        abort(400)
    body = {
        'username': 'Hello, %s!' % user.username,
        'stats': {
            'meleeAtkXp': user.meleeAtkXp,
            'mageAtkXp': user.magicAtkXp,
            'rangeAtkXp': user.rangeAtkXp,

            'sizeXp': user.sizeXp,
            'healthXp': user.healthXp,
            # was user.healthXp, which reported health under the shield key
            'shieldXp': user.shieldXp,
            'evasionXp': user.evasionXp,
            'blockXp': user.blockXp,
            'ResistXp': user.ResistXp,

            'speedXp': user.speedXp,
        }
    }
    # Status and headers belong outside jsonify(); passed as extra arguments
    # they were serialized into the body as a JSON array. 200 is the status
    # clients already received, because the intended 201 never took effect.
    return (jsonify(body), 200,
            {'Location': url_for('get_user_by_username',
                                 username=user.username, _external=True)})

GET user by ID http://127.0.0.1:5000/api/users/2

@app.route('/api/users/<int:id>')
def get_user_by_id(id):
    """Return the username and XP stats of the user with primary key ``id``.

    Aborts with 400 when no such user exists.
    """
    user = User.query.get(id)
    if not user:
        abort(400)
    # An early return used to make the stats response below unreachable.
    # 'username' keeps the plain value that return produced, so existing
    # readers of that key see the same string, and the stats are now sent.
    body = {
        'username': user.username,
        'stats': {
            'meleeAtkXp': user.meleeAtkXp,
            'mageAtkXp': user.magicAtkXp,
            'rangeAtkXp': user.rangeAtkXp,

            'sizeXp': user.sizeXp,
            'healthXp': user.healthXp,
            # was user.healthXp, which reported health under the shield key
            'shieldXp': user.shieldXp,
            'evasionXp': user.evasionXp,
            'blockXp': user.blockXp,
            'ResistXp': user.ResistXp,

            'speedXp': user.speedXp,
        }
    }
    # Status and headers belong outside jsonify(); passed as extra arguments
    # they were serialized into the body as a JSON array. 200 is the status
    # clients already received, because the intended 201 never took effect.
    return (jsonify(body), 200,
            {'Location': url_for('get_user_by_id', id=user.id, _external=True)})


@app.route('/api/token/')
@auth.login_required
def get_auth_token():
    """Issue an auth token for the logged-in user.

    Responds with the ASCII-decoded token and the duration value that was
    passed to ``generate_auth_token``.
    """
    duration = 600
    raw_token = g.user.generate_auth_token(duration)
    payload = {'token': raw_token.decode('ascii'), 'duration': duration}
    return jsonify(payload)

Login with your user credentials to see your stats, email, and user id.

@app.route('/api/resource/')
@auth.login_required
def get_resource():
    """Return the logged-in user's greeting, id, email and XP stats."""
    user = g.user
    body = {
        'username': 'Hello, %s!' % user.username,
        'id': user.id,
        'stats': {
            'email': user.email,
            'meleeAtkXp': user.meleeAtkXp,
            'mageAtkXp': user.magicAtkXp,
            'rangeAtkXp': user.rangeAtkXp,
            'sizeXp': user.sizeXp,
            'healthXp': user.healthXp,
            # was g.user.healthXp, which reported health under the shield key
            'shieldXp': user.shieldXp,
            'evasionXp': user.evasionXp,
            'blockXp': user.blockXp,
            'ResistXp': user.ResistXp,

            'speedXp': user.speedXp,
        }
    }
    # Status and headers belong outside jsonify(); passed as extra arguments
    # they were serialized into the body as a JSON array. 200 is the status
    # clients already received, because the intended 201 never took effect.
    return (jsonify(body), 200,
            {'Location': url_for('get_resource', _external=True)})


if __name__ == '__main__':
    # create_all() only creates tables that are missing, so it is safe to run
    # on every start. The previous os.path.exists('db.sqlite') guard looked
    # in the current working directory, which is not necessarily where the
    # database file lives, so it could skip table creation.
    with app.app_context():
        db.create_all()
    # NOTE: debug=True enables Flask's debugger; use for local development only.
    app.run(debug=True)

2019

ASCII Otter

less than 1 minute read

Enjoy this otter ascii art.

Back to top ↑