Difference between revisions of "Short Notes on Python"
(→SQLAlchemy) |
(→Some Basics) |
||
Line 232: | Line 232: | ||
=== Some Basics === | === Some Basics === | ||
+ | |||
+ | ==== Select, Where, Group ==== | ||
==== Updating ==== | ==== Updating ==== | ||
<pre> | <pre> | ||
− | session.query( | + | session.query(Table).\ |
− | filter(...).\ | + | filter(Table.id.in_(...)).\ |
− | update({ | + | update({ |
+ | Table.column1: 10, | ||
+ | Table.column2: Table.column2 + 50 | ||
+ | }) | ||
+ | </pre> | ||
+ | |||
+ | ==== Scalar Values ==== | ||
+ | |||
+ | <pre> | ||
+ | max_value = session.query(sqlalchemy.func.max(Table.column)).scalar() | ||
+ | |||
+ | # or, for multiple, just use | ||
+ | session.query(sqlalchemy.func.min(Table.column), sqlalchemy.func.max(Table.column)).first() | ||
</pre> | </pre> | ||
Revision as of 02:24, 7 August 2020
Contents
Timing, and memory, on Linux
Timing
On Linux, it's safer to use time.time()
import time t = time.time() # do some stuff print "stuff took %1.3f", time.time() - t, "seconds"
On Windows, AFAIK, it's safer to use time.clock()
Memory
For me, the following does a good job getting memory usage (in kB) on Linux:
import resource print resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
Since resource is standard package, it should work on Windows too, but I don't know if it does, or what units are used if it works.
Importing Files
If you need to import a file '../mylib/commons.py', you can use the following snippet:
import sys, os fld = os.path.realpath(os.path.abspath(os.path.join('..', 'mylib'))) if fld not in sys.path: sys.path.insert(0, fld) import commons # use your commons.py module now... commons.super_function()
Merging Dictionaries
Below are few solutions, the first one works for any list of dictionaries, the rest is just for 2 dict's, with the last 2 suitable for inlining:
# as a generic function that merges list of dict's def merge_dicts(dicts: list) -> dict: res = dict() for d in dicts: res.update(d) return res # if you're merging known number of dict's: def merge_two_1(a: dict, b: dict) -> dict: return dict(a, **b) # python 3.5+, the fastest of the lot def merge_two_1(a: dict, b: dict) -> dict: return {**a, **b}
uWSGI, nginx, Flask
- install uwsgi (incl. uwsgi python plugin), python flask, and nginx,
Setting Up uWSGI
- create main.py file that will hold the server logic, for instance:
from flask import Flask app = Flask(__name__) @app.route("/") def hello(): return "hello there!"
- create uwsgi config file, wsgi.ini (minimal version here; read uwsgi docs for head-spinning array of configurables):
[uwsgi] module = main:app master = true processes = 5 socket = 127.0.0.1:8000 protocol = http plugin = python
- run uwsgi
uwsgi --ini wsgi.ini
- check that all works on http://localhost:8000/
Adding nginx Layer
- remove the "protocol" directive from wsgi.ini, and add "die-on-term":
[uwsgi] module = main:app master = true processes = 5 socket = 127.0.0.1:8000 plugin = python die-on-term = true
- add a new vhost to nginx - /etc/nginx/sites-available/app.nginx:
server { listen 80; server_name my.awesome.domain; location / { include uwsgi_params; uwsgi_pass 127.0.0.1:8000; } }
- communication through socket is also possible (see socket, chmod-socket, vacuum and other directives for uWSGI)
- of course, create link in /etc/nginx/sites-enabled/, and restart nginx,
Run uWSGI daemon on boot - supervisor
- install supervisor
apt-get install supervisor
- add/edit /etc/supervisord.conf with content like this:
[unix_http_server] file=/run/supervisor.sock [supervisord] logfile=/var/log/supervisord.log logfile_maxbytes=1MB logfile_backups=1 loglevel=info ; (others: warn,debug,trace) pidfile=/run/supervisord.pid nodaemon=false minfds=1024 minprocs=200 [supervisorctl] serverurl=unix:///run/supervisor.sock [program:your app] directory=/path/to/app command=/usr/bin/uwsgi --ini wsgi.ini autostart=true autorestart=true stdout_logfile=/var/log/uwsgi.log redirect_stderr=true stopsignal=QUIT
Run uWSGI daemon on boot - systemd
- create systemd file for uWSGI, /etc/systemd/system/uwsgi-app.service:
[Unit] Description=Job that runs the uWSGI app [Service] Type=simple WorkingDirectory=/home/project/flask-test/ ExecStart=/usr/bin/uwsgi --ini wsgi.ini [Install] WantedBy=multi-user.target
Then you can start and stop the uwsgi service using:
# systemctl start uwsgi-app.service # systemctl stop uwsgi-app.service
Once you're happy with the settings, enable the daemon to be run on boot:
# systemctl enable uwsgi-app.service
Decorators
needs a bit of refreshment and updating...
Decorators are simple and expressive way to modify function without editing the source of the function itself. Or, the other way around, to modify multiple functions in the same way, without code duplication (DRY).
Decorators can be spotted in the code by starting with @ character. Decorator is a function (or class) that can do some additional work before or after the call to the decorated function. It can even call the decorated function multiple times, or not at all.
Decorators can be implemented as closures (my fave), or as classes; the following approaches are equivalent:
# using a function (closure) as a decorator def beforeAndAfter(f): def decorated_fn(): print("Before", f.__name__) f() print("After", f.__name__) return decorated_fn @entryExit def func(): print "func() is in da' house!" func()
# using a class as a decorator class beforeAndAfter(object): def __init__(self, f): self.f = f def __call__(self): print("Before", self.f.__name__) self.f() print("After", self.f.__name__) @beforeAndAfter def func(): print "func() is in da' house!" func()
Note that in functional decorator, anything outside the body of decorated_fn() is equivalent to content of the __init__() constructor of the class-based decorator. This code is run during the initialization, only once for each decorated function, regardless of whether the decorated function is ever called in the code - you should avoid any heavy lifting there.
Decorators can also accept arguments, and (obviously should) forward arguments to the decorated function:
def beforeAndAfter(p1, p2): def wrap(f): def wrapped_f(*args, **kwargs): print "Decorator arguments:", p1, p2 print("Before", f.__name__) f(*args, **kwargs) print("After", f.__name__) return wrapped_f return wrap @beforeAndAfter("hello", "world") def func(a, b=2): print "func() is in da' house,", a, b func(1)
virtualenv
I prefer installing virtualenv tool through pip, to make sure those are in sync version-wise.
# create a new venv # it is better to keep venv's in some separate folder, not to pollute your project folder $ virtualenv ~/.virtualenvs/my-new-env # "log into" your venv; success can be seen by your command line being prefixed by venv name $ source ~/.virtualenvs/my-new-env/bin/activate # now you can install anything you need, tucked away in your venv: (my-new-env) $ pip install -r requirements.txt # when done, just deactivate (my-new-env) $ deactivate
SQLAlchemy
Some Basics
Select, Where, Group
Updating
session.query(Table).\ filter(Table.id.in_(...)).\ update({ Table.column1: 10, Table.column2: Table.column2 + 50 })
Scalar Values
max_value = session.query(sqlalchemy.func.max(Table.column)).scalar() # or, for multiple, just use session.query(sqlalchemy.func.min(Table.column), sqlalchemy.func.max(Table.column)).first()
Make SQLAlchemy Read-Only
All write operations in SQLAlchemy pass through flush() method of your session. Just monkey-path it to do nothing!
engine = create_engine("connection string") Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) session = Session() session.flush = lambda *args,**kwargs: None
Making Session Self-Recoverable
Some DB's, most prominently Postgres, do not recover well from errors (e.g. Pg just keeps saying that there's error in current transaction). One way is to rollback the transaction on "any" error:
from contextlib import contextmanager from typing import ContextManager from sqlalchemy.orm.session import Session @contextmanager def db_session() -> ContextManager[Session]: session = init() # get the session any way you like try: yield session except: session.rollback() raise else: session.commit() def use_db(): with db_session() as session: # use session object
Show Create Statement for Your Model, including Indexes
from sqlalchemy.schema import CreateTable, CreateIndex from sqlalchemy.dialects import postgresql def print_create_statement(model): print(CreateTable(model.__table__).compile(dialect=postgresql.dialect())) for index in model.__table__.indexes: print(CreateIndex(index).compile(dialect=postgresql.dialect()))
List Tables and Their Columns
import sqlalchemy as sqla db_conn_str = "..your connection string.." engine = sqla.create_engine(db_conn_str) inspector = sqla.inspect(engine) schemas = inspector.get_schema_names() for schema in schemas: print("schema: {}".format(schema)) for table_name in inspector.get_table_names(schema=schema): cols = ['{} ({})'.format(col['name'], col['type']) for col in inspector.get_columns(table_name, schema=schema)] print("Table: {} ({})".format(table_name, ', '.join(cols)))
select count(*)
session.query(MyModel).with_entities(func.count()).scalar()
Subqueries
Creating subqueries is pretty straightforward in SQLAlchemy.
To emit
SELECT sub.c1, sub.c2 FROM (SELECT x as c1, y as c2 FROM table) AS sub
you define the sub and then query on it as usual, with the exception of bit more explicit access to columns:
sub = session.query(table.x.label('c1'), table.y.label('c2')).\ cte('sub') res = session.query(sub.c.c1, sub.c.c2).\ all()