# CSV Editor

from fasthtml.common import *
from uuid import uuid4

# SQLite database file; the route handlers in this file index it by session id.
db = database('sqlite.db')

# Page-wide CSS: spacing for buttons/inputs, a border around grouped controls,
# and an orange outline for cells flagged with the `edited` class.
_css = '''
button,input { margin: 0 1rem; }
[role="group"] { border: 1px solid #ccc; }
.edited { outline: 2px solid orange; }
'''
hdrs = (Style(_css),)

# `app` is the application object and `rt` the route decorator used below.
app, rt = fast_app(hdrs=hdrs)

@rt
async def get_test_file():
    """Fetch the example CSV from the FastHTML-Gallery repo and serve it as a download.

    Returns:
        A ``text/csv`` response whose ``Content-Disposition`` header asks the
        browser to save the body as ``ex_data.csv``.

    Raises:
        httpx.HTTPStatusError: if the upstream server answers with a 4xx/5xx status.
    """
    import httpx
    url = "https://raw.githubusercontent.com/AnswerDotAI/FastHTML-Gallery/main/applications/start_simple/csv_editor/ex_data.csv"
    # The context manager closes the client, so its connection pool is not
    # leaked on every request to this route.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    # Fail loudly instead of handing the user an error page saved as ex_data.csv.
    response.raise_for_status()
    return Response(response.text, media_type="text/csv",
                    headers={'Content-Disposition': 'attachment; filename="ex_data.csv"'})

@rt
def index(sess):
    """Render the landing page: example-file link, upload controls and a results area.

    On a session's first visit a fresh UUID string is stored under ``sess['id']``;
    the other handlers in this file use that value to look up the session's table.
    """
    if 'id' not in sess:
        sess['id'] = str(uuid4())

    example_link = A('Download Example CSV', href="get_test_file", download="ex_data.csv")
    file_picker = Input(type="file", name="csv_file", accept=".csv")
    # The button posts the preceding file input as multipart form data and
    # puts the response into the #results div.
    upload_btn = Button("Upload", hx_post="upload", hx_target="#results",
                        hx_encoding="multipart/form-data", hx_include='previous input')
    download_link = A('Download', href='download', type="button")
    controls = Group(file_picker, upload_btn, download_link)
    return Titled("CSV Uploader", example_link, controls, Div(id="results"))

def render_row(row):
    """Build the editable ``<tr>`` for one table row.

    Each column of ``row`` becomes a cell holding an ``<input>`` named after the
    column and pre-filled with its value; typing in it adds the ``edited`` CSS
    class. A final cell holds the delete and update buttons.
    """
    cells = []
    for col, val in row.items():
        field = Input(value=val, name=col, oninput="this.classList.add('edited')")
        cells.append(Td(field))

    # URL of the `remove` route for this row's id, with leading slashes stripped.
    delete_url = remove.rt(id=row['id']).lstrip('/')
    delete_btn = Button('delete', hx_delete=delete_url)
    update_btn = Button('update', hx_post='update', hx_include="closest tr")
    cells.append(Td(Group(delete_btn, update_btn)))

    # Target/swap are set on the row itself: closest <tr>, replaced via outerHTML.
    return Tr(*cells, hx_target='closest tr', hx_swap='outerHTML')

@rt
def download(sess):
    """Serve the current session's table as a CSV attachment named ``data.csv``.

    The first record is the column names, followed by one record per table row.
    Records are written with the ``csv`` module so that values containing
    commas, double quotes or line breaks are quoted/escaped instead of
    silently corrupting the file (a plain ``",".join`` does no quoting).
    ``None`` values are written as empty fields.

    Returns:
        A ``text/csv`` response with a ``Content-Disposition: attachment`` header.
    """
    import csv
    import io

    tbl = db[sess['id']]
    buf = io.StringIO()
    # '\n' keeps the record separator the hand-joined output used to have.
    writer = csv.writer(buf, lineterminator='\n')
    writer.writerow(list(tbl.columns_dict))
    writer.writerows(row.values() for row in tbl())
    headers = {'Content-Disposition': 'attachment; filename="data.csv"'}
    return Response(buf.getvalue(), media_type="text/csv", headers=headers)

@rt('/update')
def post(d:dict, sess):
    """Apply the submitted row fields ``d`` to the session's table and return the re-rendered row."""
    session_table = db[sess['id']]
    saved_row = session_table.update(d)
    return render_row(saved_row)

@app.delete('/remove')
def remove(id:int, sess):
    """Delete the row identified by ``id`` from the session's table; returns nothing."""
    session_table = db[sess['id']]
    session_table.delete(id)

@rt("/upload")
def post(csv_file: UploadFile, sess):
    """Replace the session's table with the header and first 50 data rows of an uploaded CSV.

    The upload is validated before the existing table is dropped, so a rejected
    or empty upload leaves previously loaded data untouched.

    Args:
        csv_file: the uploaded file; its name must end in ``.csv`` (any case).
        sess: session mapping; ``sess['id']`` names the table to (re)create.

    Returns:
        An error message string for a non-CSV or empty upload; otherwise a
        tuple of (truncation notice or ``''``, table of editable rows).
    """
    from itertools import islice

    # `filename` may be missing when no file was chosen; treat that as "not a CSV".
    if not (csv_file.filename or '').lower().endswith('.csv'): return "Please upload a CSV file"

    # Header line + 50 data rows. One extra line is read only to learn whether
    # the file was longer than what gets imported.
    max_lines = 51
    lines = list(islice(csv_file.file, max_lines + 1))
    if not lines: return "The uploaded CSV file is empty"
    truncated = len(lines) > max_lines
    content = b''.join(lines[:max_lines])

    # Only now discard the previous table for this session.
    db[sess['id']].drop(ignore=True)
    tbl = db.import_file(sess['id'], content, pk='id')
    header = Tr(*map(Th, tbl.columns_dict))
    vals = [render_row(row) for row in tbl()]
    notice = Span('First 50 rows only', style="color: red;") if truncated else ''
    return (notice, Table(Thead(header), Tbody(*vals)))

# Launch the app through fasthtml's `serve()` helper.
# NOTE(review): presumably this only starts a server when the file is run as a
# script — confirm against the FastHTML `serve` documentation.
serve()