Skip to content
Snippets Groups Projects
Commit 6fc0e29e authored by pdler's avatar pdler
Browse files

all i have got

parents
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
# This example shows how new documents can be added.
#
# Documentation: http://docs.basex.org/wiki/Clients
#
# (C) BaseX Team 2005-12, BSD License
import BaseXClient
# create session
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
try:
# create empty database
session.execute("create db database")
print(session.info())
# add document
session.add("world/World.xml", "<x>Hello World!</x>")
print(session.info())
# add document
session.add("Universe.xml", "<x>Hello Universe!</x>")
print(session.info())
# run query on database
print("\n" + session.execute("xquery collection('database')"))
# drop database
session.execute("drop db database")
finally:
# close session
if session:
session.close()
# -*- coding: utf-8 -*-
"""
Python 2.7.3 and 3.x client for BaseX.
Works with BaseX 7.0 and later
Requires Python 3.x or Python 2.x having some backports like bytearray.
(I've tested Python 3.2.3, and Python 2.7.3 on Fedora 16 linux x86_64.)
LIMITATIONS:
* binary content would corrupt, maybe. (I didn't test it)
* also, will fail to extract stored binary content, maybe.
(both my code, and original don't care escaped 0xff.)
Documentation: http://docs.basex.org/wiki/Clients
(C) 2012, Hiroaki Itoh. BSD License
updated 2014 by Marc van Grootel
"""
import hashlib
import socket
import threading
# ---------------------------------
#
class SocketWrapper(object):
"""a wrapper to python native socket module."""
def __init__(self, sock,
receive_bytes_encoding='utf-8',
send_bytes_encoding='utf-8'):
self.receive_bytes_encoding = receive_bytes_encoding
self.send_bytes_encoding = send_bytes_encoding
self.terminator = bytearray(chr(0), self.receive_bytes_encoding)
self.__s = sock
self.__buf = bytearray(chr(0) * 0x1000, self.receive_bytes_encoding)
self.__bpos = 0
self.__bsize = 0
def clear_buffer(self):
"""reset buffer status for next invocation ``recv_until_terminator()``
or ``recv_single_byte()``."""
self.__bpos = 0
self.__bsize = 0
def __fill_buffer(self):
"""cache next bytes"""
if self.__bpos >= self.__bsize:
self.__bsize = self.__s.recv_into(self.__buf)
self.__bpos = 0
# Returns a single byte from the socket.
def recv_single_byte(self):
"""recv a single byte from previously fetched buffer."""
self.__fill_buffer()
result_byte = self.__buf[self.__bpos]
self.__bpos += 1
return result_byte
# Reads until terminator byte is found.
def recv_until_terminator(self):
"""recv a nul(or specified as terminator_byte)-terminated whole string
from previously fetched buffer."""
result_bytes = bytearray()
while True:
self.__fill_buffer()
pos = self.__buf.find(self.terminator, self.__bpos, self.__bsize)
if pos >= 0:
result_bytes.extend(self.__buf[self.__bpos:pos])
self.__bpos = pos + 1
break
else:
result_bytes.extend(self.__buf[self.__bpos:self.__bsize])
self.__bpos = self.__bsize
return result_bytes.decode(self.receive_bytes_encoding)
def sendall(self, data):
"""sendall with specified byte encoding if data is not bytearray, bytes
(maybe str). if data is bytearray or bytes, it will be passed to native sendall API
directly."""
if isinstance(data, (bytearray, bytes)):
return self.__s.sendall(data)
return self.__s.sendall(bytearray(data, self.send_bytes_encoding))
def __getattr__(self, name):
return lambda *arg, **kw: getattr(self.__s, name)(*arg, **kw)
# ---------------------------------
#
class Session(object):
"""class Session.
see http://docs.basex.org/wiki/Server_Protocol
"""
def __init__(self, host, port, user, password,
receive_bytes_encoding='utf-8',
send_bytes_encoding='utf-8'):
"""Create and return session with host, port, user name and password"""
self.__info = None
# create server connection
self.__swrapper = SocketWrapper(
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
receive_bytes_encoding=receive_bytes_encoding,
send_bytes_encoding=send_bytes_encoding)
self.__swrapper.connect((host, port))
# receive timestamp
response = self.recv_c_str().split(':')
# send username and hashed password/timestamp
hfun = hashlib.md5()
if len(response) > 1:
code = "%s:%s:%s" % (user, response[0], password)
nonce = response[1]
else:
code = password
nonce = response[0]
hfun.update(hashlib.md5(code.encode('us-ascii')).hexdigest().encode('us-ascii'))
hfun.update(nonce.encode('us-ascii'))
self.send(user + chr(0) + hfun.hexdigest())
# evaluate success flag
if not self.server_response_success():
raise IOError('Access Denied.')
def execute(self, com):
"""Execute a command and return the result"""
# send command to server
self.send(com)
# receive result
result = self.receive()
self.__info = self.recv_c_str()
if not self.server_response_success():
raise IOError(self.__info)
return result
def query(self, querytxt):
"""Creates a new query instance (having id returned from server)."""
return Query(self, querytxt)
def create(self, name, content):
"""Creates a new database with the specified input (may be empty)."""
self.__send_input(8, name, content)
def add(self, path, content):
"""Adds a new resource to the opened database."""
self.__send_input(9, path, content)
def replace(self, path, content):
"""Replaces a resource with the specified input."""
self.__send_input(12, path, content)
def store(self, path, content):
"""Stores a binary resource in the opened database.
api won't escape 0x00, 0xff automatically, so you must do it
yourself explicitly."""
# ------------------------------------------
# chr(13) + path + chr(0) + content + chr(0)
self.__send_binary_input(13, path, content)
#
# ------------------------------------------
def info(self):
"""Return process information"""
return self.__info
def close(self):
"""Close the session"""
self.send('exit')
self.__swrapper.close()
def recv_c_str(self):
"""Retrieve a string from the socket"""
return self.__swrapper.recv_until_terminator()
def send(self, value):
"""Send the defined string"""
self.__swrapper.sendall(value + chr(0))
def __send_input(self, code, arg, content):
"""internal. don't care."""
self.__swrapper.sendall(chr(code) + arg + chr(0) + content + chr(0))
self.__info = self.recv_c_str()
if not self.server_response_success():
raise IOError(self.info())
def __send_binary_input(self, code, path, content):
"""internal. don't care."""
# at this time, we can't use __send_input itself because of encoding
# problem. we have to build bytearray directly.
if not isinstance(content, (bytearray, bytes)):
raise ValueError("Sorry, content must be bytearray or bytes, not " +
str(type(content)))
# ------------------------------------------
# chr(code) + path + chr(0) + content + chr(0)
data = bytearray([code])
try:
data.extend(path)
except:
data.extend(path.encode('utf-8'))
data.extend([0])
data.extend(content)
data.extend([0])
#
# ------------------------------------------
self.__swrapper.sendall(data)
self.__info = self.recv_c_str()
if not self.server_response_success():
raise IOError(self.info())
def server_response_success(self):
"""Return success check"""
return self.__swrapper.recv_single_byte() == 0
def receive(self):
"""Return received string"""
self.__swrapper.clear_buffer()
return self.recv_c_str()
def iter_receive(self):
"""iter_receive() -> (typecode, item)
iterate while the query returns items.
typecode list is in http://docs.basex.org/wiki/Server_Protocol:_Types
"""
self.__swrapper.clear_buffer()
typecode = self.__swrapper.recv_single_byte()
while typecode > 0:
string = self.recv_c_str()
yield (typecode, string)
typecode = self.__swrapper.recv_single_byte()
if not self.server_response_success():
raise IOError(self.recv_c_str())
# ---------------------------------
#
class Query():
"""class Query.
see http://docs.basex.org/wiki/Server_Protocol
"""
def __init__(self, session, querytxt):
"""Create query object with session and query"""
self.__session = session
self.__id = self.__exc(chr(0), querytxt)
def bind(self, name, value, datatype=''):
"""Binds a value to a variable.
An empty string can be specified as data type."""
self.__exc(chr(3), self.__id + chr(0) + name + chr(0) + value + chr(0) + datatype)
def context(self, value, datatype=''):
"""Bind the context item"""
self.__exc(chr(14), self.__id + chr(0) + value + chr(0) + datatype)
def iter(self):
"""iterate while the query returns items"""
self.__session.send(chr(4) + self.__id)
return self.__session.iter_receive()
def execute(self):
"""Execute the query and return the result"""
return self.__exc(chr(5), self.__id)
def info(self):
"""Return query information"""
return self.__exc(chr(6), self.__id)
def options(self):
"""Return serialization parameters"""
return self.__exc(chr(7), self.__id)
def updating(self):
"""Returns true if the query may perform updates; false otherwise."""
return self.__exc(chr(30), self.__id)
def full(self):
"""Returns all resulting items as strings, prefixed by XDM Meta Data."""
return self.__exc(chr(31), self.__id)
def close(self):
"""Close the query"""
self.__exc(chr(2), self.__id)
def __exc(self, cmd, arg):
"""internal. don't care."""
# should we expose this?
# (this makes sense only when mismatch between C/S is existing.)
self.__session.send(cmd + arg)
result = self.__session.receive()
if not self.__session.server_response_success():
raise IOError(self.__session.recv_c_str())
return result
# -*- coding: utf-8 -*-
# This example shows how new databases can be created.
#
# Documentation: http://docs.basex.org/wiki/Clients
#
# (C) BaseX Team 2005-12, BSD License
import BaseXClient
# create session
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
try:
# create new database
session.create("database", "<x>Hello World!</x>")
print(session.info())
# run query on database
print("\n" + session.execute("xquery doc('database')"))
# drop database
session.execute("drop db database")
print(session.info())
finally:
# close session
if session:
session.close()
# -*- coding: utf-8 -*-
# This example shows how database commands can be executed.
#
# Documentation: http://docs.basex.org/wiki/Clients
#
# (C) BaseX Team 2005-12, BSD License
import BaseXClient, time
# initialize timer
start = time.clock()
# create session
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
# perform command and print returned string
print(session.execute("xquery 1 to 10"))
# close session
session.close()
# print time needed
time = (time.clock() - start) * 1000
print("%.2f ms" % time)
# -*- coding: utf-8 -*-
# This example shows how external variables can be bound to XQuery expressions.
#
# Documentation: http://docs.basex.org/wiki/Clients
#
# (C) BaseX Team 2005-12, BSD License
import BaseXClient
# create session
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
try:
# create query instance
input = "declare variable $name external; for $i in 1 to 10 return element { $name } { $i }"
query = session.query(input)
# bind variable
query.bind("$name", "number")
# print result
print(query.execute())
# close query object
query.close()
finally:
# close session
if session:
session.close()
# -*- coding: utf-8 -*-
# This example shows how queries can be executed in an iterative manner.
# Iterative evaluation will be slower, as more server requests are performed.
#
# Documentation: http://docs.basex.org/wiki/Clients
#
# (C) BaseX Team 2005-12, BSD License
import BaseXClient
# create session
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
try:
# create query instance
input = "for $i in 1 to 10 return <xml>Text { $i }</xml>"
query = session.query(input)
# loop through all results
for typecode, item in query.iter():
print("typecode=%d" % typecode)
print("item=%s" % item)
# close query object
query.close()
finally:
# close session
if session:
session.close()
# -*- coding: utf-8 -*-
# Documentation: http://docs.basex.org/wiki/Clients
#
# (C) BaseX Team 2005-12, BSD License
import BaseXClient
import sys
if sys.version < '3': # i'm testing with Python 2.7.3
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
import xml.dom.minidom
# input encoding is utf-16le
doc = xml.dom.minidom.parse("UTF16example.xml")
# expat parser will decode it to real unicode, and rewrite processing instruction.
# so, we can send this (->toxml()) as content for basex, safely.
content = doc.toxml()
# str if Python 3.x, unicode if Python 2.x.
# (both are actually real unicode. (ucs2 or ucs4.))
print(type(content))
# create session
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
try:
# create empty database
session.execute("create db py3clientexample")
print(session.info())
# add document
session.add("py3clientexample/originally_u16le.xml", content)
print(session.info())
# run query on database
query = session.query("""doc('py3clientexample')""")
for typecode, item in query.iter():
print("typecode=%d" % typecode)
print("item=%s" % item)
# drop database
session.execute("drop db py3clientexample")
print(session.info())
except Exception as e:
# print exception
print(repr(e))
finally:
# close session
if session:
session.close()
File added
api.py 0 → 100644
from flask import Flask, request, jsonify, make_response, g
from flask_cors import CORS
import BaseXClient
import threading
app = Flask(__name__)
CORS(app)
session = BaseXClient.Session('localhost', 1984, 'admin', 'admin')
@app.route('/', methods=['GET'])
def main():
#globvar = session.query("db:open(\"Holzschnitt\")")
session.execute("open Holzschnitt")
globvar = session.query("declare namespace lido=\"http://www.lido-schema.org\"; for $actor in distinct-values(//lido:actor/lido:nameActorSet/lido:appellationValue/text()) return $actor").execute()
#globvar = "ws"
#session.query("db:open(\"Holzschnitt\")").execute()
return globvar
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment