pandas协助工具
2024-08-25 12:37:41
pandas 有时候操作很不方便,也可能是我还不够熟练,反正用起来各种别扭。下面是我写的一个简单的 JSON 数据操作工具,能够完成简单的数据分析工作,后续会不断完善。
# coding=utf-8
import logging
import sys
import ujson as json
# Route log output to stdout. Each record is rendered on two lines:
#   <timestamp> [<module>][<function>][<line number>]
#   <LEVEL>: [<message>]
# followed by an empty line (the format string ends with "\n").
logging.basicConfig(
stream=sys.stdout,
format="%(asctime)s [%(module)s][%(funcName)s][%(lineno)s] \n%(levelname)s: [%(message)s]\n",
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# The bare string literal below is a no-op expression. It appears to be kept
# as a quick reference of attribute names that can be used in a logging
# format string such as the one passed to basicConfig() above.
"""
'asctime',
'created',
'filename',
'funcName',
'levelname',
'levelno',
'lineno',
'module',
'msecs',
'message',
'name',
'pathname',
'process',
'processName',
'relativeCreated',
'thread',
'threadName'
"""
class pipe(object):
    """Fluent wrapper that threads a value through a chain of functions.

    Example::

        pipe(2).pipe(lambda v: v + 1).pipe(lambda v: v * 10).result()  # 30
    """

    def __init__(self, data=None):
        """Store the initial value (``None`` when omitted)."""
        self._d = data

    def pipe(self, func):
        """Replace the held value with ``func(value)`` and return ``self``.

        Returning ``self`` is what allows calls to be chained.
        """
        self._d = func(self._d)
        return self

    def result(self):
        """Return the value produced by the calls made so far."""
        return self._d
class Field(object):
    """A single record (row): a thin, chainable wrapper around a ``dict``.

    Reading a missing key yields ``None`` instead of raising, and deleting a
    missing key is a silent no-op.
    """

    # Types accepted as a single key by ``__getitem__``. ``unicode`` only
    # exists on Python 2; on Python 3 plain ``str`` covers all text.
    try:
        _STRING_TYPES = (str, unicode)
    except NameError:
        _STRING_TYPES = (str,)

    def __init__(self, data=None):
        """Wrap ``data``.

        A non-empty dict is stored by reference (not copied), so later
        changes made through this object are visible in the caller's dict.
        ``None`` or an empty value results in a fresh empty dict.
        """
        self._d = data or {}

    def to_dict(self):
        """Return the underlying dict itself (not a copy)."""
        return self._d

    def set(self, **kwargs):
        """Add or overwrite the given keys and return ``self`` for chaining."""
        self._d.update(kwargs)
        return self

    def __setitem__(self, i, y):
        self._d[i] = y

    def __getitem__(self, y):
        """Look up one key or a list of keys.

        * string key -> its value, or ``None`` when absent
        * list of keys -> list of values (``None`` for absent keys)
        * anything else -> ``None``
        """
        if isinstance(y, self._STRING_TYPES):
            return self._d.get(y)
        if isinstance(y, list):
            return [self._d.get(i) for i in y]
        return None

    def __delitem__(self, y):
        """Delete one key, or every key in a list; missing keys are ignored."""
        if not isinstance(y, list):
            y = [y]
        for i in y:
            try:
                self._d.pop(i, None)
            except TypeError:
                # An unhashable key can never be stored, so there is
                # nothing to delete.
                pass

    def drop(self, *y):
        """Delete all the given keys and return ``self`` for chaining."""
        # ``y`` arrives as a tuple; ``__delitem__`` only expands lists, so it
        # must be converted or the whole tuple is treated as a single key.
        del self[list(y)]
        return self
class KTable(object):
    """A small table of JSON-like records addressed by integer row index.

    Rows are stored as ``Field`` objects in a dict keyed by row index.
    Indexing (``table[...]``) is overloaded:

    * ``table[3]`` / ``table[[0, 2]]`` -> new table with those rows
    * ``table['a']`` / ``table[['a', 'b']]`` -> new table with only those
      columns (a list means *include*)
    * ``table[('a', 'b')]`` -> new table without those columns (a tuple
      means *exclude*)
    * ``table[0, 'a']`` -> the single cell value, or ``None`` if the row
      does not exist
    * ``table[predicate]`` -> new table with the rows for which
      ``predicate(row)`` is truthy
    """

    # Width of every cell when the table is rendered with a header.
    _COLUMN_WIDTH = 30

    # Types accepted as a column label. ``unicode`` only exists on Python 2.
    try:
        _STRING_TYPES = (str, unicode)
    except NameError:
        _STRING_TYPES = (str,)

    def __init__(self, data=None, header=None):
        """Build a table from a list of dicts (or a single dict).

        ``header`` is an optional list of column labels used by ``__str__``
        to render the table in aligned columns.
        """
        self._d = {}
        self._header = header
        self.append(data or [])

    def _rows(self):
        """Return the ``Field`` rows in ascending row-index order."""
        return [self._d[i] for i in sorted(self._d)]

    @staticmethod
    def _row_key(record):
        """Return a hashable key that is equal for equal records."""
        try:
            # Order-insensitive, so the key does not depend on the order in
            # which the columns were inserted.
            return frozenset(record.items())
        except TypeError:
            # Unhashable values (lists, dicts): fall back to a textual key.
            return tuple(sorted((repr(k), repr(v)) for k, v in record.items()))

    def __set(self, i, y=None):
        """Write a row (``i`` is an int) or a cell (``i`` is ``(row, col)``).

        Writing a dict to an existing row merges it into that row; otherwise
        the row is created or replaced. Writing a cell of a row that does
        not exist raises ``KeyError``.
        """
        if isinstance(i, int):
            try:
                self._d[i].set(**y)
            except (KeyError, TypeError):
                # KeyError: no such row yet. TypeError: ``y`` cannot be
                # expanded as keyword arguments (e.g. it is ``None``).
                self._d[i] = Field(y)
        else:
            k, v = i
            self._d[k][v] = y

    def __setitem__(self, i, y=None):
        self.__set(i, y=y)

    def set(self, i, y=None):
        """Same as ``table[i] = y`` but returns ``self`` for chaining."""
        self.__set(i, y=y)
        return self

    def append(self, y=None):
        """Append one record, or every record of a list; return ``self``.

        Each new row gets the index ``max(existing) + 1`` (``0`` for an
        empty table).
        """
        records = y if isinstance(y, list) else [y]
        for record in records:
            index = max(self._d) + 1 if self._d else 0
            self[index] = record
        return self

    def map(self, func, *sequence_1):
        """Call ``func`` once per row for its side effects; return ``self``.

        Extra sequences (other ``KTable`` objects or plain iterables) are
        walked in parallel and passed as additional arguments. Shorter
        sequences are padded with ``None``, which is how Python 2's
        multi-sequence ``map`` behaved. The return values of ``func`` are
        discarded.
        """
        columns = [self._rows()]
        for seq in sequence_1:
            columns.append(seq._rows() if isinstance(seq, KTable) else list(seq))
        longest = max(len(column) for column in columns)
        # Explicit loop: on Python 3 ``map`` is lazy and would never run.
        for n in range(longest):
            func(*[column[n] if n < len(column) else None for column in columns])
        return self

    def group_by(self, label, func=None):
        """Group rows by one label or a list of labels.

        ``func`` receives a ``KTable`` holding the remaining columns of one
        group and must return a dict, which is merged into that group's
        output record. With ``func=None`` only the distinct label values are
        returned. A group whose ``func`` call raises is logged and skipped.

        Returns a new ``KTable`` with one record per group in first-seen
        order, or ``self`` when the table is empty or the first row has no
        columns.
        """
        if not self._d:
            return self
        first = self._rows()[0].to_dict()
        if not first:
            return self
        labels = label if isinstance(label, list) else [label]
        # The non-grouping columns are taken from the first row.
        value_labels = [name for name in first if name not in labels]

        groups = {}
        order = []
        for row in self._rows():
            key = tuple([row[name] for name in labels])
            rest = {name: row[name] for name in value_labels}
            if key not in groups:
                groups[key] = []
                order.append(key)
            groups[key].append(rest)

        result = []
        for key in order:
            record = dict(zip(labels, key))
            try:
                if func is not None:
                    record.update(func(KTable(groups[key])))
            except Exception as e:
                # Best effort: one failing group must not lose the others.
                log.error("%s,%s", key, e)
                continue
            result.append(record)
        return KTable(result)

    def sort_by(self, label=None):
        """Return a new table sorted by column ``label``.

        Returns ``self`` unchanged when ``label`` is not a string. Raises
        ``KeyError`` if a row lacks the column.
        """
        if isinstance(label, self._STRING_TYPES):
            return KTable(sorted(self.to_dict(), key=lambda record: record[label]))
        return self

    def echo(self, y=None):
        """Print the table, or ``self[y]`` when a selector is given."""
        # ``0`` is a valid row index, so it must not count as "no selector".
        if y is None or (y != 0 and not y):
            print(self)
        else:
            print(self[y])
        return self

    def distinct(self):
        """Return a new table without duplicate records (first one wins)."""
        seen = set()
        unique = []
        for row in self._rows():
            record = row.to_dict()
            key = self._row_key(record)
            if key not in seen:
                seen.add(key)
                unique.append(dict(record))
        return KTable(unique)

    def drop(self, k):
        """Remove rows or columns in place and return ``self``.

        * callable -> remove every row for which ``k(row)`` is truthy
        * string / list of strings -> remove those columns from every row
        * int / list of ints -> remove the rows with those indexes
        """
        if callable(k):
            # Decide first, delete afterwards: a dict must not change size
            # while it is being iterated.
            doomed = [i for i in sorted(self._d) if k(self._d[i])]
            for i in doomed:
                del self._d[i]
            return self
        targets = k if isinstance(k, list) else [k]
        if not targets:
            return self
        if isinstance(targets[0], self._STRING_TYPES):
            for row in self._rows():
                del row[targets]
        else:
            for i in targets:
                self._d.pop(i, None)
        return self

    def min(self):
        """Placeholder: not implemented yet, returns ``None``."""
        pass

    def max(self):
        """Placeholder: not implemented yet, returns ``None``."""
        pass

    def mean(self):
        """Placeholder: not implemented yet, returns ``None``."""
        pass

    def __get(self, y):
        """Resolve a row/column selector into ``(records, header)``.

        ``records`` is a list of dicts and ``header`` the column list the
        resulting table should carry. This never modifies ``self``.
        """
        if isinstance(y, (int,) + self._STRING_TYPES):
            y = [y]
        if isinstance(y, tuple):
            # Tuple = exclusion: keep the first row's columns not listed.
            first = self._rows()[0].to_dict() if self._d else {}
            y = [name for name in first if name not in y]
        if not y:
            # Nothing selected (empty list, or every column excluded).
            return [], self._header
        if isinstance(y[0], int):
            # Row selection: a missing index yields an empty record.
            return [(self._d.get(i) or Field()).to_dict() for i in y], self._header
        columns = list(y)
        return [{name: row[name] for name in columns} for row in self._rows()], columns

    def __getitem__(self, y):
        """Select rows, columns, a cell or a filtered table (see class doc).

        NOTE: predicate filtering writes the row index into every source row
        under the key ``'_i'`` before calling the predicate, so predicates
        can test ``row['_i']``. The key stays in the source rows afterwards.
        """
        if callable(y):
            matched = []
            for i in sorted(self._d):
                row = self._d[i]
                row['_i'] = i
                if y(row):
                    matched.append(row.to_dict())
            return KTable(matched, header=self._header)
        is_scalar = isinstance(y, (int,) + self._STRING_TYPES)
        if (not is_scalar and len(y) == 2 and isinstance(y[0], int)
                and isinstance(y[1], self._STRING_TYPES)):
            # ``table[row, column]`` -> single cell.
            row = self._d.get(y[0])
            if row is None:
                return None
            return row[y[1]]
        records, header = self.__get(y)
        return KTable(records, header=header)

    @property
    def header(self):
        """Column labels used for aligned rendering, or ``None``."""
        return self._header

    @header.setter
    def header(self, header=None):
        self._header = header

    def pipe(self, func):
        """Return ``func(self)``."""
        return func(self)

    def __len__(self):
        return len(self._d)

    def to_json(self):
        """Serialize the rows as a JSON array of objects."""
        return json.dumps(self.to_dict())

    def to_dict(self):
        """Return the rows as a list of dicts, in row-index order.

        The dicts are the rows' underlying dicts, not copies.
        """
        return [row.to_dict() for row in self._rows()]

    def __str__(self):
        """Render the table as text, without printing anything.

        Without a header every row is shown as its dict; with a header the
        listed columns are shown left-aligned in fixed-width cells, missing
        values as ``None``. The text ends with a newline so that
        ``print(table)`` is followed by one blank line.
        """
        lines = ["--**-- data --**--"]
        if not self._header:
            lines.extend(str(row.to_dict()) for row in self._rows())
        else:
            cell = "%-{0}s".format(self._COLUMN_WIDTH)
            lines.append("".join(cell % (name,) for name in self._header))
            for row in self._rows():
                lines.append("".join(cell % (row[name],) for name in self._header))
        lines.append("--**-- end --**--")
        text = "\n".join(lines) + "\n"
        if str is bytes and not isinstance(text, str):
            # Python 2 only: ``__str__`` has to return a byte string.
            text = text.encode("utf-8")
        return text
if __name__ == '__main__':
    # Demo run: exercises selection, mapping, filtering, de-duplication and
    # grouping on a four-row sample table.
    tk = KTable([
        {"a": 1, "b": 2, "c": "rr"},
        {"a": 1, "b": 2, "c": "rrr"},
        {"a": 1, "b": 3, "c": "rr"},
        {"a": 2, "b": 3, "c": "rrr"},
    ])
    print(tk['a'].to_dict())

    t = tk.map(lambda row: row.set(dd=0))
    t.header = None
    print(t.map(lambda row: row.set(dd=0)))

    # Indexing with a tuple excludes those columns; a list includes them.
    bumped = t.map(lambda row: row.set(w=row['c'], dd=row['dd'] + 1))
    print(bumped[('a', 'b')][['c', 'dd']])

    print(t.map(
        lambda left, right: left.set(w=left['a'] + right['a'], dd=left['dd'] + 1),
        t,
    ))
    print(t[1])
    print(t[lambda row: row['a'] == 1])
    print(t[lambda row: row['_i'] == 1])
    print(t['c'].distinct())

    a = t[['a', 'b']].distinct().to_dict()[0]
    print(t[lambda row: row['a'] == 1 and row['b'] == 3][0, 'a'])
    print(t.group_by('a', lambda group: {"min": 1}).sort_by('a'))
最新文章
- hdu3549还是网络流
- 10款最好的 Bootstrap 3.0 免费主题和模板
- C# List<T>用法
- BOOTSTRAP定制
- ThinkPHP 修改,删除数据,全部显示
- webApp前端开发技巧总结
- OSPF+LVS、quagga、vconfig……感觉这些很有想法啊
- 设计模式之Application Programs and Toolkits
- SSH—Struts(三)—跑步者(Action)
- java 完全二叉树的构建与四种遍历方法
- TC358749XBG:HDMI转MIPI CSI芯片简介
- Mistakes in Hello World
- Android开发心得-使用File Explorer无法访问系统内部文件
- socket编程初识
- html弹出div
- Mysql索引的类型
- Maven常用的几个命令
- Google 开发的、最好用、功能最强大的网页测速与网站性能分析工具
- ELK实战(Springboot日志输出查找)
- C++ 获取程序编译时间